他の環境やプロジェクトで DAG をトリガーする

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このページでは、Cloud Composer の Airflow オペレーターを使用して、他の Cloud Composer 環境とプロジェクトの DAG をトリガーする DAG を実装する方法について説明します。

代わりに環境で DAG をトリガーする場合は、DAG のスケジュール設定とトリガーをご覧ください。

IAM 権限を構成する

移行先の環境が別のプロジェクトにある場合、環境のサービス アカウントには、そのプロジェクトの環境を操作できるロールが必要です。

プロジェクト リソース プリンシパル ロール
移行先の環境が配置されているプロジェクト プロジェクト 移行元の環境の環境のサービス アカウント Composer ワーカーロール(composer.worker
移行先の環境が配置されているプロジェクト プロジェクト 移行元の環境の環境のサービス アカウント composer.environment.executeAirflowCommand 権限のあるカスタムロール

別の環境で DAG をトリガーする

このセクションで説明する DAG の例では、次のことを行います。

  1. 別の Cloud Composer 環境で DAG をトリガーする。
  2. DAG の実行が完了したかどうかを確認します。

別の環境での DAG の実行が完了すると、サンプル DAG は成功としてマークされます。

CloudComposerRunAirflowCLICommandOperator を使用して Airflow CLI コマンドを実行する

CloudComposerRunAirflowCLICommandOperator オペレーターを使用すると、別の Cloud Composer 環境で Airflow CLI コマンドを実行できます。この DAG の例では、dags trigger コマンドを実行して DAG をトリガーします。

この演算子は遅延可能モードで実行できます。deferrable パラメータを True に設定することで有効にできます。

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

DAG の実行が完了したかどうかを確認する

CloudComposerDAGRunSensor センサーを使用して、別の Cloud Composer 環境で DAG 実行が完了したかどうかを確認できます。

このセンサーは延期モードで実行できます。有効にするには、deferrable パラメータを True に設定します。

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

完全なサンプルコード

以下は、前述の 2 つのタスクを組み合わせた DAG の完全なコード例です。

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

次のステップ