Attivare i DAG in altri ambienti e progetti

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina mostra come implementare un DAG che attiva DAG in altri progetti e ambienti Cloud Composer utilizzando gli operatori Airflow per Cloud Composer.

Se invece vuoi attivare i DAG nel tuo ambiente, consulta Pianificare e attivare i DAG.

Configura le autorizzazioni IAM

Se l'ambiente di destinazione si trova in un altro progetto, il service account del tuo ambiente deve disporre di ruoli che consentano l'interazione con gli ambienti in quel progetto.

Progetto Risorsa Entità Ruolo
Il progetto in cui si trova l'ambiente di destinazione Progetto Il account di servizio dell'ambiente di origine Ruolo Worker Composer (composer.worker)
Il progetto in cui si trova l'ambiente di destinazione Progetto Il account di servizio dell'ambiente di origine Un ruolo personalizzato con l'autorizzazione composer.environments.executeAirflowCommand

Attivare un DAG in un altro ambiente

Il DAG di esempio descritto in questa sezione esegue le seguenti operazioni:

  1. Attiva un DAG in un altro ambiente Cloud Composer.
  2. Verifica se un'esecuzione di DAG è stata completata.

Al termine dell'esecuzione del DAG in un altro ambiente, il DAG di esempio viene contrassegnato come riuscito.

Esegui i comandi dell'interfaccia a riga di comando di Airflow con CloudComposerRunAirflowCLICommandOperator

Puoi utilizzare l'operatore CloudComposerRunAirflowCLICommandOperator per eseguire i comandi della CLI di Airflow in un altro ambiente Cloud Composer. Il DAG di esempio esegue il comando dags trigger, che attiva un DAG.

Questo operatore può essere eseguito in modalità differibile, che puoi attivare impostando il parametro deferrable su True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Controllare se l'esecuzione di un DAG è stata completata

Puoi utilizzare il sensore CloudComposerDAGRunSensor per verificare se l'esecuzione di un DAG è completata in un altro ambiente Cloud Composer.

Questo sensore può essere eseguito in modalità differibile, che puoi attivare impostando il parametro deferrable su True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Codice di esempio completo

Di seguito è riportato l'esempio di codice completo di un DAG che combina le due attività descritte in precedenza.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Passaggi successivi