Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、次の設計パターンを使用して Airflow パイプラインのタスクをグループ化する方法について説明します。
- DAG グラフ内のタスクのグループ化。
- 親 DAG からの子 DAG のトリガー。
TaskGroup
演算子を使用したタスクのグループ化(Airflow 2 のみ)。
DAG グラフ内のタスクのグループ化。
パイプラインの特定のフェーズでタスクをグループ化するには、DAG ファイルのタスク間の関係を使用します。
次に例を示します。
このワークフローでは、タスク op-1
と op-2
が最初のタスク start
の後に一緒に実行されます。これは、ステートメント start >> [task_1, task_2]
を使用してタスクをグループ化することで実現できます。
次のコードは、前述の DAG の完全な実装を示しています。
Airflow 2
Airflow 1
親 DAG からの子 DAG のトリガー
TriggerDagRunOperator
を使用して、別の DAG からある DAG をトリガーできます。
Airflow のバージョンに応じて、TriggerDagRunOperator
演算子は別のモジュールにあります。
- Airflow 1.10.* では、
airflow.operators.dagrun_operator
モジュールを使用します。 - Airflow 2 では、
airflow.operators.trigger_dagrun
モジュールを使用します。
次に例を示します。
このワークフローでは、ブロック dag_1
と dag_2
は一連のタスクを表します。これらのタスクは Cloud Composer 環境の別の DAG にグループ化されます。
このワークフローを実装するには、2 つの個別の DAG ファイルが必要です。制御 DAG ファイルは次のようになります。
Airflow 2
Airflow 1
子 DAG の実装は、次に示すように、制御 DAG によってトリガーされます。
Airflow 2
Airflow 1
ワークフローが機能するには、Cloud Composer 環境に両方の DAG ファイルをアップロードする必要があります。
TaskGroup
演算子を使用したタスクのグループ化
Airflow 2 では、TaskGroup
演算子を使用して DAG 内のタスクをグループ化できます。TaskGroup
ブロック内で定義されたタスクは、引き続きメイン DAG の一部です。
次に例を示します。
タスク op-1
と op-2
は、ID taskgroup_1
のブロックにグループ化されています。このワークフローの実装は次のコードのようになります。