Activa DAG en otros entornos y proyectos

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se muestra cómo implementar un DAG que activa DAGs en otros entornos y proyectos de Cloud Composer con operadores de Airflow para Cloud Composer.

Si, en cambio, deseas activar DAGs en tu entorno, consulta Cómo programar y activar DAGs.

Configura los permisos de IAM

Si el entorno de destino se encuentra en otro proyecto, la cuenta de servicio de tu entorno necesita roles que permitan interactuar con entornos en ese proyecto.

Proyecto Recurso Principal Rol
Proyecto en el que se encuentra el entorno de destino Proyecto Cuenta de servicio del entorno de origen Rol de trabajador de Composer (composer.worker)
Proyecto en el que se encuentra el entorno de destino Proyecto Cuenta de servicio del entorno de origen Una función personalizada con el permiso composer.environments.executeAirflowCommand

Activa un DAG en otro entorno

El DAG de ejemplo que se describe en esta sección hace lo siguiente:

  1. Activa un DAG en otro entorno de Cloud Composer.
  2. Comprueba si se completó una ejecución de DAG.

Una vez que se completa la ejecución del DAG en otro entorno, el DAG de ejemplo se marca como exitoso.

Ejecuta comandos de la CLI de Airflow con CloudComposerRunAirflowCLICommandOperator

Puedes usar el operador CloudComposerRunAirflowCLICommandOperator para ejecutar comandos de la CLI de Airflow en otro entorno de Cloud Composer. El DAG de ejemplo ejecuta el comando dags trigger, que activa un DAG.

Este operador puede ejecutarse en el modo aplazable, que puedes habilitar configurando el parámetro deferrable en True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Cómo verificar si se completó una ejecución de DAG

Puedes usar el sensor CloudComposerDAGRunSensor para verificar si se completó una ejecución de DAG en otro entorno de Cloud Composer.

Este sensor puede ejecutarse en el modo aplazable. Para habilitarlo, configura el parámetro deferrable en True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Código de ejemplo completo

A continuación, se muestra el ejemplo de código completo de un DAG que combina las dos tareas descritas anteriormente.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

¿Qué sigue?