Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Panduan ini menunjukkan cara menulis directed acyclic graph Apache Airflow (DAG) yang berjalan di lingkungan Cloud Composer.
Karena Apache Airflow tidak menyediakan DAG dan isolasi tugas yang kuat, sebaiknya gunakan lingkungan produksi dan pengujian terpisah untuk mencegah gangguan DAG. Untuk informasi selengkapnya, lihat Menguji DAG.
Menyusun DAG Airflow
DAG Airflow didefinisikan dalam file Python dan terdiri dari hal-hal berikut komponen:
- Definisi DAG
- Operator Airflow
- Hubungan operator
Cuplikan kode berikut menunjukkan contoh setiap komponen di luar konteks.
Definisi DAG
Contoh berikut menunjukkan DAG Airflow definisi:
Operator dan tugas
Operator Aliran Udara menjelaskan pekerjaan yang akan dilakukan. Tugas tugas adalah instance operator tertentu.
Hubungan tugas
Hubungan tugas menjelaskan urutan dalam mana pekerjaan itu harus diselesaikan.
Contoh alur kerja DAG lengkap di Python
Alur kerja berikut adalah template DAG yang berfungsi lengkap dan terdiri dari
dua tugas: tugas hello_python
dan tugas goodbye_bash
:
Untuk informasi selengkapnya tentang menentukan DAG Airflow, lihat tutorial Airflow dan konsep Airflow.
Operator Airflow
Contoh berikut menunjukkan beberapa operator Airflow yang populer. Untuk resmi operator Airflow, lihat referensi Referensi Operator dan Hook dan indeks penyedia.
BashOperator
Gunakan BashOperator untuk menjalankan program command line.
Cloud Composer menjalankan perintah yang disediakan dalam skrip Bash di pekerja Airflow. Worker adalah container Docker berbasis Debian dan mencakup beberapa paket.
PythonOperator
Gunakan PythonOperator untuk menjalankan kode Python arbitrer.
Cloud Composer menjalankan kode Python dalam penampung yang menyertakan paket untuk versi image Cloud Composer yang digunakan di lingkungan Anda.
Untuk menginstal paket Python tambahan, lihat Menginstal Dependensi Python.
Operator Google Cloud
Untuk menjalankan tugas yang menggunakan produk Google Cloud, gunakan Operator Google Cloud Airflow. Misalnya, Operator BigQuery mengkueri dan memproses data di BigQuery.
Terdapat lebih banyak operator Airflow untuk Google Cloud dan masing-masing layanan yang disediakan oleh Google Cloud. Lihat Operator Google Cloud untuk mengetahui daftar lengkapnya.
EmailOperator
Gunakan EmailOperator untuk mengirim email dari DAG. Untuk mengirim email dari lingkungan Cloud Composer, mengonfigurasi lingkungan Anda untuk menggunakan SendGrid.
Notifikasi tentang kegagalan operator
Tetapkan email_on_failure
ke True
untuk mengirim notifikasi email saat operator
di DAG gagal. Untuk mengirim notifikasi email dari lingkungan Cloud Composer, Anda harus mengonfigurasi lingkungan untuk menggunakan SendGrid.
Panduan alur kerja DAG
Tempatkan library Python kustom di arsip ZIP DAG dalam direktori bertingkat. Jangan tempatkan library di tingkat teratas direktori DAG.
Saat Airflow memindai folder
dags/
, Airflow hanya memeriksa DAG di Modul Python yang ada di folder DAG tingkat teratas dan di bagian atas level arsip ZIP yang juga terletak di folderdags/
level teratas. Jika Airflow menemukan modul Python dalam arsip ZIP yang tidak berisi substringairflow
danDAG
, Airflow akan berhenti memproses arsip ZIP. Airflow hanya menampilkan DAG yang ditemukan hingga saat itu.Untuk fault tolerance, jangan tentukan beberapa objek DAG di Python yang sama ruang lingkup modul ini.
Jangan gunakan SubDAG. Sebagai gantinya, mengelompokkan tugas di dalam DAG.
Tempatkan file yang diperlukan pada waktu penguraian DAG ke dalam folder
dags/
, bukan di folderdata/
.Menguji DAG yang dikembangkan atau dimodifikasi seperti yang direkomendasikan dalam petunjuk untuk menguji DAG.
Memastikan bahwa DAG yang dikembangkan tidak meningkatkan Waktu penguraian DAG terlalu banyak.
Tugas Airflow dapat gagal karena beberapa alasan. Untuk menghindari kegagalan seluruh operasi DAG, sebaiknya aktifkan percobaan ulang tugas. Menetapkan percobaan ulang maksimum ke
0
berarti tidak ada percobaan ulang yang dilakukan.Sebaiknya ganti Opsi
default_task_retries
dengan nilai untuk percobaan ulang tugas selain0
. Selain itu, Anda dapat menetapkan parameterretries
di tingkat tugas.Jika Anda ingin menggunakan GPU dalam tugas Airflow, buat elemen Cluster GKE berdasarkan node menggunakan mesin dengan GPU. Gunakan GKEStartPodOperator untuk menjalankan tugas Anda.
Hindari menjalankan tugas yang menggunakan banyak CPU dan memori di kumpulan node cluster tempat komponen Airflow lainnya (penjadwal, pekerja, server web) sedang berjalan. Sebagai gantinya, gunakan KubernetesPodOperator atau Sebagai gantinya, GKEStartPodOperator.
Saat men-deploy DAG ke lingkungan, hanya upload file yang benar-benar diperlukan untuk menafsirkan dan mengeksekusi DAG ke folder
/dags
.Batasi jumlah file DAG di folder
/dags
.Airflow terus menguraikan DAG di folder
/dags
. Penguraian adalah yang melalui {i>loop <i}melalui folder DAG dan jumlah file harus dimuat (dengan dependensinya) akan berdampak pada penguraian dan penjadwalan tugas DAG. Sebaiknya gunakan 100 file dengan masing-masing 100 DAG daripada 10.000 file dengan masing-masing 1 DAG karena pengoptimalan tersebut jauh lebih efisien. Pengoptimalan ini merupakan keseimbangan antara waktu penguraian dan efisiensi pembuatan dan pengelolaan DAG.Anda juga dapat mempertimbangkan, misalnya, untuk men-deploy 10.000 file DAG yang dapat membuat 100 file zip yang masing-masing berisi 100 file DAG.
Selain petunjuk di atas, jika Anda memiliki lebih dari 10.000 file DAG, membuat DAG secara terprogram mungkin merupakan opsi yang baik. Misalnya, Anda dapat menerapkan satu file DAG Python yang menghasilkan sejumlah objek DAG (misalnya, 20, 100 objek DAG).
FAQ untuk menulis DAG
Bagaimana cara meminimalkan pengulangan kode jika saya ingin menjalankan tugas yang sama atau serupa di beberapa DAG?
Sebaiknya tentukan library dan wrapper untuk meminimalkan pengulangan kode.
Bagaimana cara menggunakan kembali kode antar-file DAG?
Tempatkan {i>function<i} utilitas Anda di
library Python lokal
dan mengimpor fungsi. Anda dapat mereferensikan fungsi di DAG mana pun yang berada
di folder dags/
di bucket lingkungan Anda.
Bagaimana cara meminimalkan risiko yang timbul dari definisi yang berbeda?
Misalnya, Anda memiliki dua tim yang ingin menggabungkan data mentah menjadi pendapatan metrik. Tim menulis dua tugas yang sedikit berbeda dengan tujuan yang sama sesuatu. Menentukan library untuk menggunakan data pendapatan sehingga pengimplementasi DAG harus mengklarifikasi definisi pendapatan yang digabungkan.
Bagaimana cara menetapkan dependensi antar-DAG?
Hal ini bergantung pada cara Anda ingin menentukan dependensi.
Jika memiliki dua DAG (DAG A dan DAG B) dan ingin DAG B dipicu setelah DAG A, Anda dapat menempatkan TriggerDagRunOperator
di akhir DAG A.
Jika DAG B hanya bergantung pada artefak yang dihasilkan DAG A, seperti pesan Pub/Sub, sensor mungkin akan berfungsi lebih baik.
Jika DAG B terintegrasi erat dengan DAG A, Anda mungkin dapat menggabungkan kedua DAG menjadi satu DAG.
Bagaimana cara meneruskan ID run unik ke DAG dan tugasnya?
Misalnya, Anda ingin meneruskan nama cluster Dataproc dan jalur file.
Anda dapat membuat ID unik acak dengan menampilkan str(uuid.uuid4())
di
PythonOperator
. Langkah ini menempatkan
ID ke dalam
XComs
agar Anda dapat merujuk ke ID di operator lain
melalui kolom template.
Sebelum membuat uuid
, pertimbangkan apakah ID khusus DagRun akan
lebih bernilai. Anda juga dapat mereferensikan ID ini dalam penggantian Jinja dengan menggunakan makro.
Bagaimana cara memisahkan tugas di DAG?
Setiap tugas harus merupakan unit kerja idempoten. Oleh karena itu, sebaiknya hindari
enkapsulasi alur kerja multi-langkah dalam satu tugas, seperti kompleks
program yang berjalan di PythonOperator
.
Haruskah saya menentukan beberapa tugas dalam satu DAG untuk menggabungkan data dari beberapa sumber?
Misalnya, Anda memiliki beberapa tabel dengan data mentah dan ingin membuatnya untuk setiap tabel. Tugas-tugasnya tidak bergantung satu sama lain. Seharusnya Anda membuat satu tugas dan DAG untuk setiap tabel atau membuat satu DAG umum?
Jika Anda tidak keberatan jika setiap tugas memiliki properti tingkat DAG yang sama, seperti
schedule_interval
, sebaiknya tentukan beberapa tugas dalam satu
DAG. Atau, untuk meminimalkan pengulangan kode, beberapa DAG dapat dihasilkan
dari satu modul Python dengan menempatkannya ke dalam globals()
modul.
Bagaimana cara membatasi jumlah tugas serentak yang berjalan di DAG?
Misalnya, Anda ingin menghindari melebihi batas/kuota penggunaan API atau menghindari menjalankan terlalu banyak proses secara bersamaan.
Anda dapat menentukan Airflow pool di UI web Airflow dan tugas terkait dengan kumpulan yang ada di DAG.
FAQ terkait penggunaan operator
Haruskah saya menggunakan DockerOperator
?
Sebaiknya jangan gunakan
DockerOperator
, kecuali jika digunakan untuk meluncurkan
penampung di penginstalan Docker jarak jauh (bukan dalam cluster
lingkungan). Di lingkungan Cloud Composer, operator tidak memiliki
akses ke daemon Docker.
Sebagai gantinya, gunakan KubernetesPodOperator
atau
GKEStartPodOperator
. Operator ini meluncurkan pod Kubernetes ke dalam
Kubernetes atau GKE. Perhatikan bahwa kita tidak
sebaiknya luncurkan pod ke dalam cluster lingkungan, karena hal ini dapat
dalam persaingan sumber daya.
Haruskah saya menggunakan SubDagOperator
?
Kami tidak merekomendasikan penggunaan SubDagOperator
.
Gunakan alternatif seperti yang disarankan dalam Mengelompokkan tugas.
Haruskah saya menjalankan kode Python hanya di PythonOperators
untuk memisahkan operator Python sepenuhnya?
Bergantung pada sasaran, Anda memiliki beberapa opsi.
Jika satu-satunya perhatian Anda adalah mempertahankan dependensi Python yang terpisah, Anda
dapat menggunakan PythonVirtualenvOperator
.
Pertimbangkan untuk menggunakan KubernetesPodOperator
. Operator ini memungkinkan Anda
untuk menentukan pod Kubernetes dan menjalankan pod di cluster lain.
Bagaimana cara menambahkan paket biner kustom atau non-PyPI?
Anda dapat menginstal paket yang dihosting di repositori paket pribadi.
Bagaimana cara meneruskan argumen ke DAG dan tugasnya secara seragam?
Anda dapat menggunakan dukungan bawaan Airflow untuk Template Jinja untuk meneruskan argumen yang dapat digunakan di kolom template.
Kapan penggantian template terjadi?
Penggantian template terjadi pada pekerja Airflow tepat sebelum pre_execute
fungsi suatu operator dipanggil. Dalam praktiknya, ini berarti bahwa {i>template<i}
tidak diganti sampai tepat
sebelum tugas berjalan.
Bagaimana cara mengetahui argumen operator mana yang mendukung substitusi template?
Argumen operator yang mendukung substitusi template Jinja2 secara eksplisit ditandai sebagaimana mestinya.
Cari kolom template_fields
dalam definisi Operator,
yang berisi daftar nama argumen yang menjalani substitusi template.
Misalnya, lihat
BashOperator
, yang mendukung pemberian template untuk
argumen bash_command
dan env
.
Langkah selanjutnya
- Memecahkan masalah DAG
- Pemecahan Masalah Scheduler
- Operator Google
- Operator Google Cloud
- Tutorial Apache Airflow