Memecahkan masalah DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Halaman ini memberikan langkah-langkah pemecahan masalah dan informasi untuk masalah alur kerja umum.

Banyak masalah eksekusi DAG disebabkan oleh performa lingkungan yang tidak optimal. Anda dapat mengoptimalkan lingkungan dengan mengikuti panduan Mengoptimalkan performa dan biaya lingkungan.

Beberapa masalah eksekusi DAG mungkin disebabkan oleh penjadwal Airflow yang tidak berfungsi dengan benar atau optimal. Ikuti petunjuk pemecahan masalah Penjadwal untuk menyelesaikan masalah ini.

Memecahkan masalah alur kerja

Untuk memulai pemecahan masalah:

  1. Periksa log Airflow.

    Anda dapat meningkatkan tingkat logging Airflow dengan mengganti opsi konfigurasi Airflow berikut.

    Bagian Kunci Nilai
    logging logging_level Nilai defaultnya adalah INFO. Tetapkan ke DEBUG untuk mendapatkan lebih banyak informasi dalam pesan log.
  2. Periksa Dasbor Monitoring.

  3. Tinjau Cloud Monitoring.

  4. Di konsol Google Cloud, periksa error di halaman untuk komponen lingkungan Anda.

  5. Di antarmuka web Airflow, periksa Tampilan Grafik DAG untuk instance tugas yang gagal.

    Bagian Kunci Nilai
    webserver dag_orientation LR, TB, RL, atau BT

Men-debug kegagalan operator

Untuk men-debug kegagalan operator:

  1. Periksa error khusus tugas.
  2. Periksa log Airflow.
  3. Tinjau Cloud Monitoring.
  4. Periksa log khusus operator.
  5. Perbaiki error.
  6. Upload DAG ke folder /dags.
  7. Di antarmuka web Airflow, hapus status sebelumnya untuk DAG.
  8. Lanjutkan atau jalankan DAG.

Memecahkan masalah eksekusi tugas

Airflow adalah sistem terdistribusi dengan banyak entitas seperti penjadwal, eksekutor, pekerja yang berkomunikasi satu sama lain melalui antrean tugas dan database Airflow serta mengirim sinyal (seperti SIGTERM). Diagram berikut menunjukkan ringkasan interkoneksi antara komponen Airflow.

Interaksi antara komponen Airflow
Gambar 1. Interaksi antara komponen Airflow (klik untuk memperbesar)

Dalam sistem terdistribusi seperti Airflow, mungkin ada beberapa masalah konektivitas jaringan, atau infrastruktur yang mendasarinya mungkin mengalami masalah yang terputus-putus; hal ini dapat menyebabkan situasi saat tugas dapat gagal dan dijadwalkan ulang untuk dieksekusi, atau tugas mungkin tidak berhasil diselesaikan (misalnya, tugas Zombie, atau tugas yang macet dalam eksekusi). Airflow memiliki mekanisme untuk menangani situasi tersebut dan melanjutkan fungsi normal secara otomatis. Bagian berikut menjelaskan masalah umum yang terjadi selama eksekusi tugas oleh Airflow: Tugas zombie, Poison Pills, dan sinyal SIGTERM.

Memecahkan masalah tugas Zombie

Airflow mendeteksi dua jenis ketidakcocokan antara tugas dan proses yang menjalankan tugas:

  • Tugas zombie adalah tugas yang seharusnya berjalan, tetapi tidak berjalan. Hal ini dapat terjadi jika proses tugas dihentikan atau tidak merespons, jika pekerja Airflow tidak melaporkan status tugas tepat waktu karena kelebihan beban, atau jika VM tempat tugas dieksekusi dimatikan. Airflow menemukan tugas tersebut secara berkala, dan gagal atau mencoba ulang tugas, bergantung pada setelan tugas.

    Menemukan tugas zombie

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Tugas yang tidak mati adalah tugas yang tidak seharusnya berjalan. Airflow menemukan tugas tersebut secara berkala dan menghentikannya.

Bagian berikut menjelaskan alasan dan solusi paling umum untuk tugas Zombie.

Pekerja Airflow kehabisan memori

Setiap pekerja Airflow dapat menjalankan hingga [celery]worker_concurrency instance tugas secara bersamaan. Jika konsumsi memori kumulatif dari instance tugas tersebut melebihi batas memori untuk pekerja Airflow, proses acak di dalamnya akan dihentikan untuk mengosongkan resource.

Terkadang, kekurangan memori pada pekerja Airflow dapat menyebabkan paket yang salah format dikirim selama sesi SQL Alchemy ke database, ke server DNS, atau ke layanan lain yang dipanggil oleh DAG. Dalam hal ini, ujung lain koneksi mungkin menolak atau membatalkan koneksi dari pekerja Airflow. Contoh:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Solusi:

Pekerja Airflow dikeluarkan

Penghapusan pod adalah bagian yang normal saat menjalankan workload di Kubernetes. GKE mengeluarkan pod jika kehabisan penyimpanan atau untuk mengosongkan resource untuk workload dengan prioritas yang lebih tinggi.

Solusi:

Pekerja alur data dihentikan

Pekerja Airflow mungkin dihapus secara eksternal. Jika tugas yang sedang berjalan tidak selesai selama periode penghentian yang wajar, tugas tersebut akan terganggu dan mungkin akhirnya terdeteksi sebagai zombie.

Kemungkinan skenario dan solusi:

  • Worker Airflow dimulai ulang selama modifikasi lingkungan, seperti upgrade atau penginstalan paket:

    Menemukan modifikasi lingkungan Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Anda dapat melakukan operasi tersebut saat tidak ada tugas penting yang berjalan atau mengaktifkan percobaan ulang tugas.

  • Berbagai komponen mungkin tidak tersedia untuk sementara selama operasi pemeliharaan.

    Anda dapat menentukan masa pemeliharaan untuk meminimalkan

    tumpang-tindih dengan eksekusi tugas penting.

Pekerja Airflow mengalami beban berat

Jumlah resource CPU dan memori yang tersedia untuk pekerja Airflow dibatasi oleh konfigurasi lingkungan. Jika penggunaan resource mendekati batas, hal ini dapat menyebabkan pertentangan resource dan penundaan yang tidak perlu selama eksekusi tugas. Dalam situasi ekstrem, jika resource tidak tersedia selama jangka waktu yang lebih lama, hal ini dapat menyebabkan tugas zombie.

Solusi:

Database Airflow mengalami beban berat

Database digunakan oleh berbagai komponen Airflow untuk berkomunikasi satu sama lain dan, khususnya, untuk menyimpan heartbeat instance tugas. Kekurangan resource di database menyebabkan waktu kueri yang lebih lama dan dapat memengaruhi eksekusi tugas.

Terkadang, error berikut muncul di log pekerja Airflow:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Solusi:

Database Airflow tidak tersedia untuk sementara

Pekerja Airflow mungkin memerlukan waktu untuk mendeteksi dan menangani error terputus-putus dengan baik, seperti masalah konektivitas sementara. Hal ini mungkin melebihi batas deteksi zombie default.

Menemukan waktu tunggu heartbeat Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Solusi:

  • Tingkatkan waktu tunggu untuk tugas zombie dan ganti nilai opsi konfigurasi Airflow [scheduler]scheduler_zombie_task_threshold:

    Bagian Kunci Nilai Catatan
    scheduler scheduler_zombie_task_threshold Waktu tunggu baru (dalam detik) Nilai defaultnya adalah 300

Memecahkan Masalah Poison Pill

Poison Pill adalah mekanisme yang digunakan oleh Airflow untuk menonaktifkan tugas Airflow.

Airflow menggunakan Poison Pill dalam situasi berikut:

  • Saat penjadwal menghentikan tugas yang tidak selesai tepat waktu.
  • Saat tugas habis waktu tunggunya atau dieksekusi terlalu lama.

Saat Airflow menggunakan Poison Pill, Anda dapat melihat entri log berikut dalam log pekerja Airflow yang menjalankan tugas:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Kemungkinan solusi:

  • Periksa kode tugas untuk menemukan error yang mungkin menyebabkannya berjalan terlalu lama.

  • Tingkatkan CPU dan memori untuk pekerja Airflow, sehingga tugas dieksekusi lebih cepat.

  • Tingkatkan nilai opsi konfigurasi Airflow [celery_broker_transport_options]visibility-timeout.

    Akibatnya, penjadwal menunggu lebih lama hingga tugas selesai, sebelum menganggap tugas tersebut sebagai tugas Zombie. Opsi ini sangat berguna untuk tugas yang memakan waktu berjam-jam. Jika nilainya terlalu rendah (misalnya, 3 jam), penjadwal akan menganggap tugas yang berjalan selama 5 atau 6 jam sebagai "macet" (tugas Zombie).

  • Tingkatkan nilai opsi konfigurasi Airflow [core]killed_task_cleanup_time.

    Nilai yang lebih lama memberi pekerja Airflow lebih banyak waktu untuk menyelesaikan tugas mereka dengan baik. Jika nilainya terlalu rendah, tugas Airflow mungkin terganggu secara tiba-tiba, tanpa cukup waktu untuk menyelesaikan tugasnya dengan baik.

Memecahkan masalah sinyal SIGTERM

Sinyal SIGTERM digunakan oleh Linux, Kubernetes, penjadwal Airflow, dan Celery untuk menghentikan proses yang bertanggung jawab untuk menjalankan pekerja Airflow atau tugas Airflow.

Mungkin ada beberapa alasan mengapa sinyal SIGTERM dikirim di lingkungan:

  • Tugas menjadi tugas Zombie dan harus dihentikan.

  • Penjadwal menemukan duplikat tugas dan mengirimkan sinyal Poison Pill dan SIGTERM ke tugas untuk menghentikannya.

  • Dalam Penskalaan Otomatis Pod Horizontal, Platform Kontrol GKE mengirim sinyal SIGTERM untuk menghapus Pod yang tidak lagi diperlukan.

  • Penjadwal dapat mengirim sinyal SIGTERM ke proses DagFileProcessorManager. Sinyal SIGTERM tersebut digunakan oleh Penjadwal untuk mengelola siklus proses DagFileProcessorManager dan dapat diabaikan dengan aman.

    Contoh:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Kondisi perlombaan antara callback heartbeat dan callback keluar di local_task_job, yang memantau eksekusi tugas. Jika heartbeat mendeteksi bahwa tugas ditandai sebagai berhasil, heartbeat tidak dapat membedakan apakah tugas itu sendiri berhasil atau Airflow diberi tahu untuk menganggap tugas berhasil. Meskipun demikian, tindakan ini akan menghentikan runner tugas, tanpa menunggu tugas keluar.

    Sinyal SIGTERM tersebut dapat diabaikan dengan aman. Tugas sudah dalam status berhasil dan eksekusi DAG yang berjalan secara keseluruhan tidak akan dipengaruhi.

    Entri log Received SIGTERM. adalah satu-satunya perbedaan antara keluar reguler dan penghentian tugas dalam status berhasil.

    Kondisi perlombaan antara heartbeat dan callback keluar
    Gambar 2. Kondisi race antara heartbeat dan callback keluar (klik untuk memperbesar)
  • Komponen Airflow menggunakan lebih banyak resource (CPU, memori) daripada yang diizinkan oleh node cluster.

  • Layanan GKE melakukan operasi pemeliharaan dan mengirim sinyal SIGTERM ke Pod yang berjalan di node yang akan diupgrade.

    Saat instance tugas dihentikan dengan SIGTERM, Anda dapat melihat entri log berikut dalam log pekerja Airflow yang menjalankan tugas:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

Kemungkinan solusi:

Masalah ini terjadi saat VM yang menjalankan tugas kehabisan memori. Hal ini tidak terkait dengan konfigurasi Airflow, tetapi dengan jumlah memori yang tersedia untuk VM.

  • Di Cloud Composer 3, Anda dapat menetapkan lebih banyak resource CPU dan memori ke pekerja Airflow.

  • Anda dapat menurunkan nilai opsi konfigurasi Airflow serentak [celery]worker_concurrency. Opsi ini menentukan jumlah tugas yang dijalankan secara serentak oleh pekerja Airflow tertentu.

Untuk mengetahui informasi selengkapnya tentang cara mengoptimalkan lingkungan, lihat Mengoptimalkan performa dan biaya lingkungan.

Dampak operasi update atau upgrade pada eksekusi tugas Airflow

Operasi update atau upgrade akan mengganggu tugas Airflow yang sedang dijalankan, kecuali jika tugas dijalankan dalam mode yang dapat ditangguhkan.

Sebaiknya lakukan operasi ini jika Anda mengharapkan dampak minimal pada eksekusi tugas Airflow dan siapkan mekanisme percobaan ulang yang sesuai di DAG dan tugas Anda.

Memecahkan masalah tugas KubernetesExecutor

CeleryKubernetesExecutor adalah jenis eksekutor di Cloud Composer 3 yang dapat menggunakan CeleryExecutor dan KubernetesExecutor secara bersamaan.

Lihat halaman Menggunakan CeleryKubernetesExecutor untuk mengetahui informasi selengkapnya tentang pemecahan masalah tugas yang dijalankan dengan KubernetesExecutor.

Masalah umum

Bagian berikut menjelaskan gejala dan kemungkinan perbaikan untuk beberapa masalah DAG umum.

Tugas Airflow terganggu oleh Negsignal.SIGKILL

