Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本页介绍如何使用以下设计模式对 Airflow 流水线中的任务进行分组:
- 将 DAG 图中的任务分组。
- 从父 DAG 触发子 DAG。
- 使用
TaskGroup
operator 将任务分组(仅适用于 Airflow 2)。
将 DAG 图中的任务分组
要将流水线的特定阶段中的任务分组,您可以使用 DAG 文件中任务之间的关系。
请思考以下示例:
在此工作流中,任务 op-1
和 op-2
在初始任务 start
后一起运行。您可以使用语句 start >> [task_1, task_2]
来将任务分组,以实现此目的。
以下代码提供了上述 DAG 的完整实现:
Airflow 2
Airflow 1
从父 DAG 触发子 DAG
您可以使用 TriggerDagRunOperator
从一个 DAG 触发另一个 DAG。根据您的 Airflow 版本,您可以在不同模块中找到 TriggerDagRunOperator
operator:
- 对于 Airflow 1.10.*,请使用
airflow.operators.dagrun_operator
模块。 - 对于 Airflow 2,请使用
airflow.operators.trigger_dagrun
模块。
请思考以下示例:
在此工作流中,dag_1
和 dag_2
块表示一系列任务,这些任务分组到 Cloud Composer 环境中的不同 DAG 中。
此工作流的实现需要两个独立的 DAG 文件。控制 DAG 文件如下所示:
Airflow 2
Airflow 1
由控制 DAG 触发的子 DAG 的实现如下所示:
Airflow 2
Airflow 1
您必须在 Cloud Composer 环境中上传两个 DAG 文件,此工作流才能正常运行。
使用 TaskGroup
operator 将任务分组
在 Airflow 2 中,您可以使用
TaskGroup
运算符
在 DAG 中将任务组合在一起。
在 TaskGroup
块中定义的任务仍然是主 DAG 的一部分。
请思考以下示例:
任务 op-1
和 op-2
分组到一个 ID 为 taskgroup_1
的块中。此工作流的实现类似于以下代码: