DAGs in anderen Umgebungen und Projekten auslösen

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite wird gezeigt, wie Sie einen DAG implementieren, der DAGs in anderen Cloud Composer-Umgebungen und -Projekten mithilfe von Airflow-Operatoren für Cloud Composer auslöst.

Wenn Sie stattdessen DAGs in Ihrer Umgebung auslösen möchten, lesen Sie den Abschnitt DAGs planen und auslösen.

IAM-Berechtigungen konfigurieren

Wenn sich die Zielumgebung in einem anderen Projekt befindet, benötigt das Dienstkonto Ihrer Umgebung Rollen, die die Interaktion mit Umgebungen in diesem Projekt ermöglichen.

Projekt Ressource Hauptkonto Rolle
Projekt, in dem sich die Zielumgebung befindet Projekt Dienstkonto der Quellumgebung Rolle Composer-Worker (composer.worker)
Projekt, in dem sich die Zielumgebung befindet Projekt Dienstkonto der Quellumgebung Eine benutzerdefinierte Rolle mit der Berechtigung composer.environments.executeAirflowCommand

DAG in einer anderen Umgebung auslösen

Die in diesem Abschnitt beschriebene Beispiel-DAG führt folgende Schritte aus:

  1. Lösen Sie einen DAG in einer anderen Cloud Composer-Umgebung aus.
  2. Prüft, ob eine DAG-Ausführung abgeschlossen ist.

Nachdem die DAG-Ausführung in einer anderen Umgebung abgeschlossen ist, wird die Beispiel-DAG als erfolgreich markiert.

Befehle der Airflow-Befehlszeile mit CloudComposerRunAirflowCLICommandOperator ausführen

Mit dem Operator CloudComposerRunAirflowCLICommandOperator können Sie Airflow-Befehlszeilenbefehle in einer anderen Cloud Composer-Umgebung ausführen. Im Beispiel-DAG wird der Befehl dags trigger ausgeführt, der einen DAG auslöst.

Dieser Operator kann im aufschiebbaren Modus ausgeführt werden. Sie können ihn aktivieren, indem Sie den Parameter deferrable auf True festlegen.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Prüfen, ob ein DAG-Lauf abgeschlossen ist

Mit dem Sensor CloudComposerDAGRunSensor können Sie prüfen, ob ein DAG-Lauf in einer anderen Cloud Composer-Umgebung abgeschlossen ist.

Dieser Sensor kann im aufschiebbaren Modus ausgeführt werden. Sie können ihn aktivieren, indem Sie den Parameter deferrable auf True setzen.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Vollständiges Codebeispiel

Im Folgenden finden Sie das vollständige Codebeispiel für einen DAG, in dem die beiden zuvor beschriebenen Aufgaben kombiniert werden.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Nächste Schritte