Terkadang, tugas Anda mungkin menggunakan lebih banyak memori daripada yang dialokasikan pekerja Airflow. Dalam situasi seperti itu, proses tersebut mungkin terganggu oleh Negsignal.SIGKILL. Sistem mengirim sinyal ini untuk menghindari penggunaan memori lebih lanjut yang dapat memengaruhi eksekusi tugas Airflow lainnya. Di log pekerja Airflow, Anda mungkin melihat entri log berikut:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL juga dapat muncul sebagai kode -9.

Kemungkinan solusi:

  • worker_concurrency pekerja Airflow yang lebih rendah.

  • Meningkatkan jumlah memori yang tersedia untuk pekerja Airflow.

  • Kelola tugas yang membutuhkan banyak resource di Cloud Composer menggunakan KubernetesPodOperator atau GKEStartPodOperator untuk isolasi tugas dan alokasi resource yang disesuaikan.

  • Optimalkan tugas Anda agar menggunakan lebih sedikit memori.

Tugas gagal tanpa menampilkan log karena error penguraian DAG

Terkadang mungkin ada error DAG yang halus yang menyebabkan situasi saat penjadwal Airflow dapat menjadwalkan tugas untuk dieksekusi, pemroses DAG dapat menguraikan file DAG, tetapi pekerja Airflow gagal menjalankan tugas dari DAG karena ada error pemrograman dalam file DAG. Hal ini dapat menyebabkan situasi saat tugas Airflow ditandai sebagai Failed dan tidak ada log dari eksekusinya.

Solusi:

  • Verifikasi di log pekerja Airflow bahwa tidak ada error yang ditampilkan oleh pekerja Airflow yang terkait dengan DAG yang hilang atau error penguraian DAG.

  • Meningkatkan parameter yang terkait dengan penguraian DAG:

    • Tingkatkan dagbag-import-timeout setidaknya 120 detik (atau lebih, jika diperlukan).

    • Tingkatkan dag-file-processor-timeout menjadi minimal 180 detik (atau lebih, jika diperlukan). Nilai ini harus lebih tinggi dari dagbag-import-timeout.

  • Lihat juga Memeriksa log Pemroses DAG.

Tugas gagal tanpa menampilkan log karena tekanan resource

Gejala: selama eksekusi tugas, subproses pekerja Airflow yang bertanggung jawab untuk eksekusi tugas Airflow terputus secara tiba-tiba. Error yang terlihat di log pekerja Airflow mungkin terlihat mirip dengan yang di bawah ini:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solusi:

Tugas gagal tanpa menampilkan log karena penghapusan Pod

Pod Google Kubernetes Engine tunduk pada Siklus Proses Pod Kubernetes dan pengusiran Pod. Lonjakan tugas adalah penyebab paling umum untuk penghapusan Pod di Cloud Composer.

Penghapusan Pod dapat terjadi saat Pod tertentu menggunakan resource node secara berlebihan, secara relatif terhadap ekspektasi penggunaan resource yang dikonfigurasi untuk node. Misalnya, pengusiran dapat terjadi saat beberapa tugas yang membutuhkan banyak memori berjalan di Pod, dan beban gabungannya menyebabkan node tempat Pod ini berjalan melebihi batas konsumsi memori.

Jika Pod pekerja Airflow dihapus, semua instance tugas yang berjalan di Pod tersebut akan terganggu, dan kemudian ditandai sebagai gagal oleh Airflow.

Log dibuffer. Jika Pod pekerja dihapus sebelum buffering dihapus, log tidak akan ditampilkan. Kegagalan tugas tanpa log adalah indikasi bahwa pekerja Airflow dimulai ulang karena kehabisan memori (OOM). Beberapa log mungkin ada di Cloud Logging meskipun log Airflow tidak ditampilkan.

Untuk melihat log:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.

  3. Buka tab Logs.

  4. Lihat log setiap pekerja Airflow di bagian Semua log > Log Airflow > Pekerja.

Solusi:

  • Meningkatkan batas memori untuk pekerja Airflow.

  • Pastikan tugas dalam DAG bersifat idempoten dan dapat dicoba ulang.

  • Hindari mendownload file yang tidak perlu ke sistem file lokal pekerja Airflow.

    Pekerja Airflow memiliki kapasitas sistem file lokal yang terbatas. Pekerja Airflow dapat memiliki penyimpanan dari 1 GB hingga 10 GB. Saat ruang penyimpanan habis, Pod pekerja Airflow akan dihapus oleh Bidang Kontrol GKE. Tindakan ini akan membuat semua tugas yang dijalankan pekerja yang dikeluarkan gagal.

    Contoh operasi yang bermasalah:

    • Mendownload file atau objek dan menyimpannya secara lokal di pekerja Airflow. Sebagai gantinya, simpan objek ini langsung di layanan yang sesuai seperti bucket Cloud Storage.
    • Mengakses objek besar di folder /data dari pekerja Airflow. Pekerja Airflow mendownload objek ke sistem file lokalnya. Sebagai gantinya, terapkan DAG Anda sehingga file besar diproses di luar Pod pekerja Airflow.

Waktu tunggu impor pemuatan DAG habis

Gejala:

  • Di antarmuka web Airflow, di bagian atas halaman daftar DAG, kotak pemberitahuan merah akan menampilkan Broken DAG: [/path/to/dagfile] Timeout.
  • Di Cloud Monitoring: Log airflow-scheduler berisi entri yang mirip dengan:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Perbaikan:

Ganti opsi konfigurasi Airflow dag_file_processor_timeout dan berikan lebih banyak waktu untuk penguraian DAG:

Bagian Kunci Nilai
core dag_file_processor_timeout Nilai waktu tunggu baru

Eksekusi DAG tidak berakhir dalam waktu yang diharapkan

Gejala:

Terkadang, operasi DAG tidak berakhir karena tugas Airflow macet dan operasi DAG berlangsung lebih lama dari yang diharapkan. Dalam kondisi normal, tugas Airflow tidak akan terus-menerus berada dalam status antrean atau berjalan, karena Airflow memiliki prosedur waktu tunggu dan pembersihan yang membantu menghindari situasi ini.

Perbaikan:

  • Gunakan parameter dagrun_timeout untuk DAG. Misalnya: dagrun_timeout=timedelta(minutes=120) Akibatnya, setiap operasi jalankan DAG harus selesai dalam waktu tunggu operasi jalankan DAG. Tugas yang belum selesai ditandai sebagai Failed atau Upstream Failed. Untuk informasi selengkapnya tentang status tugas Airflow, lihat dokumentasi Apache Airflow.

  • Gunakan parameter task execution timeout untuk menentukan waktu tunggu default bagi tugas yang berjalan berdasarkan operator Apache Airflow.

Operasi DAG tidak dieksekusi

Gejala:

Jika tanggal jadwal untuk DAG ditetapkan secara dinamis, hal ini dapat menyebabkan berbagai efek samping yang tidak terduga. Contoh:

  • Eksekusi DAG selalu di masa mendatang, dan DAG tidak pernah dieksekusi.

  • Operasi DAG sebelumnya ditandai sebagai dieksekusi dan berhasil meskipun tidak dieksekusi.

Informasi selengkapnya tersedia di dokumentasi Apache Airflow.

Kemungkinan solusi:

  • Ikuti rekomendasi dalam dokumentasi Apache Airflow.

  • Menetapkan start_date statis untuk DAG. Sebagai opsi, Anda dapat menggunakan catchup=False untuk menonaktifkan DAG yang berjalan untuk tanggal sebelumnya.

  • Hindari penggunaan datetime.now() atau days_ago(<number of days>) kecuali jika Anda mengetahui efek samping dari pendekatan ini.

Peningkatan traffic jaringan ke dan dari database Airflow

Jumlah jaringan traffic antara cluster GKE lingkungan Anda dan database Airflow bergantung pada jumlah DAG, jumlah tugas dalam DAG, dan cara DAG mengakses data dalam database Airflow. Faktor-faktor berikut dapat memengaruhi penggunaan jaringan:

  • Kueri ke database Airflow. Jika DAG Anda melakukan banyak kueri, DAG tersebut akan menghasilkan traffic dalam jumlah besar. Contoh: memeriksa status tugas sebelum melanjutkan tugas lainnya, membuat kueri tabel XCom, membuang konten database Airflow.

  • Jumlah tugas yang besar. Makin banyak tugas yang dijadwalkan, makin banyak traffic jaringan yang dihasilkan. Pertimbangan ini berlaku untuk jumlah total tugas dalam DAG dan frekuensi penjadwalan. Saat menjadwalkan DAG berjalan, penjadwal Airflow akan membuat kueri ke database Airflow dan menghasilkan traffic.

  • Antarmuka web Airflow menghasilkan traffic jaringan karena membuat kueri ke database Airflow. Menggunakan halaman dengan grafik, tugas, dan diagram secara intensif dapat menghasilkan traffic jaringan dalam jumlah besar.

DAG membuat server web Airflow error atau menyebabkannya menampilkan error '502 gateway timeout'

Kegagalan server web dapat terjadi karena beberapa alasan. Periksa log airflow-webserver di Cloud Logging untuk menentukan penyebab error 502 gateway timeout.

Menangani banyak DAG dan plugin di folder dags dan plugin

Isi folder /dags dan /plugins disinkronkan dari bucket lingkungan Anda ke sistem file lokal pekerja dan penjadwal Airflow.

Makin banyak data yang disimpan di folder ini, makin lama waktu yang diperlukan untuk melakukan sinkronisasi. Untuk mengatasi situasi tersebut:

  • Batasi jumlah file dalam folder /dags dan /plugins. Hanya simpan file minimum yang diperlukan.

  • Meningkatkan ruang disk yang tersedia untuk penjadwal dan pekerja Airflow.

  • Meningkatkan CPU dan memori penjadwal dan pekerja Airflow, sehingga operasi sinkronisasi dilakukan lebih cepat.

  • Jika DAG berjumlah sangat besar, bagi DAG menjadi beberapa batch, kompresi ke dalam arsip zip, lalu deploy arsip ini ke folder /dags. Pendekatan ini mempercepat proses sinkronisasi DAG. Komponen Airflow akan mengekstrak arsip zip sebelum memproses DAG.

  • Membuat DAG secara terprogram juga dapat menjadi metode untuk membatasi jumlah file DAG yang disimpan di folder /dags. Lihat bagian tentang DAG Terprogram untuk menghindari masalah terkait penjadwalan dan eksekusi DAG yang dihasilkan secara terprogram.

Jangan menjadwalkan DAG yang dibuat secara terprogram secara bersamaan

Membuat objek DAG secara terprogram dari file DAG adalah metode yang efisien untuk menulis banyak DAG serupa yang hanya memiliki perbedaan kecil.

Sebaiknya jangan segera menjadwalkan semua DAG tersebut untuk dieksekusi. Ada kemungkinan besar bahwa pekerja Airflow tidak memiliki resource CPU dan memori yang memadai untuk menjalankan semua tugas yang dijadwalkan secara bersamaan.

Untuk menghindari masalah terkait penjadwalan DAG terprogram:

  • Tingkatkan konkurensi pekerja dan tingkatkan skala lingkungan Anda, sehingga dapat menjalankan lebih banyak tugas secara bersamaan.
  • Buat DAG dengan cara mendistribusikan jadwalnya secara merata dari waktu ke waktu, untuk menghindari penjadwalan ratusan tugas secara bersamaan, sehingga pekerja Airflow memiliki waktu untuk menjalankan semua tugas terjadwal.

Error 504 saat mengakses server web Airflow

Lihat Error 504 saat mengakses UI Airflow.

Koneksi ke server Postgres terputus selama pengecualian kueri ditampilkan selama eksekusi tugas atau tepat setelahnya

Pengecualian Lost connection to Postgres server during query sering terjadi jika kondisi berikut terpenuhi:

  • DAG Anda menggunakan PythonOperator atau operator kustom.
  • DAG Anda membuat kueri ke database Airflow.

Jika beberapa kueri dibuat dari fungsi yang dapat dipanggil, traceback mungkin menunjuk ke baris self.refresh_from_db(lock_for_update=True) dalam kode Airflow secara tidak benar; ini adalah kueri database pertama setelah eksekusi tugas. Penyebab sebenarnya dari pengecualian terjadi sebelum ini, saat sesi SQLAlchemy tidak ditutup dengan benar.

Sesi SQLAlchemy dicakup ke thread dan dibuat dalam sesi fungsi callable yang nantinya dapat dilanjutkan di dalam kode Airflow. Jika ada penundaan yang signifikan antara kueri dalam satu sesi, koneksi mungkin sudah ditutup oleh server Postgres. Waktu tunggu koneksi di lingkungan Cloud Composer ditetapkan ke sekitar 10 menit.

Solusi:

  • Gunakan dekorator airflow.utils.db.provide_session. Dekorator ini memberikan sesi yang valid ke database Airflow dalam parameter session dan menutup sesi dengan benar di akhir fungsi.
  • Jangan gunakan satu fungsi yang berjalan lama. Sebagai gantinya, pindahkan semua kueri database ke fungsi terpisah, sehingga ada beberapa fungsi dengan dekorator airflow.utils.db.provide_session. Dalam hal ini, sesi akan otomatis ditutup setelah mengambil hasil kueri.

Mengontrol waktu eksekusi DAG, tugas, dan eksekusi paralel DAG yang sama

