Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Halaman ini menunjukkan cara menerapkan DAG yang memicu DAG di lingkungan dan project Cloud Composer lainnya menggunakan operator Airflow untuk Cloud Composer.
Jika Anda ingin memicu DAG di lingkungan, lihat Menjadwalkan dan memicu DAG.
Mengonfigurasi Izin IAM
Jika lingkungan target berada di project lain, akun layanan lingkungan Anda memerlukan peran yang memungkinkan interaksi dengan lingkungan di project tersebut.
Project | Resource | Akun utama | Peran |
---|---|---|---|
Project tempat lingkungan target berada | Project | Akun layanan lingkungan dari lingkungan sumber |
Peran Pekerja Composer (composer.worker ) |
Project tempat lingkungan target berada | Project | Akun layanan lingkungan dari lingkungan sumber |
Peran kustom dengan izin composer.environment.executeAirflowCommand |
Memicu DAG di lingkungan lain
Contoh DAG yang dijelaskan di bagian ini melakukan hal berikut:
- Memicu DAG di lingkungan Cloud Composer lain.
- Memeriksa apakah operasi DAG selesai.
Setelah operasi DAG di lingkungan lain selesai, contoh DAG akan ditandai sebagai berhasil.
Menjalankan perintah Airflow CLI dengan CloudComposerRunAirflowCLICommandOperator
Anda dapat menggunakan operator CloudComposerRunAirflowCLICommandOperator untuk menjalankan perintah Airflow CLI di lingkungan Cloud Composer lain. Contoh DAG mengeksekusi perintah dags trigger
, yang
memicu DAG.
Operator ini dapat berjalan dalam mode yang dapat ditangguhkan, Anda
dapat mengaktifkannya dengan menetapkan parameter deferrable
ke True
.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
Memeriksa apakah operasi DAG selesai
Anda dapat menggunakan sensor CloudComposerDAGRunSensor untuk memeriksa apakah operasi DAG selesai di lingkungan Cloud Composer lain.
Sensor ini dapat berjalan dalam mode yang dapat ditangguhkan, Anda dapat
mengaktifkannya dengan menetapkan parameter deferrable
ke True
.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
Kode contoh lengkap
Berikut adalah contoh kode lengkap DAG yang menggabungkan dua tugas yang dijelaskan sebelumnya.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor