DAG のタスクをグループ化する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、次の設計パターンを使用して Airflow パイプラインでタスクをグループ化する方法について説明します。

  • DAG グラフ内のタスクのグループ化。
  • 親 DAG からの子 DAG のトリガー。
  • TaskGroup 演算子を使用したタスクのグループ化。

DAG グラフ内のタスクをグループ化する

パイプラインの特定のフェーズでタスクをグループ化するには、DAG ファイルのタスク間の関係を使用します。

次に例を示します。

分岐タスクを示す Airflow タスクのグラフ
図 1.タスクは Airflow DAG でグループ化できます(クリックして拡大)

このワークフローでは、タスク op-1op-2 が最初のタスク start の後に一緒に実行されます。これを実現するには、ステートメント start >> [task_1, task_2] を使用してタスクをグループ化します。

次の例は、この DAG の完全な実装を示しています。

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

親 DAG からの子 DAG のトリガー

TriggerDagRunOperator 演算子を使用して、別の DAG から 1 つの DAG をトリガーできます。

次に例を示します。

DAG グラフの一部としてトリガーされた子 DAG を示す Airflow タスクのグラフ
図 2.DAG は、TriggerDagRunOperator を使用して DAG 内でトリガーできます(クリックして拡大)

このワークフローでは、ブロック dag_1dag_2 は、Cloud Composer 環境の別の DAG でグループ化された一連のタスクを表します。

このワークフローを実装するには、2 つの個別の DAG ファイルが必要です。制御 DAG ファイルは次のようになります。

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

子 DAG の実装は、次に示すように、制御 DAG によってトリガーされます。

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

DAG を機能させるには、Cloud Composer 環境に両方の DAG ファイルをアップロードする必要があります。

TaskGroup 演算子を使用したタスクのグループ化

TaskGroup 演算子を使用して、DAG 内でタスクをグループ化できます。TaskGroup ブロック内で定義されたタスクは、引き続きメイン DAG の一部です。

次に例を示します。

2 つのタスクグループを示す Airflow タスクのグラフ
図 3.タスクは、TaskGroup 演算子を使用して UI で視覚的にグループ化できます(クリックして拡大)

タスク op-1op-2 は、ID taskgroup_1 のブロックにグループ化されています。このワークフローの実装は次のコードのようになります。

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end

次のステップ