Jika ingin mengontrol durasi eksekusi satu DAG untuk DAG tertentu, Anda dapat menggunakan parameter DAG dagrun_timeout untuk melakukannya. Misalnya, jika Anda ingin satu DAG berjalan (terlepas dari apakah eksekusi selesai dengan sukses atau gagal) tidak boleh berlangsung lebih dari 1 jam, tetapkan parameter ini ke 3600 detik.

Anda juga dapat mengontrol berapa lama Anda mengizinkan satu tugas Airflow berlangsung. Untuk melakukannya, Anda dapat menggunakan execution_timeout.

Jika ingin mengontrol jumlah DAG aktif yang ingin Anda miliki untuk DAG tertentu, Anda dapat menggunakan [core]max-active-runs-per-dag opsi konfigurasi Airflow untuk melakukannya.

Jika Anda hanya ingin menjalankan satu instance DAG pada waktu tertentu, tetapkan parameter max-active-runs-per-dag ke 1.

Masalah yang memengaruhi sinkronisasi DAG dan plugin ke penjadwal, pekerja, dan server web

Cloud Composer menyinkronkan konten folder /dags dan /plugins kepada penjadwal dan pekerja. Objek tertentu di folder /dags dan /plugins mungkin mencegah sinkronisasi ini berfungsi dengan benar atau memperlambatnya.

  • Folder /dags disinkronkan ke penjadwal dan pekerja.

    Folder ini tidak disinkronkan ke server web.

  • Folder /plugins disinkronkan ke penjadwal, pekerja, dan server web.

Anda mungkin mengalami masalah berikut:

  • Anda mengupload file yang dikompresi dengan gzip yang menggunakan transcoding kompresi ke folder /dags dan /plugins. Hal ini biasanya terjadi jika Anda menggunakan flag --gzip-local-all dalam perintah gcloud storage cp untuk mengupload data ke bucket.

    Solusi: Hapus objek yang menggunakan transcoding kompresi dan upload ulang ke bucket.

  • Salah satu objek diberi nama '.'—objek tersebut tidak disinkronkan ke penjadwal dan pekerja, dan mungkin berhenti menyinkronkan sama sekali.

    Solusi: Ganti nama objek.

  • Folder dan file Python DAG memiliki nama yang sama, misalnya a.py. Dalam hal ini, file DAG tidak disinkronkan dengan benar ke komponen Airflow.

    Solusi: Hapus folder yang memiliki nama yang sama dengan file Python DAG.

  • Salah satu objek dalam folder /dags atau /plugins berisi simbol / di akhir nama objek. Objek tersebut dapat mengganggu proses sinkronisasi karena simbol / berarti objek adalah folder, bukan file.

    Solusi: Hapus simbol / dari nama objek yang bermasalah.

  • Jangan menyimpan file yang tidak perlu di folder /dags dan /plugins.

    Terkadang DAG dan plugin yang Anda terapkan dilengkapi dengan file tambahan, seperti file yang menyimpan pengujian untuk komponen ini. File ini disinkronkan ke pekerja dan penjadwal serta memengaruhi waktu yang diperlukan untuk menyalin file ini ke penjadwal, pekerja, dan server web.

    Solusi: Jangan menyimpan file tambahan dan tidak perlu di folder /dags dan /plugins.

Selesai [Errno 21] Adalah direktori: error '/home/airflow/gcs/dags/...' dihasilkan oleh penjadwal dan pekerja

Masalah ini terjadi karena objek dapat memiliki namespace yang tumpang-tindih di Cloud Storage, sementara pada saat yang sama, penjadwal dan pekerja menggunakan sistem file tradisional. Misalnya, Anda dapat menambahkan folder dan objek dengan nama yang sama ke bucket lingkungan. Saat bucket disinkronkan ke penjadwal dan pekerja lingkungan, error ini akan muncul, yang dapat menyebabkan kegagalan tugas.

Untuk memperbaiki masalah ini, pastikan tidak ada namespace yang tumpang-tindih di bucket lingkungan. Misalnya, jika /dags/misc (file) dan /dags/misc/example_file.txt (file lain) berada dalam bucket, error akan dibuat oleh penjadwal.

Gangguan sementara saat terhubung ke DB Metadata Airflow

Cloud Composer berjalan di atas infrastruktur terdistribusi. Artinya, dari waktu ke waktu, beberapa masalah sementara mungkin muncul dan dapat mengganggu eksekusi tugas Airflow Anda.

Dalam situasi seperti itu, Anda mungkin melihat pesan error berikut di log pekerja Airflow:

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

