将 DAG 中的任务分组

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页介绍如何使用以下设计模式对 Airflow 流水线中的任务进行分组:

  • 将 DAG 图中的任务分组。
  • 从父 DAG 触发子 DAG。
  • 使用 TaskGroup operator 对任务分组。

将 DAG 图中的任务分组

要将流水线的特定阶段中的任务分组,您可以使用 DAG 文件中任务之间的关系。

请参考以下示例:

显示分支任务的 Airflow 任务图
图 1. Airflow DAG 中的任务可以分组到一起(点击可放大)

在此工作流中,任务 op-1op-2 在初始任务 start 后一起运行。您可以使用语句 start >> [task_1, task_2] 来将任务分组,以实现此目的。

以下示例提供了此 DAG 的完整实现:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

从父 DAG 触发子 DAG

您可以使用 TriggerDagRunOperator 运算符从一个 DAG 触发另一个 DAG。

请参考以下示例:

Airflow 任务图,显示了作为 DAG 图的一部分触发的子 DAG
图 2. 可以使用 TriggerDagRunOperator 从 DAG 中触发 DAG(点击可放大)

在此工作流中,dag_1dag_2 块表示一系列任务,这些任务分组到 Cloud Composer 环境中的不同 DAG 中。

此工作流的实现需要两个独立的 DAG 文件。控制 DAG 文件如下所示:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

由控制 DAG 触发的子 DAG 的实现如下所示:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

您必须在 Cloud Composer 环境中上传两个 DAG 文件,此 DAG 才能正常运行。

使用 TaskGroup operator 对任务分组

您可以使用 TaskGroup operator 将 DAG 中的任务分组在一起。在 TaskGroup 块中定义的任务仍然是主 DAG 的一部分。

请参考以下示例:

显示两个任务组的 Airflow 任务图表
图 3. 可以使用 TaskGroup operator 在界面中直观地将任务分组到一起(点击可放大)

任务 op-1op-2 分组到一个 ID 为 taskgroup_1 的块中。此工作流的实现类似于以下代码:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end

后续步骤