Memicu DAG di lingkungan dan project lain

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Halaman ini menunjukkan cara menerapkan DAG yang memicu DAG di lingkungan dan project Cloud Composer lain menggunakan operator Airflow untuk Cloud Composer.

Jika Anda ingin memicu DAG di lingkungan Anda, lihat Menjadwalkan dan memicu DAG.

Mengonfigurasi Izin IAM

Jika lingkungan target berada di project lain, akun layanan lingkungan Anda memerlukan peran yang memungkinkan interaksi dengan lingkungan di project tersebut.

Project Resource Akun utama Peran
Project tempat lingkungan target berada Project Akun layanan lingkungan dari lingkungan sumber Peran Composer Worker (composer.worker)
Project tempat lingkungan target berada Project Akun layanan lingkungan dari lingkungan sumber Peran khusus dengan izin composer.environments.executeAirflowCommand

Memicu DAG di lingkungan lain

DAG contoh yang dijelaskan di bagian ini melakukan hal berikut:

  1. Memicu DAG di lingkungan Cloud Composer lain.
  2. Memeriksa apakah operasi DAG telah selesai.

Setelah operasi DAG di lingkungan lain selesai, DAG contoh akan ditandai sebagai berhasil.

Menjalankan perintah Airflow CLI dengan CloudComposerRunAirflowCLICommandOperator

Anda dapat menggunakan operator CloudComposerRunAirflowCLICommandOperator untuk menjalankan perintah CLI Airflow di lingkungan Cloud Composer lain. DAG contoh menjalankan perintah dags trigger, yang memicu DAG.

Operator ini dapat berjalan dalam mode yang dapat ditangguhkan, Anda dapat mengaktifkannya dengan menyetel parameter deferrable ke True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Memeriksa apakah operasi DAG telah selesai

Anda dapat menggunakan sensor CloudComposerDAGRunSensor untuk memeriksa apakah DAG berjalan selesai di lingkungan Cloud Composer lain.

Sensor ini dapat berjalan dalam mode yang dapat ditangguhkan, Anda dapat mengaktifkannya dengan menyetel parameter deferrable ke True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Contoh kode lengkap

Berikut adalah contoh kode lengkap DAG yang menggabungkan dua tugas yang dijelaskan sebelumnya.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Langkah berikutnya