Men-debug masalah penjadwalan tugas

Cloud Composer 1 | Cloud Composer 2

Tutorial ini akan memandu Anda dalam mendiagnosis dan memecahkan masalah penjadwalan tugas dan penguraian masalah yang menyebabkan kegagalan fungsi penjadwal, penguraian error dan latensi, serta kegagalan tugas.

Pengantar

Penjadwal Airflow terutama dipengaruhi oleh dua faktor: penjadwalan tugas dan penguraian DAG. Masalah di salah satu faktor tersebut dapat berdampak negatif terhadap kesehatan dan performa lingkungan.

Terkadang terlalu banyak tugas yang dijadwalkan secara bersamaan. Dalam situasi ini, antrean akan diisi, dan tugas tetap dalam status "terjadwal" atau menjadi dijadwalkan ulang setelah diantrekan, yang dapat menyebabkan kegagalan tugas dan latensi performa.

Masalah umum lainnya adalah mengurai latensi dan error yang disebabkan oleh kompleksitas kode DAG. Misalnya, kode DAG yang berisi variabel Airflow di tingkat atas kode dapat menyebabkan penundaan penguraian, overload database, kegagalan penjadwalan, dan waktu tunggu DAG.

Dalam tutorial ini, Anda akan mendiagnosis contoh DAG dan mempelajari cara memecahkan masalah penjadwalan dan penguraian, meningkatkan penjadwalan DAG, serta mengoptimalkan kode DAG dan konfigurasi lingkungan untuk meningkatkan performa.

Tujuan

Bagian ini mencantumkan tujuan untuk contoh dalam tutorial ini.

Contoh: Scheduler gagal berfungsi dan latensi disebabkan oleh konkurensi tugas yang tinggi

  • Upload DAG sampel yang berjalan beberapa kali secara bersamaan dan mendiagnosis kegagalan fungsi penjadwal dan masalah latensi dengan Cloud Monitoring.

  • Optimalkan kode DAG Anda dengan menggabungkan tugas-tugas dan mengevaluasi dampak performanya.

  • Distribusikan tugas secara lebih merata dari waktu ke waktu dan evaluasi dampak performanya.

  • Optimalkan konfigurasi Airflow dan konfigurasi lingkungan Anda, serta evaluasi dampaknya.

Contoh: Error penguraian DAG dan latensi yang disebabkan oleh kode kompleks

  • Upload contoh DAG dengan variabel Airflow dan diagnosis masalah penguraian dengan Cloud Monitoring.

  • Optimalkan kode DAG dengan menghindari variabel Airflow di tingkat teratas kode dan mengevaluasi dampaknya terhadap waktu penguraian.

  • Mengoptimalkan konfigurasi dan konfigurasi lingkungan Airflow serta mengevaluasi dampaknya terhadap waktu penguraian.

Biaya

Tutorial ini menggunakan komponen Google Cloud yang dapat ditagih berikut:

Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui detail selengkapnya, lihat Membersihkan.

Sebelum memulai

Bagian ini menjelaskan tindakan yang diperlukan sebelum Anda memulai tutorial.

Membuat dan mengonfigurasi project

Untuk tutorial ini, Anda memerlukan project Google Cloud. Konfigurasikan project dengan cara berikut:

  1. Di Konsol Google Cloud, pilih atau buat project:

    Buka Project Selector

  2. Pastikan penagihan diaktifkan untuk project Anda. Pelajari cara memeriksa apakah penagihan diaktifkan di sebuah project.

  3. Pastikan pengguna project Google Cloud Anda memiliki peran berikut untuk membuat resource yang diperlukan:

    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)

Mengaktifkan API untuk project Anda

Aktifkan API Cloud Composer.

Mengaktifkan API

Membuat lingkungan Cloud Composer

Membuat lingkungan Cloud Composer 2.

Sebagai bagian dari pembuatan lingkungan, Anda memberikan peran Ekstensi Agen Layanan Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext) ke akun Agen Layanan Composer. Cloud Composer menggunakan akun ini untuk menjalankan operasi di project Google Cloud Anda.

Contoh: Penjadwal gagal berfungsi dan kegagalan tugas karena masalah penjadwalan tugas

Contoh ini menunjukkan kegagalan fungsi penjadwal dan latensi yang disebabkan oleh konkurensi tugas yang tinggi.

