Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Tutorial ini berisi langkah-langkah untuk men-debug DAG Airflow yang gagal dalam Cloud Composer dan mendiagnosis masalah terkait resource pekerja, seperti kurangnya memori pekerja atau ruang penyimpanan, dengan bantuan log dan lingkungan pemantauan model.
Pengantar
Tutorial ini berfokus pada masalah yang berkaitan dengan resource untuk mendemonstrasikan cara men-debug DAG.
Kurangnya resource pekerja yang dialokasikan menyebabkan kegagalan DAG. Jika tugas Airflow kehabisan memori atau penyimpanan, Anda mungkin melihat pengecualian Airflow, seperti:
WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.
atau
Task exited with return code Negsignal.SIGKILL
Dalam kasus tersebut, rekomendasi umumnya adalah meningkatkan worker Airflow sumber daya atau mengurangi jumlah tugas per pekerja. Namun, karena Airflow pengecualian bisa bersifat umum, mungkin akan sulit untuk mengidentifikasi resource tertentu yang menyebabkan masalah.
Tutorial ini menjelaskan cara mendiagnosis alasan kegagalan DAG dan mengidentifikasi jenis resource yang menyebabkan masalah dengan men-debug dua contoh DAG yang gagal karena kurangnya memori dan penyimpanan pekerja.
Tujuan
Jalankan contoh DAG yang gagal karena alasan berikut:
- Kurangnya memori pekerja
- Kurangnya penyimpanan pekerja
Mendiagnosis alasan kegagalan
Meningkatkan resource pekerja yang dialokasikan
Menguji DAG dengan batas resource baru
Biaya
Tutorial ini menggunakan komponen Google Cloud yang dapat ditagih berikut:
Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui detail selengkapnya, lihat Membersihkan.
Sebelum memulai
Bagian ini menjelaskan tindakan yang diperlukan sebelum Anda memulai tutorial.
Membuat dan mengonfigurasi project
Untuk tutorial ini, Anda memerlukan resource Google Cloud project Anda. Konfigurasikan project dengan cara berikut:
Di konsol Google Cloud, pilih atau buat project:
Pastikan penagihan diaktifkan untuk project Anda. Pelajari cara memeriksa apakah penagihan diaktifkan pada sebuah project.
Pastikan pengguna project Google Cloud Anda memiliki peran berikut untuk membuat resource yang diperlukan:
- Administrator Objek Penyimpanan dan Lingkungan
(
roles/composer.environmentAndStorageObjectAdmin
) - Compute Admin (
roles/compute.admin
) - Monitoring Editor (
roles/monitoring.editor
)
- Administrator Objek Penyimpanan dan Lingkungan
(
Mengaktifkan API untuk project Anda
Aktifkan API Cloud Composer.
Membuat lingkungan Cloud Composer
Membuat lingkungan Cloud Composer 2.
Sebagai bagian dari upaya menciptakan lingkungan,
Anda memberikan Ekstensi Agen Layanan Cloud Composer v2 API
(roles/composer.ServiceAgentV2Ext
) ke Composer Service Agent
menggunakan akun layanan. Cloud Composer menggunakan akun ini untuk menjalankan operasi
di project Google Cloud Anda.
Memeriksa batas resource worker
Periksa batas resource pekerja Airflow di lingkungan Anda:
Di Konsol Google Cloud, buka halaman Environments.
Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.
Buka tab Konfigurasi lingkungan.
Buka Sumber daya > Konfigurasi beban kerja > Pekerja.
Pastikan nilainya adalah 0,5 vCPU, memori 1,875 GB, dan penyimpanan 1 GB. Ini adalah batas resource worker Airflow yang akan Anda gunakan dalam langkah-langkah dalam tutorial ini.
Contoh: Mendiagnosis masalah kehabisan memori
Upload contoh DAG berikut ke lingkungan
yang telah dibuat pada langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama
create_list_with_many_strings
.
DAG ini berisi satu tugas yang menjalankan langkah-langkah berikut:
- Membuat daftar kosong
s
. - Menjalankan siklus untuk menambahkan string
More
ke daftar. - Mencetak jumlah memori yang digunakan daftar dan menunggu 1 detik dalam setiap 1 detik iterasi menit.
import time
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_list_with_many_strings',
default_args=default_args,
schedule_interval=None)
def consume():
s = []
for i in range(120):
for j in range(1000000):
s.append("More")
print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
time.sleep(1)
t1 = PythonOperator(
task_id='task0',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0
)
Memicu DAG sampel
Picu DAG sampel, create_list_with_many_strings
:
Di Konsol Google Cloud, buka halaman Environments.
Di kolom server web Airflow, ikuti link Airflow untuk lingkungan fleksibel App Engine.
Di antarmuka web Airflow, pada halaman DAGs, di bagian Link untuk DAG, klik tombol Trigger Dag.
Klik Pemicu.
Di halaman DAG, klik tugas yang Anda picu dan tinjau outputnya log untuk memastikan DAG Anda mulai berjalan.
Saat tugas berjalan, log output akan mencetak ukuran memori dalam GB yang digunakan DAG.
Setelah beberapa menit, tugas akan gagal karena melebihi pekerja Airflow sebesar 1,875 GB.
Mendiagnosis DAG yang gagal
Jika Anda menjalankan beberapa tugas pada saat kegagalan, pertimbangkan untuk menjalankan hanya satu tugas dan mendiagnosis tekanan sumber daya selama waktu itu untuk mengidentifikasi tugas mana yang menyebabkan tekanan sumber daya dan sumber daya mana yang perlu Anda tingkatkan.
Meninjau log tugas Airflow
Perhatikan bahwa tugas dari DAG create_list_with_many_strings
memiliki
status Failed
.
Tinjau log tugas. Anda akan melihat entri log berikut:
```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```
`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.
Meninjau workload
Tinjau beban kerja untuk memeriksa apakah beban tugas Anda tidak menyebabkan node tempat Pod akan berjalan melebihi batas konsumsi memori:
Di Konsol Google Cloud, buka halaman Environments.
Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.
Buka tab Konfigurasi lingkungan.
Di Resource > cluster GKE > Beban kerja, klik lihat beban kerja cluster.
Periksa apakah beberapa pod workload memiliki status yang serupa dengan berikut ini:
Error with exit code 137 and 1 more issue. ContainerStatusUnknown with exit code 137 and 1 more issue
Exit code 137
berarti container atau Pod mencoba menggunakan lebih banyak memori dari yang diizinkan. Proses ini dihentikan untuk mencegah penggunaan memori.
Meninjau pemantauan kesehatan lingkungan dan konsumsi resource
Tinjau pemantauan kesehatan lingkungan dan konsumsi resource:
Di Konsol Google Cloud, buka halaman Environments.
Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.
Buka tab Monitoring lalu pilih Overview.
Di panel Environment overview, cari Grafik Kondisi Lingkungan (Airflow Monitoring DAG). Isinya adalah , yang sesuai dengan waktu saat log mulai mengalami error pencetakan.
Pilih Workers, lalu cari grafik Total worker memory usage. Perhatikan bahwa baris Memory usage memiliki lonjakan pada saat sedang berjalan.
Meskipun garis penggunaan memori pada grafik tidak mencapai batas, saat mendiagnosis alasan kegagalan, Anda perlu mempertimbangkan hanya memori yang dapat dialokasikan, sedangkan garis Batas memori pada grafik mewakili total memori tersedia (termasuk kapasitas yang direservasi oleh GKE).
Dalam contoh ini, batas memori pekerja ditetapkan ke 1,875 GB. GKE memesan 25% dari 4 GiB memori pertama. GKE juga mencadangkan nilai minimum penghapusan: Memori 100 MiB di setiap node untuk penghapusan kubelet.
Memori yang dapat dialokasikan dihitung dengan cara berikut:
ALLOCATABLE = CAPACITY - RESERVED - EVICTION-THRESHOLD
Jika batas memori adalah 1,875 GB, memori sebenarnya yang dapat dialokasikan adalah:
1.75 GiB (1.875GB) - 0.44 (25% GiB reserved) - 0.1 = 1.21 GiB (~1.3 GB).
Saat menambahkan batas aktual ini ke grafik penggunaan memori, Anda akan melihat bahwa lonjakan penggunaan memori tugas mencapai memori sebenarnya sehingga Anda dapat menyimpulkan bahwa tugas gagal karena worker tidak memadai memori.
Meningkatkan batas memori pekerja
Alokasikan memori pekerja tambahan agar DAG contoh berhasil:
Di Konsol Google Cloud, buka halaman Environments.
Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.
Buka tab Konfigurasi lingkungan.
Temukan konfigurasi Resource > Workloads dan Klik Edit.
Di bagian Worker, di kolom Memory, tentukan memori baru untuk pekerja Airflow. Dalam tutorial ini, gunakan 3 GB.
Simpan perubahan dan tunggu beberapa menit agar pekerja Airflow Anda dapat mulai ulang.
Menguji DAG dengan batas memori baru
Picu DAG create_list_with_many_strings
lagi dan tunggu hingga
selesai berjalan.
Di log output proses DAG, Anda akan melihat
Marking task as SUCCESS
, dan status tugas akan menunjukkan Success.Tinjau bagian Environment overview di tab Monitoring dan pastikan tidak ada area berwarna merah.
Klik bagian Workers dan cari Total worker memory usage grafik. Anda akan melihat bahwa baris Memory limit mencerminkan perubahan di batas memori, dan baris Memory usage jauh di bawah batas sebenarnya batas memori yang dapat dialokasikan.
Contoh: Mendiagnosis masalah kehabisan penyimpanan
Di langkah ini, Anda akan mengupload dua DAG yang membuat file berukuran besar. DAG pertama membuat file yang besar. DAG kedua membuat file besar dan meniru yang berjalan lama.
Ukuran file di kedua DAG melebihi penyimpanan pekerja Airflow default 1 GB tetapi DAG kedua memiliki tugas tunggu tambahan untuk dari waktu ke waktu secara artifisial.
Anda akan menyelidiki perbedaan perilaku kedua DAG dalam langkah.
Mengupload DAG yang membuat file berukuran besar
Upload contoh DAG berikut ke lingkungan
yang telah dibuat pada langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama
create_large_txt_file_print_logs
.
DAG ini berisi satu tugas yang menjalankan langkah-langkah berikut:
- Menulis file
localfile.txt
sebesar 1,5 GB ke penyimpanan pekerja Airflow. - Mencetak ukuran file yang dibuat menggunakan modul
os
Python. - Mencetak durasi DAG dijalankan setiap 1 menit.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
Mengupload DAG yang membuat file besar dalam operasi yang berjalan lama
Untuk meniru DAG yang berjalan lama dan menyelidiki dampak durasi tugas
di status akhir, upload sampel DAG kedua ke
lingkungan fleksibel App Engine. Dalam tutorial ini, DAG ini diberi nama
long_running_create_large_txt_file_print_logs
.
DAG ini berisi satu tugas yang menjalankan langkah-langkah berikut:
- Menulis file
localfile.txt
sebesar 1,5 GB ke penyimpanan pekerja Airflow. - Mencetak ukuran file yang dibuat menggunakan modul
os
Python. - Menunggu 1 jam 15 menit untuk meniru beberapa waktu yang dibutuhkan untuk operasi dengan {i>file<i}, misalnya, membaca dari file.
- Mencetak durasi DAG dijalankan setiap 1 menit.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'long_running_create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
for k in range(75):
time.sleep(60)
print(f"{k+1} minute")
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
Memicu DAG contoh
Picu DAG pertama, create_large_txt_file_print_logs
:
Di Konsol Google Cloud, buka halaman Environments.
Di kolom server web Airflow, ikuti link Airflow untuk lingkungan fleksibel App Engine.
Di antarmuka web Airflow, pada halaman DAGs, di bagian Link untuk DAG, klik tombol Trigger Dag.
Klik Pemicu.
Di halaman DAG, klik tugas yang Anda picu dan tinjau outputnya log untuk memastikan DAG Anda mulai berjalan.
Tunggu hingga tugas yang Anda buat dengan
create_large_txt_file_print_logs
DAG selesai. Proses ini mungkin memerlukan beberapa menit.Di halaman DAGs, klik jalankan DAG. Anda akan melihat tugas Anda memiliki status
Success
, meskipun batas penyimpanan terlampaui.
Tinjau tugas log Airflow:
Di Konsol Google Cloud, buka halaman Environments.
Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.
Buka tab Logs, lalu buka All logs > Log Airflow > Pekerja > Lihat di Logs Explorer.
Filter log berdasarkan jenis: hanya tampilkan pesan Error.
Di log, Anda akan melihat pesan yang mirip dengan berikut ini:
Worker: warm shutdown (Main Process)
atau
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
Log ini menunjukkan bahwa pod memulai "warm shutdown" karena penyimpanan yang digunakan melampaui batas dan dikeluarkan dalam 1 jam. Namun, operasi DAG tidak gagal karena telah diselesaikan dalam masa tenggang penghentian Kubernetes ini, yang akan dijelaskan lebih lanjut dalam tutorial ini.
Untuk menggambarkan konsep masa tenggang penghentian, tinjau hasilnya
contoh DAG kedua, long_running_create_large_txt_file_print_logs
.
Picu DAG kedua, long_running_create_large_txt_file_print_logs
:
Di Konsol Google Cloud, buka halaman Environments.
Di kolom server web Airflow, ikuti link Airflow untuk lingkungan fleksibel App Engine.
Di antarmuka web Airflow, pada halaman DAGs, di bagian Link untuk DAG, klik tombol Trigger Dag.
Klik Pemicu.
Di halaman DAG, klik tugas yang Anda picu dan tinjau outputnya log untuk memastikan DAG Anda mulai berjalan.
Menunggu hingga DAG
long_running_create_large_txt_file_print_logs
berjalan gagal. Proses ini memerlukan waktu sekitar satu jam.
Tinjau hasil operasi DAG:
Di halaman DAGs, klik Operasi DAG
long_running_create_large_txt_file_print_logs
. Anda akan melihat bahwa tugas tersebut memiliki statusFailed
, dan durasi operasinya adalah tepat 1 jam 5 menit, yang kurang dari waktu tunggu tugas 1 jam 15 menit.Tinjau log tugas. Setelah DAG membuat file
localfile.txt
di container pekerja Airflow, log akan mencetak bahwa DAG telah dimulai menunggu, dan durasi operasi dicetak di log tugas setiap 1 menit. Dalam contoh ini, DAG mencetak loglocalfile.txt size:
dan ukuran dari filelocalfile.txt
akan menjadi 1,5 GB.
Setelah file yang ditulis ke container pekerja Airflow melebihi penyimpanan maksimum, proses DAG seharusnya gagal. Namun, tugas tidak gagal segera dan terus berjalan sampai durasinya mencapai 1 jam 5 menit. Ini terjadi karena Kubernetes tidak langsung menghentikan tugas dan berjalan untuk memberikan waktu 1 jam untuk pemulihan, yang dikenal sebagai "gangguan penghentian titik". Setelah node kehabisan resource, Kubernetes tidak akan menghentikan Pod akan segera menangani penghentian dengan baik, sehingga meminimalkan berdampak pada pengguna akhir.
Masa tenggang penghentian membantu pengguna memulihkan file setelah kegagalan tugas, namun, hal ini dapat menyebabkan kebingungan saat mendiagnosis DAG. Saat pekerja Airflow jika batas penyimpanan terlampaui, status tugas akhir bergantung pada durasi Operasi DAG:
Jika operasi DAG melebihi batas penyimpanan pekerja, tetapi selesai dalam waktu kurang dari 1 jam, tugas selesai dengan status
Success
karena telah selesai dalam masa tenggang penghentian. Namun, Kubernetes menghentikan Pod dan file yang ditulis akan segera dihapus dari container.Jika DAG melebihi batas penyimpanan pekerja dan berjalan selama lebih dari 1 jam, DAG terus berjalan selama 1 jam dan dapat melebihi batas penyimpanan sebesar ribuan persen sebelum Kubernetes menghilangkan tanda Pod dan Airflow tugas sebagai
Failed
.
Mendiagnosis DAG yang gagal
Jika Anda menjalankan beberapa tugas pada saat kegagalan, pertimbangkan untuk menjalankan hanya satu tugas dan mendiagnosis tekanan sumber daya selama waktu itu untuk mengidentifikasi tugas mana yang menyebabkan tekanan sumber daya dan sumber daya mana yang perlu Anda tingkatkan.
Tinjau log tugas DAG kedua,
long_running_create_large_txt_file_print_logs
:
Di Konsol Google Cloud, buka halaman Environments.
Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.
Buka tab Logs, lalu buka All logs > Log Airflow > Pekerja > Lihat di Logs Explorer.
Filter log berdasarkan jenis: hanya tampilkan pesan Error.
Di log, Anda akan melihat pesan yang mirip dengan berikut ini:
Container storage usage of worker reached 155.7% of the limit.
This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.
You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.
atau
Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.
Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.
Pesan ini menunjukkan bahwa, seiring progres tugas, log Airflow dimulai error pencetakan saat ukuran file yang dihasilkan oleh DAG melebihi batas penyimpanan pekerja, dan masa tenggang penghentian dimulai. Selama masa tenggang penghentian, konsumsi penyimpanan tidak kembali ke batas, yang berujung pada dihapusnya Pod setelah masa tenggang penghentian berakhir.
Tinjau pemantauan kesehatan lingkungan dan konsumsi resource:
Di Konsol Google Cloud, buka halaman Environments.
Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.
Buka tab Monitoring lalu pilih Overview.
Di panel Environment overview, cari Grafik Kondisi Lingkungan (Airflow Monitoring DAG). Isinya adalah , yang sesuai dengan waktu saat log mulai mengalami error pencetakan.
Pilih Workers, lalu cari grafik Total worker disk usage. Amati bahwa baris Disk usage memiliki lonjakan dan melampaui Batas disk pada saat tugas Anda sedang berjalan.
Meningkatkan batas penyimpanan pekerja
Alokasikan penyimpanan pekerja Airflow tambahan agar DAG contoh dapat berhasil:
Di Konsol Google Cloud, buka halaman Environments.
Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.
Buka tab Konfigurasi lingkungan.
Temukan konfigurasi Resource > Workloads dan Klik Edit.
Di bagian Worker, pada kolom Storage, tentukan penyimpanan baru untuk pekerja Airflow. Dalam tutorial ini, tetapkan ke 2 GB.
Simpan perubahan dan tunggu beberapa menit agar pekerja Airflow Anda dapat mulai ulang.
Menguji DAG dengan batas penyimpanan baru
Picu DAG long_running_create_large_txt_file_print_logs
lagi dan
tunggu selama 1 jam 15 menit hingga selesai dijalankan.
Di log output proses DAG, Anda akan melihat
Marking task as SUCCESS
, dan status tugas akan menunjukkan Success, dengan durasi 1 jam dan 15 menit, yang sama dengan waktu tunggu yang ditetapkan dalam kode DAG.Tinjau bagian Environment overview di tab Monitoring dan pastikan tidak ada area berwarna merah.
Klik bagian Workers dan cari Total worker disk usage grafik. Anda akan melihat bahwa baris Batas disk mencerminkan perubahan dalam batas penyimpanan yang diizinkan, dan baris Penggunaan disk berada dalam rentang yang diizinkan.
Ringkasan
Dalam tutorial ini, Anda telah mendiagnosis alasan kegagalan DAG dan mengidentifikasi jenis resource yang menyebabkan tekanan dengan men-debug dua contoh DAG yang gagal karena kurangnya memori dan penyimpanan pekerja. Anda kemudian berhasil menjalankan DAG setelah mengalokasikan lebih banyak memori dan penyimpanan untuk pekerja Anda. Namun, direkomendasikan untuk mengoptimalkan DAG (alur kerja) untuk mengurangi konsumsi sumber daya pekerja, karena hal itu meningkatkan resource di luar batas tertentu.
Pembersihan
Agar tidak menimbulkan biaya ke akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut atau mempertahankan proyek dan menghapus sumber daya satu per satu.
Menghapus project
- Di konsol Google Cloud, buka halaman Manage resource.
- Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
- Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.
Menghapus resource satu per satu
Jika Anda berencana mempelajari beberapa tutorial dan panduan memulai, menggunakan kembali project dapat membantu Anda agar tidak melampaui batas kuota project.
Hapus lingkungan Cloud Composer. Anda juga menghapus bucket lingkungan selama prosedur ini.