Cloud Composer 1 | Cloud Composer 2
このページでは、次の設計パターンを使用して Airflow パイプライン内のタスクをグループ化する方法について説明します。
- DAG グラフ内のタスクのグループ化。
- 親 DAG からの子 DAG のトリガー。
TaskGroup
演算子を使用したタスクのグループ化(Airflow 2 のみ)。
DAG グラフ内のタスクのグループ化。
パイプラインの特定のフェーズのタスクをグループ化するには、DAG ファイル内のタスク間の関係を使用します。
次に例を示します。
このワークフローでは、タスク op-1
と op-2
は、最初のタスク start
の後に一緒に実行されます。これは、ステートメント start >> [task_1, task_2]
を使用してタスクをグループ化することで実現できます。
次のコードは、前述の DAG の完全な実装を示しています。
Airflow 2
Airflow 1
親 DAG からの子 DAG のトリガー
TriggerDagRunOperator
を使用して、ある DAG から別の DAG をトリガーできます。Airflow のバージョンに応じて、TriggerDagRunOperator
演算子は次のモジュールで見つかります。
- Airflow 1.10.* では、
airflow.operators.dagrun_operator
モジュールを使用します。 - Airflow 2 では、
airflow.operators.trigger_dagrun
モジュールを使用します。
次に例を示します。
このワークフローでは、ブロック dag_1
と dag_2
は、Cloud Composer 環境の別々の DAG にグループ化される一連のタスクを表します。
このワークフローを実装するには、2 つの個別の DAG ファイルが必要です。制御 DAG ファイルは次のようになります。
Airflow 2
Airflow 1
子 DAG の実装は、次に示すように、制御 DAG によってトリガーされます。
Airflow 2
Airflow 1
ワークフローを機能させるには、Cloud Composer 環境で両方の DAG ファイルをアップロードする必要があります。
TaskGroup
演算子を使用したタスクのグループ化
Airflow 2 では、TaskGroup
演算子を使用して DAG 内のタスクをグループ化できます。TaskGroup
ブロック内で定義されたタスクは、引き続きメインの DAG の一部です。
次に例を示します。
タスク op-1
と op-2
は、ID taskgroup_1
のブロックにグループ化されます。
このワークフローの実装は、次のコードのようになります。