Upload DAG sampel ke lingkungan Anda

Upload contoh DAG berikut ke lingkungan yang Anda buat pada langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama dag_10_tasks_200_seconds_1.

DAG ini memiliki 200 tugas. Setiap tugas menunggu selama 1 detik dan mencetak "Selesai!". DAG dipicu secara otomatis setelah diupload. Cloud Composer menjalankan DAG ini 10 kali, dan semua DAG berjalan secara paralel.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Mendiagnosis malfungsi penjadwal dan masalah kegagalan tugas

Setelah DAG berjalan selesai, buka UI Airflow, lalu klik DAG dag_10_tasks_200_seconds_1. Anda akan melihat bahwa total 10 operasi DAG berhasil, dan masing-masing memiliki 200 tugas yang berhasil.

Tinjau log tugas Airflow:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Logs, lalu buka All logs > Airflow logs > Workers > View in Logs Explorer.

Pada histogram log, Anda dapat melihat error dan peringatan yang ditunjukkan dengan warna merah dan oranye:

Histogram log pekerja Airflow dengan error dan peringatan yang ditunjukkan dengan warna merah dan oranye
Gambar 1. Histogram log pekerja Airflow (klik untuk memperbesar)

Contoh DAG menghasilkan sekitar 130 peringatan dan 60 error. Klik kolom mana pun yang berisi batang kuning dan merah. Anda akan melihat beberapa peringatan dan error berikut dalam log:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Log ini mungkin menunjukkan bahwa penggunaan resource melampaui batas dan pekerja memulai ulang dirinya sendiri.

Jika tugas Airflow disimpan dalam antrean terlalu lama, penjadwal akan menandainya sebagai gagal dan up_for_retry, serta akan menjadwalkan ulang tugas tersebut sekali lagi untuk dieksekusi. Salah satu cara untuk mengamati gejala situasi ini adalah dengan melihat diagram beserta jumlah tugas dalam antrean dan jika lonjakan dalam diagram ini tidak turun dalam waktu sekitar 10 menit, kemungkinan akan ada kegagalan tugas (tanpa log).

Tinjau informasi pemantauan:

  1. Buka tab Pemantauan dan pilih Ringkasan.

  2. Tinjau grafik Tugas Airflow.

    Grafik tugas Airflow dari waktu ke waktu, yang menunjukkan lonjakan jumlah tugas dalam antrean
    Gambar 2. Grafik tugas Airflow (klik untuk memperbesar)

    Dalam grafik tugas Airflow, ada lonjakan tugas dalam antrean yang berlangsung selama lebih dari 10 menit, yang berarti bahwa resource di lingkungan Anda tidak cukup untuk memproses semua tugas yang dijadwalkan.

  3. Tinjau grafik Pekerja aktif:

    Grafik pekerja Airflow aktif dari waktu ke waktu menunjukkan bahwa jumlah pekerja aktif ditingkatkan hingga batas maksimum
    Gambar 3. Grafik pekerja aktif (klik untuk memperbesar)

    Grafik Active worker menunjukkan bahwa DAG memicu penskalaan otomatis hingga batas maksimum tiga worker yang diizinkan selama menjalankan DAG.

  4. Grafik penggunaan resource dapat menunjukkan kurangnya kapasitas pada pekerja Airflow untuk menjalankan tugas dalam antrean. Pada tab Monitoring, pilih Workers, lalu tinjau grafik Total worker CPU usage dan Total worker memory usage.

    Grafik penggunaan CPU oleh pekerja Airflow menunjukkan penggunaan CPU yang meningkat hingga batas maksimum
    Gambar 4. Grafik penggunaan CPU total pekerja (klik untuk memperbesar)
    Grafik penggunaan memori oleh pekerja Airflow menunjukkan penggunaan memori meningkat, tetapi tidak mencapai batas maksimum
    Gambar 5. Grafik penggunaan memori total pekerja (klik untuk memperbesar)

    Grafik ini menunjukkan bahwa eksekusi terlalu banyak tugas secara bersamaan yang mengakibatkan tercapainya batas CPU. Resource telah digunakan selama lebih dari 30 menit, yang bahkan lebih lama dari total durasi 200 tugas dalam 10 DAG yang berjalan satu per satu.

Ini adalah indikator antrean terisi dan kurangnya resource untuk memproses semua tugas yang dijadwalkan.

Menggabungkan tugas

Kode saat ini membuat banyak DAG dan tugas tanpa resource yang memadai untuk memproses semua tugas secara paralel, yang menyebabkan antrean terisi. Menyimpan tugas dalam antrean terlalu lama dapat menyebabkan tugas menjadwal ulang atau gagal. Dalam situasi tersebut, Anda harus memilih sejumlah kecil tugas gabungan yang lebih kecil.

Contoh DAG berikut mengubah jumlah tugas dalam contoh awal dari 200 menjadi 20 dan meningkatkan waktu tunggu dari 1 menjadi 10 detik untuk meniru lebih banyak tugas terkonsolidasi yang melakukan jumlah pekerjaan yang sama.

Upload contoh DAG berikut ke lingkungan yang Anda buat. Dalam tutorial ini, DAG ini diberi nama dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Mengevaluasi dampak tugas yang lebih terkonsolidasi pada proses penjadwalan:

  1. Tunggu hingga operasi DAG selesai.

  2. Di UI Airflow, pada halaman DAGs, klik DAG dag_10_tasks_20_seconds_10. Anda akan melihat 10 DAG yang dijalankan, masing-masing memiliki 20 tugas yang berhasil.

  3. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  4. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  5. Buka tab Logs, lalu buka All logs > Airflow logs > Workers > View in Logs Explorer.

    Contoh kedua dengan tugas yang lebih terkonsolidasi menghasilkan sekitar 10 peringatan dan 7 error. Pada histogram, Anda dapat membandingkan jumlah error dan peringatan di contoh awal (nilai sebelumnya) dan contoh kedua (nilai kemudian).

    Histogram log pekerja Airflow dengan error dan peringatan menunjukkan penurunan jumlah error dan peringatan setelah tugas digabungkan
    Gambar 6. Histogram log pekerja Airflow setelah tugas digabungkan (klik untuk memperbesar)

    Saat membandingkan contoh pertama dengan contoh yang lebih terkonsolidasi, Anda dapat melihat bahwa jumlah error dan peringatan yang jauh lebih sedikit dalam contoh kedua. Namun, error yang sama yang terkait dengan warm shutdown masih muncul di log karena kelebihan beban resource.

  6. Pada tab Monitoring, pilih Workers dan tinjau grafiknya.

    Saat membandingkan grafik tugas Airflow untuk contoh pertama (nilai sebelumnya) dengan grafik untuk contoh kedua yang berisi tugas yang lebih terkonsolidasi, Anda dapat melihat bahwa lonjakan tugas dalam antrean berlangsung lebih singkat ketika tugas lebih digabungkan. Namun, video ini berjalan selama hampir 10 menit, yang masih kurang optimal.

    Grafik tugas Airflow dari waktu ke waktu menunjukkan bahwa lonjakan tugas Airflow berlangsung dalam jangka waktu yang lebih singkat daripada sebelumnya.
    Gambar 7. Grafik tugas Airflow setelah tugas digabungkan (klik untuk memperbesar)

    Pada grafik Pekerja aktif, Anda dapat melihat contoh pertama (di sisi kiri grafik) menggunakan resource dalam jangka waktu yang jauh lebih lama daripada yang kedua, meskipun kedua contoh tersebut meniru jumlah pekerjaan yang sama.

    Grafik pekerja Airflow aktif dari waktu ke waktu menunjukkan bahwa jumlah pekerja aktif meningkat untuk jangka waktu yang lebih singkat dari sebelumnya.
    Gambar 8. Grafik pekerja aktif setelah tugas digabungkan (klik untuk memperbesar)

    Tinjau grafik konsumsi resource pekerja. Meskipun perbedaan antara resource yang digunakan dalam contoh dengan tugas yang lebih terkonsolidasi dan contoh awal cukup signifikan, penggunaan CPU masih melonjak hingga 70% dari batasnya.

    Grafik penggunaan CPU oleh pekerja Airflow menunjukkan penggunaan CPU yang meningkat hingga 70% dari batas maksimum
    Gambar 9. Grafik penggunaan CPU total pekerja setelah tugas digabungkan (klik untuk memperbesar)
    Grafik penggunaan memori oleh pekerja Airflow menunjukkan peningkatan penggunaan memori, tetapi tidak mencapai batas maksimum
    Gambar 10. Grafik penggunaan memori total pekerja setelah tugas digabungkan (klik untuk memperbesar)

Mendistribusikan tugas secara lebih merata seiring waktu

Terlalu banyak tugas serentak menyebabkan antrean diisi, yang menyebabkan tugas macet dalam antrean atau dijadwalkan ulang. Pada langkah sebelumnya, Anda telah mengurangi jumlah tugas dengan menggabungkan tugas-tugas tersebut, tetapi log output dan pemantauan menunjukkan bahwa jumlah tugas serentak masih kurang optimal.

Anda dapat mengontrol jumlah tugas serentak dengan menerapkan jadwal atau menetapkan batas jumlah tugas yang dapat dijalankan secara bersamaan.

Dalam tutorial ini, Anda akan mendistribusikan tugas secara lebih merata dari waktu ke waktu dengan menambahkan parameter tingkat DAG ke dalam DAG dag_10_tasks_20_seconds_10:

  1. Menambahkan argumen max_active_runs=1 ke pengelola konteks DAG. Argumen ini menetapkan batas hanya satu instance DAG yang dijalankan dalam momen tertentu.

  2. Menambahkan argumen max_active_tasks=5 ke pengelola konteks DAG. Argumen ini mengontrol jumlah maksimum instance tugas yang dapat dijalankan secara serentak di setiap DAG.

Upload contoh DAG berikut ke lingkungan yang Anda buat. Dalam tutorial ini, DAG ini diberi nama dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Mengevaluasi dampak distribusi tugas dari waktu ke waktu pada proses penjadwalan:

  1. Tunggu hingga operasi DAG selesai.

  2. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  3. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  4. Buka tab Logs, lalu buka All logs > Airflow logs > Workers > View in Logs Explorer.

  5. Pada histogram, Anda dapat melihat bahwa DAG ketiga dengan jumlah tugas aktif dan operasi terbatas tidak menghasilkan peringatan atau error apa pun, dan distribusi log terlihat lebih merata dibandingkan dengan nilai sebelumnya.

    Histogram log pekerja Airflow dengan error dan peringatan tidak menampilkan error atau peringatan setelah tugas digabungkan dan didistribusikan dari waktu ke waktu.
    Gambar 11. Histogram log pekerja Airflow setelah tugas digabungkan dan didistribusikan dari waktu ke waktu (klik untuk memperbesar)

Tugas dalam contoh dag_10_tasks_20_seconds_10_scheduled yang memiliki jumlah tugas aktif dan berjalan terbatas tidak menyebabkan tekanan resource karena tugas diantrekan secara merata.

Setelah melakukan langkah-langkah yang dijelaskan, Anda mengoptimalkan penggunaan resource dengan menggabungkan tugas-tugas kecil dan mendistribusikannya secara lebih merata dari waktu ke waktu.

Konfigurasi lingkungan Optimize

Anda dapat menyesuaikan konfigurasi lingkungan untuk memastikan selalu ada kapasitas di pekerja Airflow untuk menjalankan tugas dalam antrean.

Jumlah pekerja dan konkurensi pekerja

Anda dapat menyesuaikan jumlah maksimum pekerja agar Cloud Composer otomatis menskalakan lingkungan Anda sesuai batas yang ditetapkan.

Parameter [celery]worker_concurrency menentukan jumlah maksimum tugas yang dapat diambil satu pekerja dari task queue. Mengubah parameter ini akan menyesuaikan jumlah tugas yang dapat dijalankan oleh satu pekerja secara bersamaan. Anda dapat mengubah opsi konfigurasi Airflow ini dengan menggantinya. Secara default, konkurensi pekerja ditetapkan ke minimum berikut: 32, 12 * worker_CPU, 8 * worker_memory, yang berarti bergantung pada batas resource pekerja. Lihat Mengoptimalkan lingkungan untuk mengetahui informasi selengkapnya tentang nilai serentak pekerja default.

Jumlah pekerja dan pekerjaan serentak pekerja dalam kombinasi satu sama lain, dan performa lingkungan Anda sangat bergantung pada kedua parameter tersebut. Anda dapat menggunakan pertimbangan berikut untuk memilih kombinasi yang tepat:

  • Beberapa tugas cepat yang berjalan secara paralel. Anda dapat meningkatkan konkurensi pekerja saat ada tugas yang menunggu dalam antrean, dan pekerja Anda menggunakan persentase CPU dan memori yang rendah secara bersamaan. Namun, dalam keadaan tertentu, antrean mungkin tidak pernah terisi, sehingga menyebabkan penskalaan otomatis tidak pernah dipicu. Jika tugas kecil menyelesaikan eksekusi pada saat pekerja baru siap, pekerja yang ada dapat mengambil tugas yang tersisa, dan tidak akan ada tugas untuk pekerja yang baru dibuat.

    Dalam situasi ini, sebaiknya tingkatkan jumlah minimum pekerja dan tingkatkan konkurensi pekerja untuk menghindari penskalaan yang berlebihan.

  • Beberapa tugas panjang yang berjalan secara paralel. Konkurensi pekerja yang tinggi mencegah sistem menskalakan jumlah pekerja. Jika beberapa tugas menggunakan banyak resource dan memerlukan waktu lama untuk diselesaikan, konkurensi pekerja yang tinggi dapat menyebabkan antrean tidak pernah terisi dan semua tugas hanya diambil oleh satu pekerja, sehingga menyebabkan masalah performa. Dalam situasi ini, sebaiknya tingkatkan jumlah maksimum pekerja dan kurangi konkurensi pekerja.

Pentingnya paralelisme

Penjadwal Airflow mengontrol penjadwalan pengoperasian DAG dan tugas individual dari DAG. Opsi konfigurasi Airflow [core]parallelism mengontrol jumlah tugas yang dapat diantrekan oleh penjadwal Airflow di antrean eksekutor setelah semua dependensi untuk tugas ini terpenuhi.

Paralelisme adalah mekanisme perlindungan Airflow yang menentukan jumlah tugas yang dapat dijalankan secara bersamaan per setiap penjadwal, terlepas dari jumlah pekerjanya. Nilai paralelisme, dikalikan dengan jumlah penjadwal di cluster Anda, adalah jumlah maksimum instance tugas yang dapat diantrekan oleh lingkungan Anda.

Biasanya, [core]parallelism ditetapkan sebagai hasil kali jumlah pekerja maksimum dan [celery]worker_concurrency. Jumlah ini juga dipengaruhi oleh kumpulan. Anda dapat mengubah opsi konfigurasi Airflow ini dengan menggantinya. Untuk mengetahui informasi selengkapnya tentang cara menyesuaikan konfigurasi Airflow yang terkait dengan penskalaan, lihat Menskalakan konfigurasi Airflow.

Menemukan konfigurasi lingkungan yang optimal

Cara yang direkomendasikan untuk memperbaiki masalah penjadwalan adalah menggabungkan tugas-tugas kecil menjadi tugas yang lebih besar dan mendistribusikan tugas secara lebih merata seiring waktu. Selain mengoptimalkan kode DAG, Anda juga dapat mengoptimalkan konfigurasi lingkungan agar memiliki kapasitas yang cukup untuk menjalankan beberapa tugas secara serentak.

Misalnya, anggap Anda menggabungkan tugas di DAG sebanyak mungkin, tetapi membatasi tugas aktif untuk menyebarkannya secara lebih merata dari waktu ke waktu bukanlah solusi yang lebih disukai untuk kasus penggunaan spesifik Anda.

Anda dapat menyesuaikan paralelisme, jumlah pekerja, dan parameter serentak pekerja untuk menjalankan DAG dag_10_tasks_20_seconds_10 tanpa membatasi tugas aktif. Dalam contoh ini, DAG berjalan 10 kali dan setiap sesi berisi 20 tugas kecil. Jika Anda ingin menjalankan semuanya secara bersamaan:

  • Anda memerlukan ukuran lingkungan yang lebih besar karena lingkungan tersebut mengontrol parameter performa infrastruktur Cloud Composer terkelola di lingkungan Anda.

  • Pekerja Airflow harus dapat menjalankan 20 tugas secara bersamaan, yang berarti Anda harus menetapkan konkurensi pekerja ke 20.

  • Pekerja memerlukan CPU dan memori yang cukup untuk menangani semua tugas. Konkurensi pekerja dipengaruhi oleh CPU dan memori pekerja, sehingga Anda akan memerlukan setidaknya worker_concurrency / 12 di CPU dan least worker_concurrency / 8 dalam memori.

  • Anda perlu meningkatkan paralelisme agar cocok dengan konkurensi pekerja yang lebih tinggi. Agar pekerja dapat mengambil 20 tugas dari antrean, penjadwal harus menjadwalkan 20 tugas tersebut terlebih dahulu.

Sesuaikan konfigurasi lingkungan Anda dengan cara berikut:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Environment configuration.

  4. Temukan konfigurasi Resource > Workloads, lalu klik Edit.

  5. Di bagian Worker, pada kolom Memory, tentukan batas memori baru untuk pekerja Airflow. Dalam tutorial ini, gunakan 4 GB.

  6. Di kolom CPU, tentukan batas CPU baru untuk pekerja Airflow. Dalam tutorial ini, gunakan 2 vCPU.

  7. Simpan perubahan dan tunggu beberapa menit sampai pekerja Airflow Anda memulai ulang.

Berikutnya, ganti opsi konfigurasi Airflow konkurensi pekerja dan paralelisme:

  1. Buka tab Airflow Configuration Overrides.

  2. Klik Edit, lalu klik Add Airflow Configuration Override.

  3. Ganti konfigurasi parralelisme:

    Bagian Kunci Nilai
    core parallelism 20
  4. Klik Add Airflow Configuration Override dan ganti konfigurasi serentak pekerja:

    Bagian Kunci Nilai
    celery worker_concurrency 20
  5. Klik Save dan tunggu hingga lingkungan memperbarui konfigurasinya.

Picu lagi contoh DAG yang sama dengan konfigurasi yang disesuaikan:

  1. Di UI Airflow, buka halaman DAG.

  2. Temukan DAG dag_10_tasks_20_seconds_10, lalu hapus.

    Setelah DAG dihapus, Airflow akan memeriksa folder DAG di bucket lingkungan Anda dan otomatis menjalankan DAG lagi.

Setelah operasi DAG selesai, tinjau lagi histogram Log. Pada diagram, Anda dapat melihat bahwa contoh dag_10_tasks_20_seconds_10 dengan lebih banyak tugas yang telah terkonsolidasi tidak menghasilkan error dan peringatan apa pun saat dijalankan dengan konfigurasi lingkungan yang disesuaikan. Bandingkan hasilnya dengan data sebelumnya pada diagram, di mana contoh yang sama menghasilkan error dan peringatan saat dijalankan dengan konfigurasi lingkungan default.

Histogram log pekerja Airflow dengan error dan peringatan tidak menampilkan error dan peringatan setelah konfigurasi lingkungan disesuaikan
Gambar 12. Histogram log pekerja Airflow setelah konfigurasi lingkungan disesuaikan (klik untuk memperbesar)

Konfigurasi lingkungan dan konfigurasi Airflow memainkan peran penting dalam penjadwalan tugas, tetapi tidak mungkin untuk meningkatkan konfigurasi melebihi batas tertentu.

Sebaiknya optimalkan kode DAG, gabungkan tugas, dan gunakan penjadwalan untuk performa dan efisiensi yang optimal.

Contoh: Error dan latensi penguraian DAG karena kode DAG yang kompleks

Dalam contoh ini, Anda menyelidiki latensi penguraian DAG sampel yang meniru kelebihan variabel Airflow.

Membuat variabel Airflow baru

Sebelum mengupload kode contoh, buat variabel Airflow baru.

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Di kolom webserver Airflow, ikuti link Airflow untuk lingkungan Anda.

  3. Buka Admin > Variabel > Tambahkan data baru.

  4. Tetapkan nilai berikut:

    • kunci: example_var
    • nilai: test_airflow_variable

Upload DAG sampel ke lingkungan Anda

Upload contoh DAG berikut ke lingkungan yang Anda buat pada langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama dag_for_loop_airflow_variable.

DAG ini berisi loop for yang berjalan 1.000 kali dan meniru kelebihan variabel Airflow. Setiap iterasi membaca variabel example_var dan menghasilkan tugas. Setiap tugas berisi satu perintah yang mencetak nilai variabel.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Mendiagnosis masalah penguraian

Waktu penguraian DAG adalah jumlah waktu yang diperlukan penjadwal Airflow untuk membaca file DAG dan mengurainya. Sebelum penjadwal Airflow dapat menjadwalkan tugas apa pun dari DAG, penjadwal harus mengurai file DAG untuk menemukan struktur DAG dan tugas yang ditentukan.

Jika DAG memerlukan waktu lama untuk diurai, kapasitas penjadwal akan habis dan dapat mengurangi performa berjalan DAG.

Untuk memantau waktu penguraian DAG:

  1. Jalankan perintah Airflow CLI dags report di gcloud CLI untuk melihat waktu penguraian semua DAG Anda:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Ganti kode berikut:

    • ENVIRONMENT_NAME: nama lingkungan Anda.
    • LOCATION: region tempat lingkungan berada.
  2. Dalam output perintah, cari nilai durasi untuk DAG dag_for_loop_airflow_variables. Nilai yang besar mungkin menunjukkan bahwa DAG ini tidak diterapkan dengan cara yang optimal. Jika Anda memiliki beberapa DAG, dari tabel output, Anda dapat mengidentifikasi DAG mana yang memiliki waktu penguraian yang lama.

    Contoh:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Periksa waktu penguraian DAG di Konsol Google Cloud:

    1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  4. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  5. Buka tab Logs, lalu buka All logs > DAG pemroses manager.

  6. Tinjau log dag-processor-manager dan identifikasi kemungkinan masalah.

    Entri log untuk contoh DAG menunjukkan bahwa waktu penguraian DAG adalah 46,3 detik
    Gambar 13. Log pengelola prosesor DAG menampilkan waktu penguraian DAG (klik untuk memperbesar)

Jika total waktu penguraian DAG melebihi sekitar 10 detik, penjadwal Anda mungkin kelebihan beban dengan penguraian DAG dan tidak dapat menjalankan DAG secara efektif.

Mengoptimalkan kode DAG

Direkomendasikan untuk menghindari kode Python "level teratas" yang tidak perlu di DAG Anda. DAG dengan banyak impor, variabel, dan fungsi di luar DAG memperkenalkan waktu penguraian yang lebih besar untuk penjadwal Airflow. Hal ini akan mengurangi performa dan skalabilitas Cloud Composer dan Airflow. Kelebihan pembacaan variabel Airflow dapat menyebabkan waktu penguraian yang lama dan beban database yang tinggi. Jika kode ini berada dalam file DAG, fungsi ini akan dijalankan pada setiap heartbeat penjadwal, yang mungkin lambat.

Kolom template Airflow memungkinkan Anda menggabungkan nilai dari variabel Airflow dan template Jinja ke dalam DAG. Hal ini mencegah eksekusi fungsi yang tidak perlu selama heartbeat penjadwal.

Untuk menerapkan contoh DAG dengan cara yang lebih baik, hindari penggunaan variabel Airflow pada kode Python level teratas DAG. Sebagai gantinya, teruskan variabel Airflow ke operator yang ada melalui template Jinja, yang akan menunda pembacaan nilai hingga tugas dieksekusi.

Upload DAG sampel versi baru ke lingkungan Anda. Dalam tutorial ini, DAG ini diberi nama dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Periksa waktu penguraian DAG yang baru:

  1. Tunggu hingga proses DAG selesai.

  2. Jalankan perintah dags report lagi untuk melihat waktu penguraian semua DAG Anda:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Tinjau log dag-processor-manager lagi dan analisis durasi penguraian.

    Entri log untuk contoh DAG menunjukkan bahwa waktu penguraian DAG adalah 4,21 detik
    Gambar 14. Log pengelola prosesor DAG menampilkan waktu penguraian DAG setelah kode DAG dioptimalkan (klik untuk memperbesar)

Dengan mengganti variabel lingkungan dengan template Airflow, Anda menyederhanakan kode DAG dan mengurangi latensi penguraian sekitar sepuluh kali.

Mengoptimalkan konfigurasi lingkungan Airflow

Penjadwal Airflow terus-menerus mencoba memicu tugas baru dan mengurai semua DAG di bucket lingkungan Anda. Jika DAG memiliki waktu penguraian yang panjang dan penjadwal menghabiskan banyak resource, Anda dapat mengoptimalkan konfigurasi penjadwal Airflow agar penjadwal dapat menggunakan resource dengan lebih efisien.

Dalam tutorial ini, file DAG membutuhkan banyak waktu untuk diurai, dan siklus penguraian mulai tumpang tindih, yang kemudian menghabiskan kapasitas penjadwal. Dalam contoh kami, contoh pertama DAG membutuhkan waktu lebih dari 5 detik untuk diurai, sehingga Anda akan mengonfigurasi penjadwal agar berjalan lebih jarang untuk menggunakan resource secara lebih efisien. Anda akan mengganti opsi konfigurasi Airflow scheduler_heartbeat_sec. Konfigurasi ini menentukan seberapa sering penjadwal harus berjalan (dalam detik). Secara default, nilai ditetapkan ke 5 detik. Anda dapat mengubah opsi konfigurasi Airflow ini dengan menggantinya.

Ganti opsi konfigurasi Airflow scheduler_heartbeat_sec:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Airflow Configuration Overrides.

  4. Klik Edit, lalu klik Add Airflow Configuration Override.

  5. Ganti opsi konfigurasi Airflow:

    Bagian Kunci Nilai
    scheduler scheduler_heartbeat_sec 10
  6. Klik Save dan tunggu hingga lingkungan memperbarui konfigurasinya.

Periksa metrik penjadwal:

  1. Buka tab Pemantauan dan pilih Penjadwal.

  2. Di grafik Denyut jantung Penjadwal, klik tombol Opsi lainnya (tiga titik), lalu klik Lihat di Metrics Explorer.

Grafik detak jantung penjadwal menunjukkan bahwa detak jantung terjadi lebih jarang
Gambar 15. Grafik heartbeat penjadwal (klik untuk memperbesar)

Pada grafik, Anda akan melihat penjadwal berjalan dua kali lebih jarang setelah Anda mengubah konfigurasi default dari 5 detik menjadi 10 detik. Dengan mengurangi frekuensi detak jantung, Anda memastikan bahwa penjadwal tidak mulai berjalan saat siklus penguraian sebelumnya sedang berlangsung dan kapasitas resource penjadwal tidak habis.

Menetapkan lebih banyak resource ke penjadwal

Di Cloud Composer 2, Anda dapat mengalokasikan lebih banyak resource CPU dan memori ke penjadwal. Dengan cara ini, Anda dapat meningkatkan performa penjadwal dan mempercepat waktu penguraian untuk DAG.

Alokasikan CPU dan memori tambahan ke penjadwal:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Environment configuration.

  4. Temukan konfigurasi Resource > Workloads, lalu klik Edit.

  5. Di bagian Scheduler, pada kolom Memory, tentukan batas memori baru. Dalam tutorial ini, gunakan 4 GB.

  6. Di kolom CPU, tentukan batas CPU baru. Dalam tutorial ini, gunakan 2 vCPU.

  7. Simpan perubahan dan tunggu beberapa menit hingga penjadwal Airflow Anda dimulai ulang.

  8. Buka tab Logs, lalu buka All logs > DAG pemroses manager.

  9. Tinjau log dag-processor-manager dan bandingkan durasi penguraian untuk contoh DAG:

    Entri log untuk contoh DAG menunjukkan bahwa waktu penguraian DAG untuk DAG yang dioptimalkan adalah 1,5 detik. Untuk DAG yang tidak dioptimalkan, waktu penguraian adalah 28,71 detik
    Gambar 16. Log pengelola prosesor DAG menampilkan waktu penguraian DAG setelah lebih banyak resource ditetapkan ke penjadwal (klik untuk memperbesar)

Dengan menetapkan lebih banyak resource ke penjadwal, Anda meningkatkan kapasitas penjadwal dan mengurangi latensi penguraian secara signifikan dibandingkan dengan konfigurasi lingkungan default. Dengan resource yang lebih banyak, penjadwal dapat mengurai DAG lebih cepat, tetapi biaya yang terkait dengan resource Cloud Composer juga akan meningkat. Selain itu, Anda tidak dapat meningkatkan resource di luar batas tertentu.

Sebaiknya alokasikan resource hanya setelah kode DAG yang memungkinkan dan pengoptimalan konfigurasi Airflow diterapkan.

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource atau simpan project dan hapus resource satu per satu.

Menghapus project

  1. Di konsol Google Cloud, buka halaman Manage resource.

    Buka Manage resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Menghapus resource satu per satu

Jika Anda berencana mempelajari beberapa tutorial dan panduan memulai, menggunakan kembali project dapat membantu Anda agar tidak melampaui batas kuota project.

Menghapus lingkungan Cloud Composer. Anda juga menghapus bucket lingkungan selama prosedur ini.

Langkah selanjutnya