Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se muestra cómo implementar un DAG que activa DAG en otros proyectos y entornos de Cloud Composer con operadores de Airflow para Cloud Composer.
Si, en cambio, deseas activar los DAG en tu entorno, consulta Cómo programar y activar DAGs.
Configura los permisos de IAM
Si el entorno de destino se encuentra en otro proyecto, la cuenta de servicio de tu entorno necesita roles que permitan interactuar con los entornos de ese proyecto.
Proyecto | Recurso | Principal | Rol |
---|---|---|---|
Es el proyecto en el que se encuentra el entorno de destino. | Proyecto | Cuenta de servicio del entorno de origen |
Rol de trabajador de Composer (composer.worker ) |
Es el proyecto en el que se encuentra el entorno de destino. | Proyecto | Cuenta de servicio del entorno de origen |
Un rol personalizado con el permiso composer.environment.executeAirflowCommand |
Cómo activar un DAG en otro entorno
El DAG de ejemplo que se describe en esta sección hace lo siguiente:
- Activar un DAG en otro entorno de Cloud Composer
- Verifica si se completó una ejecución de DAG.
Después de que se complete la ejecución del DAG en otro entorno, el DAG de ejemplo se marca como correcta.
Ejecuta comandos de la CLI de Airflow con CloudComposerRunAirflowCLICommandOperator
Puedes usar el operador
CloudComposerRunAirflowCLICommandOperator
para ejecutar comandos de la CLI de Airflow en otro entorno de Cloud Composer. El DAG de ejemplo ejecuta el comando dags trigger
, que activa un DAG.
Este operador puede ejecutarse en el modo diferido. Para habilitarlo, configura el parámetro deferrable
en True
.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
Cómo verificar si se completó una ejecución de DAG
Puedes usar el sensor CloudComposerDAGRunSensor para verificar si se completó una ejecución de DAG en otro entorno de Cloud Composer.
Este sensor puede ejecutarse en el modo diferido. Para habilitarlo, configura el parámetro deferrable
en True
.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
Código de ejemplo completo
El siguiente es el ejemplo de código completo de un DAG que combina las dos tareas descritas anteriormente.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor