在其他環境和專案中觸發 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何實作 DAG,透過 Cloud Composer 適用的 Airflow 運算子,在其他 Cloud Composer 環境和專案中觸發 DAG。

如要在環境中觸發 DAG,請參閱「排定及觸發 DAG」。

設定 IAM 權限

如果目標環境位於其他專案,環境的服務帳戶需要具備可與該專案環境互動的角色。

專案 資源 主體 角色
目標環境所在的專案 專案 來源環境的環境服務帳戶 Composer Worker 角色 (composer.worker)
目標環境所在的專案 專案 來源環境的環境服務帳戶 具有 composer.environments.executeAirflowCommand 權限的自訂角色

在其他環境中觸發 DAG

本節所述的 DAG 範例會執行下列操作:

  1. 在其他 Cloud Composer 環境中觸發 DAG。
  2. 檢查 DAG 執行作業是否完成。

另一個環境中的 DAG 執行作業完成後,範例 DAG 會標示為成功。

使用 CloudComposerRunAirflowCLICommandOperator 執行 Airflow CLI 指令

您可以使用 CloudComposerRunAirflowCLICommandOperator 運算子,在另一個 Cloud Composer 環境中執行 Airflow CLI 指令。範例 DAG 會執行 dags trigger 指令,觸發 DAG。

這個運算子可以在可延遲模式中執行,您可以將 deferrable 參數設為 True 來啟用。

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

檢查 DAG 執行作業是否完成

您可以使用 CloudComposerDAGRunSensor 感應器,檢查另一個 Cloud Composer 環境中的 DAG 執行作業是否已完成。

這個感應器可以在可延遲模式下運作,您可以將 deferrable 參數設為 True 來啟用這項功能。

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

完整程式碼範例

以下是 DAG 的完整程式碼範例,其中結合了先前說明的兩項工作。

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

後續步驟