みなさんこんにちは! 私の名前はアントンです。ロステレコムでは、中央データウェアハウスを開発しています。 リポジトリはモジュールで構成され、オーケストレーターはいくつかのInformaticaインスタンスを使用します。そのうちのいくつかは、オープンソースソリューションへの移行の一環としてAirflowに転送したいものです。 InformaticaとAirflowは根本的に異なるツールであるため、既存の実装を取得して繰り返すことはそれほど簡単ではありません。 一方で、現在の実装に可能な限り近いワークフローを取得し、一方で、最も興味深い最初のエアフローの原理である柔軟性を提供するダイナミズムを使用したいと考えました。
この短い記事では、Airflowでの真に動的なDAGの生成について説明します。 主にインドのこのテーマに関するインターネットの開発者からの多くの記事があります。これは「Airflowで動的にdagsを生成できます。例:<10個のHelloWorldタスク/ dagsを生成する例>」という形式の資料です。 しかし、Dagの生成に興味がありました。Dagは、変数の数とタスク名に応じて変化します。

現在、Airflowは、リポジトリにさらにアップロードするためにリモートソースサーバーでデータパケットを生成するモジュールを起動するために実装されています。 単純なスケジュールに従って実行されるため、詳細に検討することはあまり面白くありません。 また、中間のステージングにレイヤーでさらにロードするためにデータパケットを配信するモジュールのエアフローによるオーケストレーションが間もなく導入されます。 ここでは、一連のレーキを待っています。その説明はどこにも見つからず、私の経験を共有したいと思います。
Airflow onHabréには、Mail.ruの開発者からの基本的な事柄が詳しく説明されている記事がいくつかあります。
気流の一般的な説明
分岐、jinjaを介したパラメーター化、およびXcomを介したDAG通信
小さな用語集:
DAG / DAGは、有向非巡回グラフです。 この場合、相互に依存し、サイクルを形成しない一連のアクションを意味します。
SubDAG / Sabdag -DAGと同じですが、別のDAG内にあり、親DAGの一部として(タスクとして)起動され、個別のスケジュールはありません。
オペレーター/オペレーター -特定のアクションを実行するDAGの特定のステップ。 たとえば、PythonOperator。
タスク/タスク -DAGを開始するときのオペレーターの特定のインスタンスは、Webインターフェースで小さな四角形として視覚化されます。 たとえば、PythonOperatorはrun_taskと呼ばれ、DAG check_dagで実行されます。
動的タスク生成のアイデア、問題、欠点
入力データ:
オーケストラリポジトリにテーブルがあります。PKG_TABLEと呼びましょう。
データパケットをダウンロードする準備ができているエントリをPKG_TABLEテーブルに追加するメカニズムがあります。
私たちが望んだもの:
DAG。すぐにダウンロードできるパッケージ用に生成され、ダウンロードを開始します(ネタバレ:最終的にすべてが判明しました)。
以下のコードを使用して、LatestOnlyOperatorタスクと、pkg_subdag_factory関数の実行時に作成される依存タスクsubdagで構成されるdagを生成します。これは、PKG_TABLEテーブルからパッケージのリストを受け取り、いくつかのPythonOperatorsを生成します。 ダウンロードするパッケージがない場合、DummyOperatorが生成されます。
最初のバージョンを1つのPythonOperatorで作成し、Airflowを使用して詳細なワークフローに再配布することにしました。
次のスクリーンショットは、結果としてこれがどのように見えるかを示しています。
DAGの外観:

配達用の荷物がない場合のサブダグの外観:

配達用のパッケージがある場合のサブダグの外観:

問題とニュアンス
- キャッチアップは期待どおりに機能しませんでした。オフになったDAGをオンにした後、複数の起動が発生しました(スケジュールの全期間ではなく、同時に2〜3回)。 このため、LatestOnlyOperatorを追加して、最後の起動を除くすべての起動がアイドルになるようにしました。
- サブダグを作成する場合、コマンド「airflow unpause <subdag_name>」を使用してコマンドラインから明示的に有効にする必要があります。そうしないと、起動しません。 。 エアフロー構成($ airflow_home / airflow.cfg)でパラメーター "dags_are_paused_at_creation" = falseを設定した場合、それは必要ではありませんが、新しいdagの自動自動起動で不快な結果につながる可能性があります-新しいdagasを手動で起動する必要があるようです。
ドキュメントにあるように、「Airflowの重要な機能は、これらのDAG実行がアトミックでべき等のアイテムである<...>」、つまり「DAGは変更されずに生成されることが理解される」ということです。 この「キー機能」に違反したという事実により、次のことを学びました。
- 空のDAG(タスクなし)が開始されて終了することはできず、考えられるすべてのパラレルが詰まります。 これは、DAGの起動時にダウンロードパッケージがなかった場合に発生しました。 これを回避するために、DummyOperatorが作成されます。
- 作業中にタスクdagが再生成され、更新されたdagにこのタスクがなくなった場合、実行中のプロセスが中断されて停止します。 そして、これはシェダーのすべてのステップで発生しますが、エアフロー構成のmin_file_process_intervalパラメーターで示されるよりも頻繁ではありません($ airflow_home / airflow.cfg)。 これを回避するために、「ダウンロード準備完了」のステータスだけでなく、「処理中のロード」のステータスによってタスクパックを生成し、ダウンロードの進行中にタスクパックが生成されるようにしました。
- 現在のバージョンのdagに以前のタスクがない場合-たとえば、以前にロードされたdagの現在のバージョンで作成されていない「pkg_123」という名前のタスクがあった場合、Webインターフェースでこのタスクの統計を表示できません。 すべての情報はエアフローデータベースに格納されますが、それに基づいて、外部の手段によって古い起動用の美しいダッシュボードを構築することができます。 DAGの更新頻度とそれを無効にする機能について疑問が生じた場合は、 こちらで読むことができます 。
- task_idは動的に生成されるため、現在のパッケージのIDだけでなく、現在のすべてのパッケージのデータを含むディクショナリをスローする必要があります。これにより、関数自体が機能するときに、パッケージIDで同じディクショナリから必要なデータを選択できます。 それ以外の場合、同じパッケージに対してすべてのタスクが開始されます。
ログ内のExecution_dateおよび実際の開始時刻
最後に、Airflowの別のニュアンスで説明します。これは、他の記事で混同され、簡単な言葉で説明されていません-execution_date(すべてのログ、インターフェースなどに表示されます)および実際の開始時間。 原則として、説明はエアフローのドキュメントとFAQにありますが、結果は明らかではないため、説明が必要なようです。
ドキュメント :「スケジューラーは期間の終わりにジョブを開始します」
結果 :たとえば、@ dailyなどのスケジュールでdagを作成すると、execution_dateが「2018-01-01 00:00:00」の実行は、実際には「2018-02-01 00:00:00」を実行します。
便利なリンク:
キャッチアップドキュメント
LatestOnlyOperatorドキュメント
LatestOnlyOperatorに関するその他のドキュメント
LatestOnlyOperatorの使用例
いくつかのニュアンス
前回の起動の依存関係に関する質問
動的生成に関する小さな例
簡単な説明での動的生成に関する質問