DAGs in anderen Umgebungen und Projekten auslösen

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite wird gezeigt, wie Sie mithilfe von Airflow-Operatoren für Cloud Composer einen DAG implementieren, der DAGs in anderen Cloud Composer-Umgebungen und ‑Projekten auslöst.

Wenn Sie DAGs stattdessen in Ihrer Umgebung auslösen möchten, lesen Sie den Hilfeartikel DAGs planen und auslösen.

IAM-Berechtigungen konfigurieren

Wenn sich die Zielumgebung in einem anderen Projekt befindet, benötigt das Dienstkonto Ihrer Umgebung Rollen, die die Interaktion mit Umgebungen in diesem Projekt ermöglichen.

Projekt Ressource Hauptkonto Rolle
Projekt, in dem sich die Zielumgebung befindet Projekt Das Dienstkonto der Quellumgebung Rolle Composer-Worker (composer.worker)
Projekt, in dem sich die Zielumgebung befindet Projekt Das Dienstkonto der Quellumgebung Eine benutzerdefinierte Rolle mit der Berechtigung composer.environment.executeAirflowCommand

DAG in einer anderen Umgebung auslösen

Die in diesem Abschnitt beschriebene Beispiel-DAG führt Folgendes aus:

  1. Einen DAG in einer anderen Cloud Composer-Umgebung auslösen
  2. Prüft, ob eine DAG-Ausführung abgeschlossen ist.

Nachdem die DAG-Ausführung in einer anderen Umgebung abgeschlossen wurde, wird der Beispiel-DAG als erfolgreich markiert.

Befehle der Airflow-Befehlszeile mit CloudComposerRunAirflowCLICommandOperator ausführen

Mit dem Operator CloudComposerRunAirflowCLICommandOperator können Sie Airflow-Befehle in einer anderen Cloud Composer-Umgebung ausführen. Im Beispiel-DAG wird der Befehl dags trigger ausgeführt, wodurch ein DAG ausgelöst wird.

Dieser Operator kann im verschiebbaren Modus ausgeführt werden. Sie können ihn aktivieren, indem Sie den Parameter deferrable auf True festlegen.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Prüfen, ob eine DAG-Ausführung abgeschlossen ist

Mit dem Sensor CloudComposerDAGRunSensor können Sie prüfen, ob ein DAG-Lauf in einer anderen Cloud Composer-Umgebung abgeschlossen ist.

Dieser Sensor kann im verschiebbaren Modus ausgeführt werden. Sie können ihn aktivieren, indem Sie den Parameter deferrable auf True festlegen.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Vollständiger Beispielcode

Im Folgenden finden Sie das vollständige Codebeispiel für einen DAG, der die beiden zuvor beschriebenen Aufgaben kombiniert.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Nächste Schritte