atau

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Masalah yang terputus-putus tersebut mungkin juga disebabkan oleh operasi pemeliharaan yang dilakukan untuk lingkungan Cloud Composer Anda.

Biasanya, error tersebut bersifat intermiten dan jika tugas Airflow bersifat idempotent dan Anda telah mengonfigurasi percobaan ulang, error tersebut tidak akan memengaruhi Anda. Anda juga dapat mempertimbangkan untuk menentukan masa pemeliharaan.

Salah satu alasan tambahan untuk error tersebut mungkin adalah kurangnya resource di cluster lingkungan Anda. Dalam kasus seperti itu, Anda dapat menskalakan atau mengoptimalkan lingkungan seperti yang dijelaskan dalam petunjuk Menskalakan lingkungan atau Mengoptimalkan lingkungan.

Operasi DAG ditandai sebagai berhasil, tetapi tidak memiliki tugas yang dieksekusi

Jika execution_date operasi DAG lebih awal dari start_date DAG, Anda mungkin melihat operasi DAG yang tidak memiliki operasi tugas, tetapi masih ditandai sebagai berhasil.

Operasi DAG yang berhasil tanpa tugas yang dieksekusi
Gambar 3. Operasi DAG yang berhasil tanpa tugas yang dieksekusi (klik untuk memperbesar)

Penyebab

Situasi ini mungkin terjadi dalam salah satu kasus berikut:

  • Ketidakcocokan disebabkan oleh perbedaan zona waktu antara execution_date dan start_date DAG. Hal ini dapat terjadi, misalnya, saat menggunakan pendulum.parse(...) untuk menetapkan start_date.

  • start_date DAG ditetapkan ke nilai dinamis, misalnya airflow.utils.dates.days_ago(1)

Solusi

  • Pastikan execution_date dan start_date menggunakan zona waktu yang sama.

  • Tentukan start_date statis dan gabungkan dengan catchup=False untuk menghindari menjalankan DAG dengan tanggal mulai sebelumnya.

DAG tidak terlihat di UI Airflow atau UI DAG dan penjadwal tidak menjadwalkannya

Pemroses DAG mengurai setiap DAG sebelum dapat dijadwalkan oleh penjadwal dan sebelum DAG terlihat di UI Airflow atau UI DAG.

Opsi konfigurasi Airflow berikut menentukan waktu tunggu untuk mengurai DAG:

Jika DAG tidak terlihat di UI Airflow atau UI DAG:

  • Periksa log pemroses DAG apakah pemroses DAG dapat memproses DAG Anda dengan benar. Jika terjadi masalah, Anda mungkin melihat entri log berikut di log penjadwal atau pemroses DAG:

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
    
  • Periksa log penjadwal untuk melihat apakah penjadwal berfungsi dengan benar. Jika terjadi masalah, Anda mungkin melihat entri log berikut di log penjadwal:

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496
    

Solusi:

  • Perbaiki semua error penguraian DAG. Prosesor DAG menguraikan beberapa DAG, dan dalam kasus yang jarang terjadi, error penguraian satu DAG dapat berdampak negatif pada penguraian DAG lainnya.

  • Jika penguraian DAG Anda memerlukan waktu lebih dari jumlah detik yang ditentukan dalam [core]dagrun_import_timeout, tingkatkan waktu tunggu ini.

  • Jika penguraian semua DAG Anda memerlukan waktu lebih dari jumlah detik yang ditentukan dalam [core]dag_file_processor_timeout, tingkatkan waktu tunggu ini.

  • Jika DAG Anda memerlukan waktu lama untuk diuraikan, hal ini juga dapat berarti bahwa DAG tidak diterapkan dengan cara yang optimal. Misalnya, jika membaca banyak variabel lingkungan, atau melakukan panggilan ke layanan eksternal atau database Airflow. Sebisa mungkin, hindari melakukan operasi tersebut di bagian global DAG.

  • Tingkatkan resource CPU dan memori untuk Penjadwal agar dapat bekerja lebih cepat.

  • Sesuaikan jumlah penjadwal.

  • Meningkatkan jumlah proses pemroses DAG sehingga penguraian dapat dilakukan lebih cepat. Anda dapat melakukannya dengan meningkatkan nilai [scheduler]parsing_process.

  • Menurunkan frekuensi penguraian DAG.

  • Menurunkan beban pada database Airflow.

Gejala database Airflow yang mengalami beban berat

Untuk informasi selengkapnya, lihat Gejala Database Airflow yang mengalami tekanan beban.

Langkah selanjutnya