Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Halaman ini menunjukkan cara menerapkan DAG yang memicu DAG di lingkungan dan project Cloud Composer lain menggunakan operator Airflow untuk Cloud Composer.
Jika Anda ingin memicu DAG di lingkungan Anda, lihat Menjadwalkan dan memicu DAG.
Mengonfigurasi Izin IAM
Jika lingkungan target berada di project lain, akun layanan lingkungan Anda memerlukan peran yang memungkinkan interaksi dengan lingkungan di project tersebut.
Project | Resource | Akun utama | Peran |
---|---|---|---|
Project tempat lingkungan target berada | Project | Akun layanan lingkungan dari lingkungan sumber |
Peran Composer Worker (composer.worker ) |
Project tempat lingkungan target berada | Project | Akun layanan lingkungan dari lingkungan sumber |
Peran khusus dengan izin
composer.environments.executeAirflowCommand |
Memicu DAG di lingkungan lain
DAG contoh yang dijelaskan di bagian ini melakukan hal berikut:
- Memicu DAG di lingkungan Cloud Composer lain.
- Memeriksa apakah operasi DAG telah selesai.
Setelah operasi DAG di lingkungan lain selesai, DAG contoh akan ditandai sebagai berhasil.
Menjalankan perintah Airflow CLI dengan CloudComposerRunAirflowCLICommandOperator
Anda dapat menggunakan operator
CloudComposerRunAirflowCLICommandOperator
untuk menjalankan perintah CLI Airflow di lingkungan Cloud Composer
lain. DAG contoh menjalankan perintah dags trigger
, yang
memicu DAG.
Operator ini dapat berjalan dalam mode yang dapat ditangguhkan, Anda
dapat mengaktifkannya dengan menyetel parameter deferrable
ke True
.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
Memeriksa apakah operasi DAG telah selesai
Anda dapat menggunakan sensor CloudComposerDAGRunSensor untuk memeriksa apakah DAG berjalan selesai di lingkungan Cloud Composer lain.
Sensor ini dapat berjalan dalam mode yang dapat ditangguhkan, Anda dapat mengaktifkannya dengan menyetel parameter deferrable
ke True
.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
Contoh kode lengkap
Berikut adalah contoh kode lengkap DAG yang menggabungkan dua tugas yang dijelaskan sebelumnya.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor