Attivare i DAG in altri ambienti e progetti

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina mostra come implementare un DAG che attiva DAG in altri progetti e ambienti Cloud Composer utilizzando gli operatori Airflow per Cloud Composer.

Se invece vuoi attivare i DAG nel tuo ambiente, consulta Pianificare e attivare i DAG.

Configura le autorizzazioni IAM

Se l'ambiente di destinazione si trova in un altro progetto, l'account di servizio del tuo ambiente deve avere ruoli che consentano di interagire con gli ambienti di quel progetto.

Progetto Risorsa Entità Ruolo
Progetto in cui si trova l'ambiente di destinazione Progetto Service account dell'ambiente di origine Ruolo Worker Composer (composer.worker)
Progetto in cui si trova l'ambiente di destinazione Progetto Service account dell'ambiente di origine Un ruolo personalizzato con l'autorizzazionecomposer.environment.executeAirflowCommand

Attivare un DAG in un altro ambiente

Il DAG di esempio descritto in questa sezione esegue le seguenti operazioni:

  1. Attiva un DAG in un altro ambiente Cloud Composer.
  2. Controlla se un'esecuzione di DAG è stata completata.

Al termine dell'esecuzione del DAG in un altro ambiente, il DAG di esempio viene contrassegnato come riuscito.

Esegui i comandi dell'interfaccia a riga di comando di Airflow con CloudComposerRunAirflowCLICommandOperator

Puoi utilizzare l'operatore CloudComposerRunAirflowCLICommandOperator per eseguire i comandi Airflow CLI in un altro ambiente Cloud Composer. Il DAG di esempio esegue il comando dags trigger, che attiva un DAG.

Questo operatore può essere eseguito in modalità differibile. Puoi attivarlo impostando il parametro deferrable su True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Verificare se un'esecuzione di DAG è stata completata

Puoi utilizzare il sensore CloudComposerDAGRunSensor per verificare se l'esecuzione di un DAG è stata completata in un altro ambiente Cloud Composer.

Questo sensore può funzionare in modalità differibile. Puoi attivarlo impostando il parametro deferrable su True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Codice di esempio completo

Di seguito è riportato l'esempio completo di codice di un DAG che combina le due attività descritte in precedenza.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Passaggi successivi