Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Tutorial ini memberikan langkah-langkah untuk men-debug DAG Airflow yang gagal di Cloud Composer dan mendiagnosis masalah terkait resource pekerja, seperti kekurangan memori atau ruang penyimpanan pekerja, dengan bantuan log dan pemantauan lingkungan.
Pengantar
Tutorial ini berfokus pada masalah terkait resource untuk menunjukkan cara men-debug DAG.
Kurangnya resource pekerja yang dialokasikan menyebabkan kegagalan DAG. Jika tugas Airflow kehabisan memori atau penyimpanan, Anda mungkin melihat pengecualian Airflow, seperti:
WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.
atau
Task exited with return code Negsignal.SIGKILL
Dalam kasus tersebut, rekomendasi umum adalah meningkatkan resource pekerja Airflow atau mengurangi jumlah tugas per pekerja. Namun, karena pengecualian Airflow dapat bersifat umum, mungkin sulit untuk mengidentifikasi resource tertentu yang menyebabkan masalah.
Tutorial ini menjelaskan cara mendiagnosis alasan kegagalan DAG dan mengidentifikasi jenis resource yang menyebabkan masalah dengan men-debug dua contoh DAG yang gagal karena kurangnya memori dan penyimpanan pekerja.
Tujuan
Jalankan contoh DAG yang gagal karena alasan berikut:
- Kurangnya memori pekerja
- Kurangnya penyimpanan pekerja
Mendiagnosis alasan kegagalan
Meningkatkan alokasi resource pekerja
Menguji DAG dengan batas resource baru
Biaya
Tutorial ini menggunakan komponen Google Cloud yang dapat ditagih berikut: Google Cloud:
Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk detail selengkapnya, lihat Pembersihan.
Sebelum memulai
Bagian ini menjelaskan tindakan yang diperlukan sebelum Anda memulai tutorial.
Membuat dan mengonfigurasi project
Untuk tutorial ini, Anda memerlukan project Google Cloud. Konfigurasikan project dengan cara berikut:
Di konsol Google Cloud , pilih atau buat project:
Pastikan penagihan diaktifkan untuk project Anda. Pelajari cara memeriksa apakah penagihan diaktifkan pada project.
Pastikan pengguna project Google Cloud Anda memiliki peran berikut untuk membuat resource yang diperlukan:
- Environment and Storage Object Administrator
(
roles/composer.environmentAndStorageObjectAdmin
) - Compute Admin (
roles/compute.admin
) - Monitoring Editor (
roles/monitoring.editor
)
- Environment and Storage Object Administrator
(
Mengaktifkan API untuk project Anda
Enable the Cloud Composer API.
Membuat lingkungan Cloud Composer
Buat lingkungan Cloud Composer 2.
Sebagai bagian dari pembuatan lingkungan, Anda memberikan peran Ekstensi Agen Layanan Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext
) ke akun Agen Layanan Composer. Cloud Composer menggunakan akun ini untuk melakukan operasi
di project Google Cloud .
Memeriksa batas resource pekerja
Periksa batas resource pekerja Airflow di lingkungan Anda:
Di konsol Google Cloud , buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Konfigurasi lingkungan.
Buka Resource > Workloads configuration > Worker.
Pastikan nilainya adalah 0,5 vCPU, memori 1,875 GB, dan penyimpanan 1 GB. Ini adalah batas resource pekerja Airflow yang Anda gunakan di langkah berikut dalam tutorial ini.
Contoh: Mendiagnosis masalah kehabisan memori
Upload contoh DAG berikut ke lingkungan
yang Anda buat di langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama
create_list_with_many_strings
.
DAG ini berisi satu tugas yang menjalankan langkah-langkah berikut:
- Membuat daftar kosong
s
. - Menjalankan siklus untuk menambahkan string
More
ke daftar. - Mencetak jumlah memori yang digunakan daftar dan menunggu 1 detik dalam setiap iterasi 1 menit.
import time
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_list_with_many_strings',
default_args=default_args,
schedule_interval=None)
def consume():
s = []
for i in range(120):
for j in range(1000000):
s.append("More")
print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
time.sleep(1)
t1 = PythonOperator(
task_id='task0',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0
)
Memicu DAG contoh
Memicu DAG contoh, create_list_with_many_strings
:
Di konsol Google Cloud , buka halaman Environments.
Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.
Di antarmuka web Airflow, pada halaman DAG, di kolom Link untuk DAG Anda, klik tombol Trigger Dag.
Klik Pemicu.
Di halaman DAG, klik tugas yang Anda picu dan tinjau log output untuk memastikan DAG Anda mulai berjalan.
Saat tugas berjalan, log output akan mencetak ukuran memori dalam GB yang digunakan DAG.
Setelah beberapa menit, tugas akan gagal karena melebihi batas memori pekerja Airflow sebesar 1,875 GB.
Mendiagnosis DAG yang gagal
Jika Anda menjalankan beberapa tugas pada saat kegagalan, pertimbangkan untuk menjalankan hanya satu tugas dan mendiagnosis tekanan resource selama waktu tersebut untuk mengidentifikasi tugas mana yang menyebabkan tekanan resource dan resource mana yang perlu Anda tingkatkan.
Meninjau log tugas Airflow
Perhatikan bahwa tugas dari DAG create_list_with_many_strings
memiliki
status Failed
.
Tinjau log tugas. Anda akan melihat entri log berikut:
```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```
`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.
Meninjau workload
Tinjau beban kerja untuk memastikan beban tugas Anda tidak menyebabkan node tempat Pod berjalan melebihi batas konsumsi memori:
Di konsol Google Cloud , buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Konfigurasi lingkungan.
Di Resources > GKE cluster > Workloads, klik view cluster workloads.
Periksa apakah beberapa pod beban kerja memiliki status yang mirip dengan berikut:
Error with exit code 137 and 1 more issue. ContainerStatusUnknown with exit code 137 and 1 more issue
Exit code 137
berarti container atau Pod mencoba menggunakan lebih banyak memori dari yang diizinkan. Proses dihentikan untuk mencegah penggunaan memori.
Meninjau pemantauan kesehatan lingkungan dan konsumsi resource
Tinjau pemantauan kesehatan lingkungan dan konsumsi resource:
Di konsol Google Cloud , buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Monitoring, lalu pilih Overview.
Di panel Ringkasan lingkungan, temukan grafik Kondisi Lingkungan (DAG Pemantauan Aliran Data). File ini berisi area merah, yang sesuai dengan waktu saat log mulai mencetak error.
Pilih Pekerja, lalu temukan grafik Total penggunaan memori pekerja. Perhatikan bahwa baris Penggunaan memori memiliki lonjakan pada saat tugas berjalan.
Meskipun garis penggunaan memori pada grafik tidak mencapai batas, saat mendiagnosis alasan kegagalan, Anda hanya perlu mempertimbangkan memori yang dapat dialokasikan, sedangkan garis Batas memori pada grafik menunjukkan total memori yang tersedia (termasuk kapasitas yang dicadangkan oleh GKE).
Dalam contoh ini, batas memori pekerja ditetapkan ke 1,875 GB. GKE mencadangkan 25% dari 4 GiB pertama memori. GKE juga mencadangkan ambang penghapusan tambahan: memori 100 MiB di setiap node untuk penghapusan kubelet.
Memori yang dapat dialokasikan dihitung dengan cara berikut:
ALLOCATABLE = CAPACITY - RESERVED - EVICTION-THRESHOLD
Jika batas memori adalah 1,875 GB, memori yang dapat dialokasikan sebenarnya adalah:
1.75 GiB (1.875GB) - 0.44 (25% GiB reserved) - 0.1 = 1.21 GiB (~1.3 GB).
Saat menambahkan batas sebenarnya ini ke grafik penggunaan memori, Anda akan melihat bahwa lonjakan penggunaan memori tugas mencapai batas memori yang sebenarnya, dan Anda dapat menyimpulkan bahwa tugas gagal karena memori pekerja tidak memadai.
Meningkatkan batas memori pekerja
Alokasikan memori pekerja tambahan agar contoh DAG berhasil:
Di konsol Google Cloud , buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Konfigurasi lingkungan.
Temukan konfigurasi Resources > Workloads, lalu klik Edit.
Di bagian Worker, di kolom Memory, tentukan batas memori baru untuk pekerja Airflow. Dalam tutorial ini, gunakan 3 GB.
Simpan perubahan dan tunggu beberapa menit hingga pekerja Airflow Anda memulai ulang.
Menguji DAG dengan batas memori baru
Picu DAG create_list_with_many_strings
lagi dan tunggu hingga
selesai berjalan.
Dalam log output dari DAG yang dijalankan, Anda akan melihat
Marking task as SUCCESS
, dan status tugas akan menunjukkan Sukses.Tinjau bagian Ringkasan lingkungan di tab Monitoring dan pastikan tidak ada area merah.
Klik bagian Pekerja dan temukan grafik Total penggunaan memori pekerja. Anda akan melihat bahwa baris Batas memori mencerminkan perubahan batas memori, dan baris Penggunaan memori jauh di bawah batas memori yang dapat dialokasikan yang sebenarnya.
Contoh: Mendiagnosis masalah kehabisan penyimpanan
Pada langkah ini, Anda akan mengupload dua DAG yang membuat file berukuran besar. DAG pertama membuat file berukuran besar. DAG kedua membuat file besar dan meniru operasi yang berjalan lama.
Ukuran file di kedua DAG melebihi batas penyimpanan pekerja Airflow default sebesar 1 GB, tetapi DAG kedua memiliki tugas tunggu tambahan untuk memperpanjang durasinya secara artifisial.
Anda akan menyelidiki perbedaan perilaku kedua DAG di langkah berikutnya.
Mengupload DAG yang membuat file berukuran besar
Upload contoh DAG berikut ke lingkungan
yang Anda buat di langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama
create_large_txt_file_print_logs
.
DAG ini berisi satu tugas yang menjalankan langkah-langkah berikut:
- Menulis file
localfile.txt
1,5 GB ke penyimpanan pekerja Airflow. - Mencetak ukuran file yang dibuat menggunakan modul
os
Python. - Mencetak durasi operasi DAG setiap 1 menit.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
Mengupload DAG yang membuat file besar dalam operasi yang berjalan lama
Untuk meniru DAG yang berjalan lama dan menyelidiki dampak durasi tugas
pada status akhir, upload contoh DAG kedua ke
lingkungan Anda. Dalam tutorial ini, DAG ini diberi nama
long_running_create_large_txt_file_print_logs
.
DAG ini berisi satu tugas yang menjalankan langkah-langkah berikut:
- Menulis file
localfile.txt
1,5 GB ke penyimpanan pekerja Airflow. - Mencetak ukuran file yang dibuat menggunakan modul
os
Python. - Menunggu 1 jam 15 menit untuk meniru beberapa waktu yang diperlukan untuk operasi dengan file, misalnya, membaca dari file.
- Mencetak durasi operasi DAG setiap 1 menit.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'long_running_create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
for k in range(75):
time.sleep(60)
print(f"{k+1} minute")
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
Memicu contoh DAG
Picu DAG pertama, create_large_txt_file_print_logs
:
Di konsol Google Cloud , buka halaman Environments.
Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.
Di antarmuka web Airflow, pada halaman DAG, di kolom Link untuk DAG Anda, klik tombol Trigger Dag.
Klik Pemicu.
Di halaman DAG, klik tugas yang Anda picu dan tinjau log output untuk memastikan DAG Anda mulai berjalan.
Tunggu hingga tugas yang Anda buat dengan DAG
create_large_txt_file_print_logs
selesai. Proses ini mungkin memerlukan waktu beberapa menit.Di halaman DAG, klik DAG yang dijalankan. Anda akan melihat tugas Anda memiliki status
Success
, meskipun batas penyimpanan terlampaui.
Tinjau log Airflow tugas:
Di konsol Google Cloud , buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Logs, lalu buka All logs > Airflow logs > Workers > View in Logs Explorer.
Memfilter log menurut jenis: hanya menampilkan pesan Error.
Dalam log, Anda akan melihat pesan yang mirip dengan berikut ini:
Worker: warm shutdown (Main Process)
atau
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
Log ini menunjukkan bahwa pod memulai proses "warm shutdown" karena penyimpanan yang digunakan melebihi batas dan dihapus dalam 1 jam. Namun, pengoperasian DAG tidak gagal karena selesai dalam masa tenggang penghentian Kubernetes, yang dijelaskan lebih lanjut dalam tutorial ini.
Untuk mengilustrasikan konsep masa tenggang penghentian, tinjau hasil
contoh DAG kedua, long_running_create_large_txt_file_print_logs
.
Picu DAG kedua, long_running_create_large_txt_file_print_logs
:
Di konsol Google Cloud , buka halaman Environments.
Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.
Di antarmuka web Airflow, pada halaman DAG, di kolom Link untuk DAG Anda, klik tombol Trigger Dag.
Klik Pemicu.
Di halaman DAG, klik tugas yang Anda picu dan tinjau log output untuk memastikan DAG Anda mulai berjalan.
Tunggu hingga operasi DAG
long_running_create_large_txt_file_print_logs
gagal. Proses ini akan memerlukan waktu sekitar satu jam.
Tinjau hasil operasi DAG:
Di halaman DAG, klik eksekusi DAG
long_running_create_large_txt_file_print_logs
. Anda akan melihat bahwa tugas memiliki statusFailed
, dan durasi operasinya tepat 1 jam 5 menit, yang kurang dari periode tunggu tugas 1 jam 15 menit.Tinjau log tugas. Setelah DAG membuat file
localfile.txt
di penampung pekerja Airflow, log akan mencetak bahwa DAG mulai menunggu, dan durasi operasi akan dicetak di log tugas setiap 1 menit. Dalam contoh ini, DAG mencetak loglocalfile.txt size:
dan ukuran filelocalfile.txt
akan menjadi 1,5 GB.
Setelah file yang ditulis ke penampung pekerja Airflow melebihi batas penyimpanan, operasi DAG akan gagal. Namun, tugas tidak langsung gagal dan terus berjalan hingga durasinya mencapai 1 jam 5 menit. Hal ini terjadi karena Kubernetes tidak langsung menghentikan tugas dan terus berjalan untuk memberikan waktu pemulihan selama 1 jam, yang dikenal sebagai "periode masa tenggang penghentian". Setelah node kehabisan resource, Kubernetes tidak segera menghentikan Pod untuk menangani penghentian dengan baik, sehingga ada dampak minimal pada pengguna akhir.
Masa tenggang penghentian membantu pengguna memulihkan file setelah kegagalan tugas, tetapi hal ini dapat menyebabkan kebingungan saat mendiagnosis DAG. Jika batas penyimpanan pekerja Airflow terlampaui, status tugas akhir bergantung pada durasi eksekusi DAG:
Jika DAG yang berjalan melebihi batas penyimpanan pekerja, tetapi selesai dalam waktu kurang dari 1 jam, tugas akan selesai dengan status
Success
karena selesai dalam masa tenggang penghentian. Namun, Kubernetes menghentikan Pod dan file yang ditulis akan langsung dihapus dari penampung.Jika DAG melebihi batas penyimpanan pekerja dan berjalan selama lebih dari 1 jam, DAG akan terus berjalan selama 1 jam dan dapat melebihi batas penyimpanan sebesar ribuan persen sebelum Kubernetes menghapus Pod dan Airflow menandai tugas sebagai
Failed
.
Mendiagnosis DAG yang gagal
Jika Anda menjalankan beberapa tugas pada saat kegagalan, pertimbangkan untuk menjalankan hanya satu tugas dan mendiagnosis tekanan resource selama waktu tersebut untuk mengidentifikasi tugas mana yang menyebabkan tekanan resource dan resource mana yang perlu Anda tingkatkan.
Tinjau log tugas DAG kedua,
long_running_create_large_txt_file_print_logs
:
Di konsol Google Cloud , buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Logs, lalu buka All logs > Airflow logs > Workers > View in Logs Explorer.
Memfilter log menurut jenis: hanya menampilkan pesan Error.
Dalam log, Anda akan melihat pesan yang mirip dengan berikut ini:
Container storage usage of worker reached 155.7% of the limit.
This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.
You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.
atau
Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.
Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.
Pesan ini menunjukkan bahwa, seiring berjalannya tugas, log Airflow mulai mencetak error saat ukuran file yang dihasilkan oleh DAG Anda melebihi batas penyimpanan pekerja, dan masa tenggang penghentian dimulai. Selama masa tenggang penghentian, penggunaan penyimpanan tidak kembali ke batas, yang menyebabkan penghapusan Pod setelah masa tenggang penghentian berakhir.
Tinjau pemantauan kesehatan lingkungan dan konsumsi resource:
Di konsol Google Cloud , buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Monitoring, lalu pilih Overview.
Di panel Ringkasan lingkungan, temukan grafik Kondisi Lingkungan (DAG Pemantauan Aliran Data). File ini berisi area merah, yang sesuai dengan waktu saat log mulai mencetak error.
Pilih Pekerja, lalu temukan grafik Total penggunaan disk pekerja. Perhatikan bahwa baris Disk usage memiliki lonjakan dan melebihi baris Disk limit pada saat tugas Anda berjalan.
Meningkatkan batas penyimpanan pekerja
Alokasikan penyimpanan pekerja Airflow tambahan agar DAG contoh berhasil:
Di konsol Google Cloud , buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Konfigurasi lingkungan.
Temukan konfigurasi Resources > Workloads, lalu klik Edit.
Di bagian Pekerja, di kolom Penyimpanan, tentukan batas penyimpanan baru untuk pekerja Airflow. Dalam tutorial ini, tetapkan ke 2 GB.
Simpan perubahan dan tunggu beberapa menit hingga pekerja Airflow Anda memulai ulang.
Menguji DAG dengan batas penyimpanan baru
Picu DAG long_running_create_large_txt_file_print_logs
lagi dan
tunggu selama 1 jam 15 menit hingga selesai berjalan.
Dalam log output dari DAG yang dijalankan, Anda akan melihat
Marking task as SUCCESS
, dan status tugas akan menunjukkan Sukses, dengan durasi 1 jam dan 15 menit yang sama dengan waktu tunggu yang ditetapkan dalam kode DAG.Tinjau bagian Ringkasan lingkungan di tab Monitoring dan pastikan tidak ada area merah.
Klik bagian Workers dan temukan grafik Total workers disk usage. Anda akan melihat bahwa baris Batas disk mencerminkan perubahan dalam batas penyimpanan, dan baris Penggunaan disk berada dalam rentang yang diizinkan.
Ringkasan
Dalam tutorial ini, Anda telah mendiagnosis alasan kegagalan DAG dan mengidentifikasi jenis resource yang menyebabkan tekanan dengan men-debug dua contoh DAG yang gagal karena kurangnya memori dan penyimpanan pekerja. Kemudian, Anda berhasil menjalankan DAG setelah mengalokasikan lebih banyak memori dan penyimpanan ke pekerja. Namun, sebaiknya optimalkan DAG (alur kerja) untuk mengurangi konsumsi resource pekerja terlebih dahulu, karena tidak mungkin untuk meningkatkan resource di luar nilai minimum tertentu.
Pembersihan
Agar tidak dikenai biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.
Menghapus project
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Menghapus resource satu per satu
Jika Anda berencana mempelajari beberapa tutorial dan panduan memulai, menggunakan kembali project dapat membantu Anda agar tidak melampaui batas kuota project.
Hapus lingkungan Cloud Composer. Anda juga akan menghapus bucket lingkungan selama prosedur ini.