Trigger DAGs in other environments and projects

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

This page demonstrates how to implement a DAG that triggers DAGs in other Cloud Composer environments and projects by using Airflow operators for Cloud Composer.

If you want to trigger DAGs in your environment instead, see Schedule and trigger DAGs.

Configure IAM permissions

If the target environment is located in another project, then the service account of your environment needs roles that allows interacting with environments in that project.

Project Resource Principal Role
Project where the target environment is located Project Environment's service account of the source environment Composer Worker role (composer.worker)
Project where the target environment is located Project Environment's service account of the source environment A custom role with the composer.environment.executeAirflowCommand permission

Trigger a DAG in another environment

The example DAG described in this section does the following:

  1. Trigger a DAG in another Cloud Composer environment.
  2. Checks if a DAG run is completed.

After the DAG run in another environment is completed, the example DAG is marked as successful.

Run Airflow CLI commands with CloudComposerRunAirflowCLICommandOperator

You can use the CloudComposerRunAirflowCLICommandOperator operator to run Airflow CLI commands in another Cloud Composer environment. The example DAG executes the dags trigger command, which triggers a DAG.

This operator can run in the deferrable mode, you can enable it by setting the deferrable parameter to True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Check if a DAG run is completed

You can use the CloudComposerDAGRunSensor sensor to checks if a DAG run is completed in another Cloud Composer environment.

This sensor can run in the deferrable mode, you can enable it by setting the deferrable parameter to True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Full example code

The following is the full code example of a DAG that combines the two previously described tasks.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

What's next