Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird gezeigt, wie Sie mithilfe von Airflow-Operatoren für Cloud Composer einen DAG implementieren, der DAGs in anderen Cloud Composer-Umgebungen und ‑Projekten auslöst.
Wenn Sie DAGs stattdessen in Ihrer Umgebung auslösen möchten, lesen Sie den Hilfeartikel DAGs planen und auslösen.
IAM-Berechtigungen konfigurieren
Wenn sich die Zielumgebung in einem anderen Projekt befindet, benötigt das Dienstkonto Ihrer Umgebung Rollen, die die Interaktion mit Umgebungen in diesem Projekt ermöglichen.
Projekt | Ressource | Hauptkonto | Rolle |
---|---|---|---|
Projekt, in dem sich die Zielumgebung befindet | Projekt | Das Dienstkonto der Quellumgebung |
Rolle Composer-Worker (composer.worker ) |
Projekt, in dem sich die Zielumgebung befindet | Projekt | Das Dienstkonto der Quellumgebung |
Eine benutzerdefinierte Rolle mit der Berechtigung composer.environment.executeAirflowCommand |
DAG in einer anderen Umgebung auslösen
Die in diesem Abschnitt beschriebene Beispiel-DAG führt Folgendes aus:
- Einen DAG in einer anderen Cloud Composer-Umgebung auslösen
- Prüft, ob eine DAG-Ausführung abgeschlossen ist.
Nachdem die DAG-Ausführung in einer anderen Umgebung abgeschlossen wurde, wird der Beispiel-DAG als erfolgreich markiert.
Befehle der Airflow-Befehlszeile mit CloudComposerRunAirflowCLICommandOperator ausführen
Mit dem Operator CloudComposerRunAirflowCLICommandOperator können Sie Airflow-Befehle in einer anderen Cloud Composer-Umgebung ausführen. Im Beispiel-DAG wird der Befehl dags trigger
ausgeführt, wodurch ein DAG ausgelöst wird.
Dieser Operator kann im verschiebbaren Modus ausgeführt werden. Sie können ihn aktivieren, indem Sie den Parameter deferrable
auf True
festlegen.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
Prüfen, ob eine DAG-Ausführung abgeschlossen ist
Mit dem Sensor CloudComposerDAGRunSensor können Sie prüfen, ob ein DAG-Lauf in einer anderen Cloud Composer-Umgebung abgeschlossen ist.
Dieser Sensor kann im verschiebbaren Modus ausgeführt werden. Sie können ihn aktivieren, indem Sie den Parameter deferrable
auf True
festlegen.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
Vollständiger Beispielcode
Im Folgenden finden Sie das vollständige Codebeispiel für einen DAG, der die beiden zuvor beschriebenen Aufgaben kombiniert.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor