Memicu DAG di lingkungan dan project lain

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Halaman ini menunjukkan cara menerapkan DAG yang memicu DAG di lingkungan dan project Cloud Composer lainnya menggunakan operator Airflow untuk Cloud Composer.

Jika Anda ingin memicu DAG di lingkungan, lihat Menjadwalkan dan memicu DAG.

Mengonfigurasi Izin IAM

Jika lingkungan target berada di project lain, akun layanan lingkungan Anda memerlukan peran yang memungkinkan interaksi dengan lingkungan dalam project tersebut.

Project Resource Akun utama Peran
Project tempat lingkungan target berada Project Akun layanan lingkungan dari lingkungan sumber Peran Pekerja Composer (composer.worker)
Project tempat lingkungan target berada Project Akun layanan lingkungan dari lingkungan sumber Peran kustom dengan izin composer.environment.executeAirflowCommand

Memicu DAG di lingkungan lain

Contoh DAG yang dijelaskan di bagian ini melakukan hal berikut:

  1. Memicu DAG di lingkungan Cloud Composer lain.
  2. Memeriksa apakah operasi DAG selesai.

Setelah operasi DAG di lingkungan lain selesai, contoh DAG akan ditandai sebagai berhasil.

Menjalankan perintah Airflow CLI dengan CloudComposerRunAirflowCLICommandOperator

Anda dapat menggunakan operator CloudComposerRunAirflowCLICommandOperator untuk menjalankan perintah Airflow CLI di lingkungan Cloud Composer lain. Contoh DAG mengeksekusi perintah dags trigger, yang memicu DAG.

Operator ini dapat berjalan dalam mode yang dapat ditangguhkan, Anda dapat mengaktifkannya dengan menetapkan parameter deferrable ke True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Memeriksa apakah operasi DAG selesai

Anda dapat menggunakan sensor CloudComposerDAGRunSensor untuk memeriksa apakah operasi DAG selesai di lingkungan Cloud Composer lain.

Sensor ini dapat berjalan dalam mode yang dapat ditangguhkan, Anda dapat mengaktifkannya dengan menetapkan parameter deferrable ke True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Kode contoh lengkap

Berikut adalah contoh kode lengkap DAG yang menggabungkan dua tugas yang dijelaskan sebelumnya.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Langkah berikutnya