Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa pagina mostra come implementare un DAG che attiva DAG in altri progetti e ambienti Cloud Composer utilizzando gli operatori Airflow per Cloud Composer.
Se invece vuoi attivare i DAG nel tuo ambiente, consulta Pianificare e attivare i DAG.
Configura le autorizzazioni IAM
Se l'ambiente di destinazione si trova in un altro progetto, l'account di servizio del tuo ambiente deve avere ruoli che consentano di interagire con gli ambienti di quel progetto.
Progetto | Risorsa | Entità | Ruolo |
---|---|---|---|
Progetto in cui si trova l'ambiente di destinazione | Progetto | Service account dell'ambiente di origine |
Ruolo Worker Composer (composer.worker ) |
Progetto in cui si trova l'ambiente di destinazione | Progetto | Service account dell'ambiente di origine |
Un ruolo personalizzato con l'autorizzazionecomposer.environment.executeAirflowCommand |
Attivare un DAG in un altro ambiente
Il DAG di esempio descritto in questa sezione esegue le seguenti operazioni:
- Attiva un DAG in un altro ambiente Cloud Composer.
- Controlla se un'esecuzione di DAG è stata completata.
Al termine dell'esecuzione del DAG in un altro ambiente, il DAG di esempio viene contrassegnato come riuscito.
Esegui i comandi dell'interfaccia a riga di comando di Airflow con CloudComposerRunAirflowCLICommandOperator
Puoi utilizzare l'operatore
CloudComposerRunAirflowCLICommandOperator
per eseguire i comandi Airflow CLI in un altro ambiente Cloud Composer. Il DAG di esempio esegue il comando dags trigger
, che attiva un DAG.
Questo operatore può essere eseguito in modalità differibile. Puoi attivarlo impostando il parametro deferrable
su True
.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
Verificare se un'esecuzione di DAG è stata completata
Puoi utilizzare il sensore CloudComposerDAGRunSensor per verificare se l'esecuzione di un DAG è stata completata in un altro ambiente Cloud Composer.
Questo sensore può funzionare in modalità differibile. Puoi attivarlo impostando il parametro deferrable
su True
.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
Codice di esempio completo
Di seguito è riportato l'esempio completo di codice di un DAG che combina le due attività descritte in precedenza.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor