Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Halaman ini menjelaskan cara mengelompokkan tugas di pipeline Airflow menggunakan pola desain berikut:
- Mengelompokkan tugas dalam grafik DAG.
- Memicu DAG turunan dari DAG induk.
- Mengelompokkan tugas dengan operator
TaskGroup
(hanya dengan Airflow 2).
Mengelompokkan tugas dalam grafik DAG
Untuk mengelompokkan tugas dalam fase tertentu pada pipeline, Anda dapat menggunakan hubungan antara tugas dalam file DAG.
Perhatikan contoh berikut:
Dalam alur kerja ini, tugas op-1
dan op-2
berjalan bersama setelah tugas awal
start
.
Anda dapat melakukannya dengan mengelompokkan tugas bersama dengan pernyataan
start >> [task_1, task_2]
.
Kode berikut memberikan implementasi lengkap DAG di atas:
Airflow 2
Aliran udara 1
Memicu DAG turunan dari DAG induk
Anda dapat memicu satu DAG dari DAG lain dengan
TriggerDagRunOperator
.
Bergantung pada versi Airflow, Anda dapat menemukan operator TriggerDagRunOperator
di modul yang berbeda:
- Untuk Airflow 1.10.*, gunakan modul
airflow.operators.dagrun_operator
. - Untuk Airflow 2, gunakan modul
airflow.operators.trigger_dagrun
.
Perhatikan contoh berikut:
Dalam alur kerja ini, blok dag_1
dan dag_2
mewakili serangkaian tugas
yang dikelompokkan bersama dalam DAG terpisah di lingkungan Cloud Composer.
Penerapan alur kerja ini memerlukan dua file DAG terpisah. File DAG pengontrol terlihat seperti berikut:
Airflow 2
Aliran udara 1
Implementasi DAG turunan, yang dipicu oleh DAG kontrol, terlihat seperti berikut:
Airflow 2
Aliran udara 1
Anda harus mengupload kedua file DAG di lingkungan Cloud Composer agar alur kerja berfungsi.
Mengelompokkan tugas dengan operator TaskGroup
Di Airflow 2, Anda dapat menggunakan
operator TaskGroup
untuk mengelompokkan tugas dalam DAG.
Tugas yang ditentukan dalam blok TaskGroup
masih merupakan bagian dari DAG utama.
Perhatikan contoh berikut:
Tugas op-1
dan op-2
dikelompokkan bersama dalam blok dengan ID
taskgroup_1
.
Penerapan alur kerja ini terlihat seperti kode berikut: