Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、Cloud Composer の Airflow オペレーターを使用して、他の Cloud Composer 環境とプロジェクトの DAG をトリガーする DAG を実装する方法について説明します。
代わりに環境で DAG をトリガーする場合は、DAG のスケジュール設定とトリガーをご覧ください。
IAM 権限を構成する
移行先の環境が別のプロジェクトにある場合、環境のサービス アカウントには、そのプロジェクトの環境を操作できるロールが必要です。
プロジェクト | リソース | プリンシパル | ロール |
---|---|---|---|
移行先の環境が配置されているプロジェクト | プロジェクト | 移行元の環境の環境のサービス アカウント |
Composer ワーカーロール(composer.worker ) |
移行先の環境が配置されているプロジェクト | プロジェクト | 移行元の環境の環境のサービス アカウント |
composer.environment.executeAirflowCommand 権限のあるカスタムロール |
別の環境で DAG をトリガーする
このセクションで説明する DAG の例では、次のことを行います。
- 別の Cloud Composer 環境で DAG をトリガーする。
- DAG の実行が完了したかどうかを確認します。
別の環境での DAG の実行が完了すると、サンプル DAG は成功としてマークされます。
CloudComposerRunAirflowCLICommandOperator を使用して Airflow CLI コマンドを実行する
CloudComposerRunAirflowCLICommandOperator オペレーターを使用すると、別の Cloud Composer 環境で Airflow CLI コマンドを実行できます。この DAG の例では、dags trigger
コマンドを実行して DAG をトリガーします。
この演算子は遅延可能モードで実行できます。deferrable
パラメータを True
に設定することで有効にできます。
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
DAG の実行が完了したかどうかを確認する
CloudComposerDAGRunSensor センサーを使用して、別の Cloud Composer 環境で DAG 実行が完了したかどうかを確認できます。
このセンサーは延期モードで実行できます。有効にするには、deferrable
パラメータを True
に設定します。
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
完全なサンプルコード
以下は、前述の 2 つのタスクを組み合わせた DAG の完全なコード例です。
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor