Activar DAGs en otros entornos y proyectos

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se muestra cómo implementar un DAG que activa DAGs en otros entornos y proyectos de Cloud Composer mediante operadores de Airflow para Cloud Composer.

Si quieres activar DAGs en tu entorno, consulta Programar y activar DAGs.

Configurar permisos de gestión de identidades y accesos

Si el entorno de destino se encuentra en otro proyecto, la cuenta de servicio de tu entorno necesita roles que permitan interactuar con entornos de ese proyecto.

Proyecto Recurso Principal Rol
Proyecto en el que se encuentra el entorno de destino Proyecto Cuenta de servicio del entorno de origen Rol Trabajador de Composer (composer.worker)
Proyecto en el que se encuentra el entorno de destino Proyecto Cuenta de servicio del entorno de origen Un rol personalizado con el permiso composer.environments.executeAirflowCommand

Activar un DAG en otro entorno

El DAG de ejemplo descrito en esta sección hace lo siguiente:

  1. Activa un DAG en otro entorno de Cloud Composer.
  2. Comprueba si se ha completado una ejecución de DAG.

Una vez completada la ejecución del DAG en otro entorno, el DAG de ejemplo se marca como correcto.

Ejecutar comandos de la CLI de Airflow con CloudComposerRunAirflowCLICommandOperator

Puedes usar el operador CloudComposerRunAirflowCLICommandOperator para ejecutar comandos de la CLI de Airflow en otro entorno de Cloud Composer. El DAG de ejemplo ejecuta el comando dags trigger, que activa un DAG.

Este operador se puede ejecutar en el modo aplazable. Para habilitarlo, asigna el valor True al parámetro deferrable.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Comprobar si se ha completado una ejecución de DAG

Puedes usar el sensor CloudComposerDAGRunSensor para comprobar si se ha completado una ejecución de DAG en otro entorno de Cloud Composer.

Este sensor puede funcionar en el modo aplazable. Para habilitarlo, asigna el valor True al parámetro deferrable.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Código de ejemplo completo

A continuación, se muestra el ejemplo de código completo de un DAG que combina las dos tareas descritas anteriormente.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Siguientes pasos