Déclencher des DAG dans d'autres environnements et projets

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Cette page explique comment implémenter un DAG qui déclenche des DAG dans d'autres environnements et projets Cloud Composer à l'aide d'opérateurs Airflow pour Cloud Composer.

Si vous préférez déclencher des DAG dans votre environnement, consultez la section Planifier et déclencher des DAG.

Configurer les autorisations IAM

Si l'environnement cible se trouve dans un autre projet, le compte de service de votre environnement a besoin de rôles qui permettent d'interagir avec les environnements de ce projet.

Projet Ressource Compte principal Rôle
Projet dans lequel se trouve l'environnement cible Projet Compte de service de l'environnement source Rôle Nœud de calcul Composer (composer.worker)
Projet dans lequel se trouve l'environnement cible Projet Compte de service de l'environnement source Rôle personnalisé disposant de l'autorisation composer.environment.executeAirflowCommand

Déclencher un DAG dans un autre environnement

L'exemple de DAG décrit dans cette section effectue les opérations suivantes:

  1. Déclencher un DAG dans un autre environnement Cloud Composer
  2. Vérifie si une exécution de DAG est terminée.

Une fois l'exécution du DAG dans un autre environnement terminée, l'exemple de DAG est marqué comme "Succès".

Exécuter des commandes de CLI Airflow avec CloudComposerRunAirflowCLICommandOperator

Vous pouvez utiliser l'opérateur CloudComposerRunAirflowCLICommandOperator pour exécuter des commandes de CLI Airflow dans un autre environnement Cloud Composer. L'exemple de DAG exécute la commande dags trigger, qui déclenche un DAG.

Cet opérateur peut s'exécuter en mode différé. Vous pouvez l'activer en définissant le paramètre deferrable sur True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Vérifier si une exécution DAG est terminée

Vous pouvez utiliser le capteur CloudComposerDAGRunSensor pour vérifier si une exécution de DAG est terminée dans un autre environnement Cloud Composer.

Ce capteur peut s'exécuter en mode différé. Vous pouvez l'activer en définissant le paramètre deferrable sur True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Exemple de code complet

Voici l'exemple de code complet d'un DAG combinant les deux tâches décrites précédemment.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Étape suivante