Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页面演示了如何使用适用于 Cloud Composer 的 Airflow 运算符实现一个 DAG,以便在其他 Cloud Composer 环境和项目中触发 DAG。
如果您想改为在自己的环境中触发 DAG,请参阅安排和触发 DAG。
配置 IAM 权限
如果目标环境位于其他项目中,则环境的服务账号需要具有允许与该项目中的环境交互的角色。
项目 | 资源 | 主账号 | 角色 |
---|---|---|---|
目标环境所在的项目 | 项目 | 来源环境的环境服务账号 |
Composer Worker 角色 (composer.worker ) |
目标环境所在的项目 | 项目 | 来源环境的环境服务账号 |
具有 composer.environment.executeAirflowCommand 权限的自定义角色 |
在其他环境中触发 DAG
本部分中介绍的示例 DAG 会执行以下操作:
- 在其他 Cloud Composer 环境中触发 DAG。
- 检查 DAG 运行是否已完成。
在其他环境中运行 DAG 完成后,示例 DAG 会被标记为成功。
使用 CloudComposerRunAirflowCLICommandOperator 运行 Airflow CLI 命令
您可以使用 CloudComposerRunAirflowCLICommandOperator 运算符在其他 Cloud Composer 环境中运行 Airflow CLI 命令。示例 DAG 会执行 dags trigger
命令,该命令会触发 DAG。
此运算符可在可延迟模式下运行,您可以通过将 deferrable
参数设置为 True
来启用此模式。
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
检查 DAG 运行作业是否已完成
您可以使用 CloudComposerDAGRunSensor 传感器检查 DAG 是否已在其他 Cloud Composer 环境中运行完毕。
此传感器可在可延迟模式下运行,您可以通过将 deferrable
参数设置为 True
来启用此模式。
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
完整示例代码
以下是将前面介绍的两个任务组合到一起的 DAG 的完整代码示例。
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor