Men-debug masalah penjadwalan tugas

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Tutorial ini memandu Anda dalam mendiagnosis dan memecahkan masalah penjadwalan tugas dan masalah penguraian yang menyebabkan kegagalan fungsi penjadwal, kesalahan penguraian dan latensi, dan kegagalan tugas.

Pengantar

Scheduler Airflow terutama dipengaruhi oleh dua faktor: penjadwalan tugas dan Penguraian DAG. Masalah di salah satu faktor tersebut dapat berdampak negatif pada kesehatan dan performa lingkungan.

Terkadang terlalu banyak tugas yang dijadwalkan secara bersamaan. Dalam situasi ini, antrean terisi, dan tugas tetap berada di "terjadwal" ditetapkan atau menjadi dijadwalkan ulang setelah dimasukkan ke dalam antrean, yang dapat menyebabkan kegagalan tugas dan performa latensi yang rendah.

Masalah umum lainnya adalah penguraian latensi dan {i>error <i}yang disebabkan oleh kompleksitas sebuah kode DAG. Misalnya, kode DAG yang berisi variabel Airflow di bagian atas kode dapat menyebabkan penundaan penguraian, kelebihan beban database, gagal, dan waktu tunggu DAG habis.

Dalam tutorial ini, Anda akan mendiagnosis contoh DAG dan mempelajari cara memecahkan masalah penjadwalan dan penguraian, meningkatkan penjadwalan DAG, serta mengoptimalkan kode DAG dan konfigurasi lingkungan untuk meningkatkan performa.

Tujuan

Bagian ini mencantumkan beberapa tujuan untuk contoh dalam tutorial ini.

Contoh: Scheduler tidak berfungsi dan latensi yang disebabkan oleh konkurensi tugas yang tinggi

  • Upload contoh DAG yang berjalan beberapa kali secara bersamaan dan mendiagnosis kegagalan fungsi penjadwal dan masalah latensi dengan Cloud Monitoring.

  • Optimalkan kode DAG Anda dengan menggabungkan tugas-tugas dan mengevaluasi dampak performa.

  • Distribusikan tugas secara lebih merata dari waktu ke waktu dan evaluasi performanya yang efektif.

  • Optimalkan konfigurasi Airflow, konfigurasi lingkungan, dan mengevaluasi dampaknya.

Contoh: Error penguraian dan latensi DAG yang disebabkan oleh kode kompleks

  • Mengupload sampel DAG dengan variabel Airflow dan mendiagnosis penguraian dengan Cloud Monitoring.

  • Optimalkan kode DAG dengan menghindari variabel Airflow di level teratas kode dan mengevaluasi dampaknya terhadap waktu penguraian.

  • Mengoptimalkan konfigurasi Airflow, konfigurasi lingkungan, dan mengevaluasi dampaknya terhadap waktu penguraian.

Biaya

Tutorial ini menggunakan komponen Google Cloud yang dapat ditagih berikut:

Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui detail selengkapnya, lihat Membersihkan.

Sebelum memulai

Bagian ini menjelaskan tindakan yang diperlukan sebelum Anda memulai tutorial.

Membuat dan mengonfigurasi project

Untuk tutorial ini, Anda memerlukan resource Google Cloud project Anda. Konfigurasikan project dengan cara berikut:

  1. Di konsol Google Cloud, pilih atau buat project:

    Buka Pemilih Project

  2. Pastikan penagihan diaktifkan untuk project Anda. Pelajari cara memeriksa apakah penagihan diaktifkan pada sebuah project.

  3. Pastikan pengguna project Google Cloud Anda memiliki peran berikut untuk membuat resource yang diperlukan:

    • Administrator Objek Penyimpanan dan Lingkungan (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)

Mengaktifkan API untuk project Anda

Aktifkan API Cloud Composer.

Mengaktifkan API

Membuat lingkungan Cloud Composer

Membuat lingkungan Cloud Composer 2.

Sebagai bagian dari upaya menciptakan lingkungan, Anda memberikan Ekstensi Agen Layanan Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext) ke Composer Service Agent menggunakan akun layanan. Cloud Composer menggunakan akun ini untuk menjalankan operasi di project Google Cloud Anda.

Contoh: Scheduler gagal berfungsi dan tugas gagal karena masalah penjadwalan tugas

Contoh ini menunjukkan kerusakan penjadwal proses debug dan latensi yang disebabkan oleh konkurensi tugas tinggi.

Mengupload DAG sampel ke lingkungan Anda

Upload contoh DAG berikut ke lingkungan yang telah dibuat pada langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama dag_10_tasks_200_seconds_1.

DAG ini memiliki 200 tugas. Setiap tugas menunggu selama 1 detik dan mencetak "Complete!". DAG dipicu secara otomatis setelah diupload. Cloud Composer menjalankan DAG ini 10 kali, dan semua operasi DAG terjadi secara paralel.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Mendiagnosis malfungsi penjadwal dan masalah kegagalan tugas

Setelah DAG selesai berjalan, buka UI Airflow, lalu klik DAG dag_10_tasks_200_seconds_1. Anda akan melihat bahwa total 10 operasi DAG berhasil, dan masing-masing memiliki 200 tugas yang berhasil.

Tinjau log tugas Airflow:

  1. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Logs, lalu buka All logs &gt; Log Airflow &gt; Pekerja &gt; Lihat di Logs Explorer.

Pada histogram log, Anda dapat melihat error dan peringatan yang ditunjukkan dengan warna merah dan warna oranye:

Histogram log pekerja Airflow dengan error dan peringatan
    ditunjukkan dengan warna merah dan oranye
Gambar 1. Histogram log pekerja Airflow (klik untuk memperbesar)

Contoh DAG menghasilkan sekitar 130 peringatan dan 60 error. Klik salah satu yang berisi batang kuning dan merah. Anda akan melihat beberapa hal berikut peringatan dan error dalam log:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Log ini mungkin menunjukkan bahwa penggunaan sumber daya melebihi batas dan Worker me-restartnya sendiri.

Jika tugas Airflow disimpan dalam antrean terlalu lama, penjadwal akan menandai sebagai gagal dan up_for_retry, dan akan menjadwalkan ulang sekali lagi untuk dalam proses eksekusi. Salah satu cara untuk mengamati gejala situasi ini adalah dengan melihat dengan jumlah tugas dalam antrean dan jika lonjakan tidak turun dalam waktu sekitar 10 menit, maka kemungkinan akan ada kegagalan tugas (tanpa log).

Tinjau informasi pemantauan:

  1. Buka tab Monitoring lalu pilih Overview.

  2. Tinjau grafik Airflow tasks.

    Grafik tugas Airflow dari waktu ke waktu, yang menunjukkan lonjakan
    jumlah tugas dalam antrean
    Gambar 2. Grafik tugas Airflow (klik untuk memperbesar)

    Di grafik tugas Airflow, terdapat lonjakan antrean tugas yang berlangsung lebih dari 10 menit, yang mungkin berarti sumber daya tidak cukup di lingkungan Anda untuk memproses semua tugas terjadwal.

  3. Tinjau grafik Pekerja aktif:

    Grafik pekerja Airflow yang aktif dari waktu ke waktu menunjukkan bahwa
    jumlah pekerja aktif dinaikkan hingga batas maksimum
    Gambar 3. Grafik pekerja aktif (klik untuk memperbesar)

    Grafik Active worker menunjukkan bahwa DAG memicu penskalaan otomatis hingga batas maksimum yang diizinkan, yaitu tiga pekerja selama menjalankan DAG.

  4. Grafik penggunaan resource dapat menunjukkan kurangnya kapasitas pada pekerja Airflow untuk menjalankan tugas dalam antrean. Pada tab Monitoring, pilih Workers dan tinjau grafik Total worker CPU usage dan Total worker memory usage.

    Grafik penggunaan CPU oleh pekerja Airflow menunjukkan penggunaan CPU
    meningkat hingga batas maksimum
    Gambar 4. Grafik penggunaan CPU total worker (klik untuk memperbesar)
    Grafik penggunaan memori oleh pekerja Airflow menunjukkan penggunaan memori
    meningkat, tetapi tidak mencapai batas maksimum
    Gambar 5. Grafik penggunaan memori total worker (klik untuk memperbesar)

    Grafik menunjukkan bahwa pelaksanaan terlalu banyak tugas secara bersamaan menyebabkan batas CPU tercapai. Sumber daya telah digunakan selama lebih dari 30 lebih lama dari total durasi 200 tugas dalam 10 DAG berjalan satu per satu.

Ini adalah indikator antrean yang terisi dan kurangnya sumber daya untuk memproses semua tugas terjadwal.

Menggabungkan tugas

Kode saat ini membuat banyak DAG dan tugas tanpa resource yang memadai untuk memproses semua tugas secara paralel, yang menyebabkan antrean terisi. Menyimpan tugas dalam antrean terlalu lama dapat menyebabkan tugas dijadwalkan ulang atau gagal. Dalam situasi seperti itu, Anda harus memilih jumlah sampel yang lebih sedikit tugas klasifikasi.

Contoh DAG berikut mengubah jumlah tugas dalam contoh awal dari 200 menjadi 20 dan menambah waktu tunggu dari 1 menjadi 10 detik untuk meniru tugas-tugas terkonsolidasi yang melakukan pekerjaan dalam jumlah yang sama.

Upload contoh DAG berikut ke lingkungan yang Anda buat. Dalam tutorial ini, DAG ini diberi nama dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Evaluasi dampak tugas yang lebih terkonsolidasi terhadap proses penjadwalan:

  1. Tunggu hingga operasi DAG selesai.

  2. Di UI Airflow, pada halaman DAGs, klik DAG dag_10_tasks_20_seconds_10. Anda akan melihat 10 operasi DAG, yang masing-masing memiliki 20 operasi yang berhasil.

  3. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  4. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  5. Buka tab Logs, lalu buka All logs &gt; Log Airflow &gt; Pekerja &gt; Lihat di Logs Explorer.

    Contoh kedua dengan tugas yang lebih konsolidasi menghasilkan sekitar 10 peringatan dan 7 error. Pada histogramnya, Anda dapat membandingkan jumlah kesalahan dan peringatan di contoh awal (nilai sebelumnya) dan contoh kedua contoh (nilai kemudian).

    Histogram log pekerja Airflow dengan error dan peringatan
    menunjukkan penurunan jumlah kesalahan dan peringatan setelah tugas
    konsolidasi
    Gambar 6. Histogram log pekerja Airflow setelah tugas digabungkan (klik untuk memperbesar)

    Ketika membandingkan contoh pertama dengan contoh yang lebih terkonsolidasi, Anda dapat melihat bahwa ada kesalahan dan peringatan yang jauh lebih sedikit pada contoh. Namun, error yang sama terkait dengan warm shutdown masih muncul di log karena beban daya yang berlebihan.

  6. Pada tab Monitoring, pilih Workers dan tinjau grafik.

    Saat Anda membandingkan grafik Airflow tasks untuk contoh pertama (sebelumnya dengan grafik untuk contoh kedua dengan tugas yang lebih terkonsolidasi, Anda dapat melihat bahwa lonjakan tugas antre berlangsung selama periode yang lebih singkat ketika tugas-tugas itu lebih dikonsolidasikan. Namun, studi kasus berlangsung hampir 10 menit, yang masih kurang optimal.

    Grafik tugas Airflow dari waktu ke waktu
menunjukkan bahwa lonjakan
    Tugas Airflow berlangsung dalam jangka waktu yang lebih singkat dari sebelumnya.
    Gambar 7. Grafik tugas Airflow setelah tugas digabungkan (klik untuk memperbesar)

    Pada grafik Pekerja aktif, Anda dapat melihat contoh pertama (di sebelah kiri grafik) menggunakan sumber daya dalam waktu yang lebih lama yang kedua, meskipun kedua contoh tersebut meniru jumlah Anda.

    Grafik pekerja Airflow yang aktif dari waktu ke waktu menunjukkan bahwa
    jumlah pekerja aktif meningkat untuk jangka waktu yang lebih singkat
    dari sebelumnya.
    Gambar 8. Grafik pekerja aktif setelah tugas digabungkan (klik untuk memperbesar)

    Tinjau grafik konsumsi resource pekerja. Meskipun perbedaan antara sumber daya yang digunakan dalam contoh dengan tugas yang lebih terkonsolidasi dan contoh awalnya cukup signifikan, penggunaan CPU masih melonjak hingga 70% dari batas.

    Grafik penggunaan CPU oleh pekerja Airflow menunjukkan penggunaan CPU
    meningkat hingga 70% dari batas maksimum
    Gambar 9. Grafik penggunaan CPU total worker setelah tugas digabungkan (klik untuk memperbesar)
    Grafik penggunaan memori oleh pekerja Airflow menunjukkan peningkatan penggunaan memori, tetapi tidak mencapai batas maksimum
    Gambar 10. Grafik penggunaan memori total worker setelah tugas digabungkan (klik untuk memperbesar)

Mendistribusikan tugas secara lebih merata dari waktu ke waktu

Terlalu banyak tugas serentak menyebabkan antrian terisi, yang menyebabkan tugas yang terjebak dalam antrian atau dijadwalkan ulang. Pada langkah sebelumnya, Anda menurunkan jumlah tugas dengan menggabungkan tugas-tugas tersebut, tetapi output log dan pemantauan menunjukkan bahwa jumlah tugas serentak masih kurang optimal.

Anda dapat mengontrol jumlah tugas serentak yang dijalankan dengan menerapkan jadwal atau menetapkan batas jumlah tugas yang dapat dijalankan secara bersamaan.

Dalam tutorial ini, Anda mendistribusikan tugas secara lebih merata dari waktu ke waktu dengan menambahkan Parameter tingkat DAG ke DAG dag_10_tasks_20_seconds_10:

  1. Tambahkan argumen max_active_runs=1 ke pengelola konteks DAG. Argumen ini menetapkan batas hanya satu instance dari DAG yang dijalankan dalam momen tertentu.

  2. Tambahkan argumen max_active_tasks=5 ke pengelola konteks DAG. Argumen ini mengontrol jumlah maksimum instance tugas yang dapat berjalan serentak di setiap DAG.

Upload contoh DAG berikut ke lingkungan yang Anda buat. Dalam tutorial ini, DAG ini diberi nama dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Mengevaluasi dampak distribusi tugas dari waktu ke waktu terhadap proses penjadwalan:

  1. Tunggu hingga operasi DAG selesai.

  2. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  3. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  4. Buka tab Logs, lalu buka All logs &gt; Log Airflow &gt; Pekerja &gt; Lihat di Logs Explorer.

  5. Di histogram, Anda bisa melihat bahwa DAG ketiga dengan tugas aktif dan operasi tidak menghasilkan peringatan atau kesalahan apa pun dan log terlihat lebih merata dibandingkan dengan nilai sebelumnya.

    Histogram log pekerja Airflow dengan error dan peringatan
    tidak menunjukkan error atau peringatan setelah tugas digabungkan dan
    yang didistribusikan dari waktu ke waktu.
    Gambar 11. Histogram log pekerja Airflow setelah tugas-tugas tersebut digabungkan dan didistribusikan dari waktu ke waktu (klik untuk memperbesar)

Tugas dalam contoh dag_10_tasks_20_seconds_10_scheduled yang memiliki jumlah tugas aktif dan operasi yang terbatas tidak menyebabkan tekanan sumber daya karena tugas-tugas itu diantrekan secara merata.

Setelah melakukan langkah yang dijelaskan, Anda mengoptimalkan penggunaan resource dengan menggabungkan tugas-tugas kecil dan mendistribusikannya secara lebih merata dari waktu ke waktu.

Mengoptimalkan konfigurasi lingkungan

Anda dapat menyesuaikan konfigurasi lingkungan untuk memastikan selalu ada kapasitas dalam pekerja Airflow untuk menjalankan tugas dalam antrean.

Jumlah pekerja dan konkurensi pekerja

Anda dapat menyesuaikan jumlah maksimum pekerja agar Cloud Composer otomatis menskalakan lingkungan Anda dalam batas yang ditetapkan.

Parameter [celery]worker_concurrency menentukan jumlah maksimum tugas satu pekerja dapat mengambil dari antrean tugas. Mengubah parameter ini menyesuaikan jumlah tugas yang dapat dijalankan oleh satu pekerja pada saat yang sama. Anda dapat mengubah opsi konfigurasi Airflow ini dengan menggantikannya. Secara default, konkurensi pekerja disetel ke minimum dari hal berikut: 32, 12 * worker_CPU, 8 * worker_memory, yang berarti itu tergantung pada batas resource pekerja. Lihat Mengoptimalkan lingkungan untuk informasi selengkapnya tentang default nilai konkurensi pekerja.

Jumlah pekerja dan pekerja konkurensi yang bekerja dalam kombinasi masing-masing lainnya, dan performa lingkungan Anda sangat bergantung pada kedua parameter tersebut. Anda dapat menggunakan pertimbangan berikut untuk memilih kombinasi yang tepat:

  • Beberapa tugas cepat yang berjalan secara paralel. Anda bisa menambah pekerja konkurensi ketika ada tugas yang menunggu dalam antrean, dan worker Anda menggunakan persentase CPU dan memori pada saat yang sama. Namun, pada keadaan tertentu antrean mungkin tidak pernah terisi, sehingga menyebabkan penskalaan otomatis tidak pernah dipicu. Jika tugas kecil menyelesaikan eksekusi pada saat pekerja baru sudah siap, pekerja yang ada dapat mengambil tugas yang tersisa, dan tidak akan ada tugas untuk pekerja yang baru dibuat.

    Dalam situasi ini, sebaiknya meningkatkan jumlah minimum worker dan meningkatkan konkurensi pekerja untuk menghindari penskalaan yang berlebihan.

  • Beberapa tugas panjang yang berjalan secara paralel. Konkurensi pekerja tinggi mencegah sistem menskalakan jumlah pekerja. Jika beberapa tugas membutuhkan waktu yang lama untuk menyelesaikannya, dan tenaga kerja yang tinggi konkurensi dapat menyebabkan antrean tidak pernah terisi dan semua tugas diambil oleh hanya satu pekerja, yang menyebabkan masalah kinerja. Di situasi, sebaiknya meningkatkan jumlah maksimum worker dan mengurangi konkurensi pekerja.

Pentingnya paralelisme

Penjadwal Airflow mengontrol penjadwalan operasi DAG dan tugas individual dari DAG. Opsi konfigurasi Airflow [core]parallelism mengontrol jumlah tugas yang dapat diantrekan penjadwal Airflow di antrean eksekutor setelah dependensi untuk tugas-tugas ini terpenuhi.

Paralelisme adalah mekanisme perlindungan Airflow yang menentukan jumlah tugas dapat dijalankan secara bersamaan untuk setiap penjadwal, terlepas dari jumlah pekerja. Nilai paralelisme, dikalikan dengan jumlah penjadwal di cluster Anda, adalah jumlah maksimum instance tugas yang dapat diantrekan oleh lingkungan Anda.

Biasanya, [core]parallelism ditetapkan sebagai hasil dari jumlah maksimum pekerja dan [celery]worker_concurrency. Hal ini juga dipengaruhi oleh pool. Anda dapat mengubah opsi konfigurasi Airflow ini dengan menggantikannya. Untuk informasi selengkapnya tentang menyesuaikan Airflow terkait penskalaan, lihat Menskalakan konfigurasi Airflow.

Menemukan konfigurasi lingkungan yang optimal

Cara yang disarankan untuk memperbaiki masalah penjadwalan adalah mengonsolidasikan tugas-tugas kecil ke dalam tugas yang lebih besar dan mendistribusikan tugas secara lebih merata dari waktu ke waktu. Selain dengan mengoptimalkan kode DAG, Anda juga dapat mengoptimalkan konfigurasi lingkungan kapasitas yang cukup untuk menjalankan beberapa tugas secara serentak.

Misalnya, anggap Anda menggabungkan beberapa tugas di DAG sebanyak mungkin tetapi membatasi tugas aktif untuk mendistribusikannya secara lebih merata di waktu bukanlah solusi pilihan untuk kasus penggunaan Anda.

Anda dapat menyesuaikan paralelisme, jumlah pekerja, dan konkurensi pekerja parameter untuk menjalankan DAG dag_10_tasks_20_seconds_10 tanpa membatasi parameter aktif tugas klasifikasi. Dalam contoh ini, DAG berjalan 10 kali dan setiap proses berisi 20 tugas kecil. Jika Anda ingin menjalankan semuanya secara bersamaan:

  • Anda akan membutuhkan ukuran lingkungan yang lebih besar, karena ukuran ini mengontrol performa parameter infrastruktur Cloud Composer terkelola di lingkungan Anda.

  • Pekerja airflow harus dapat menjalankan 20 tugas secara bersamaan, yang berarti Anda perlu menetapkan konkurensi pekerja ke 20.

  • Pekerja memerlukan CPU dan memori yang cukup untuk menangani semua tugas. Pekerja konkurensi dipengaruhi oleh CPU dan memori pekerja, oleh karena itu, Anda memerlukan CPU minimal worker_concurrency / 12 dan least worker_concurrency / 8 dalam memori.

  • Anda perlu meningkatkan paralelisme agar cocok dengan konkurensi pekerja yang lebih tinggi. Agar pekerja dapat mengambil 20 tugas dari antrian, penjadwal akan perlu menjadwalkan 20 tugas itu terlebih dahulu.

Sesuaikan konfigurasi lingkungan Anda dengan cara berikut:

  1. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Konfigurasi lingkungan.

  4. Temukan konfigurasi Resource &gt; Workloads dan Klik Edit.

  5. Di bagian Worker, di kolom Memory, tentukan memori baru untuk pekerja Airflow. Dalam tutorial ini, gunakan 4 GB.

  6. Di kolom CPU, tentukan batas CPU baru untuk pekerja Airflow. Di sini menggunakan 2 vCPU.

  7. Simpan perubahan dan tunggu beberapa menit agar pekerja Airflow Anda dapat mulai ulang.

Selanjutnya, ganti opsi konfigurasi Airflow konkurensi dan paralelisme pekerja:

  1. Buka tab Airflow Configuration Overrides.

  2. Klik Edit, lalu klik Add Airflow Configuration Override.

  3. Ganti konfigurasi parralelisme:

    Bagian Kunci Nilai
    core parallelism 20
  4. Klik Add Airflow Configuration Override dan ganti worker konfigurasi konkurensi:

    Bagian Kunci Nilai
    celery worker_concurrency 20
  5. Klik Simpan dan tunggu hingga lingkungan memperbarui konfigurasinya.

Picu lagi contoh DAG yang sama dengan konfigurasi yang disesuaikan:

  1. Di UI Airflow, buka halaman DAGs.

  2. Temukan DAG dag_10_tasks_20_seconds_10 dan hapus.

    Setelah DAG dihapus, Airflow akan memeriksa folder DAG di bucket lingkungan dan otomatis menjalankan DAG lagi.

Setelah operasi DAG selesai, tinjau lagi histogram Log. Pada diagram, Anda dapat melihat bahwa contoh dag_10_tasks_20_seconds_10 dengan tugas gabungan tidak menghasilkan kesalahan dan peringatan apa pun saat dijalankan dengan konfigurasi lingkungan yang disesuaikan. Membandingkan hasilnya dengan data sebelumnya di diagram, di mana contoh yang sama menghasilkan error dan peringatan saat berjalan dengan konfigurasi lingkungan default tge.

Histogram log pekerja Airflow dengan error dan peringatan
        tidak menampilkan error dan peringatan setelah konfigurasi lingkungan
        disesuaikan
Gambar 12. Histogram log pekerja Airflow setelah konfigurasi lingkungan disesuaikan (klik untuk memperbesar)

Konfigurasi lingkungan dan konfigurasi Airflow memainkan peran penting dalam penjadwalan tugas, namun, tidak mungkin meningkatkan konfigurasi melampaui batas tertentu.

Sebaiknya optimalkan kode DAG, gabungkan tugas, dan gunakan untuk performa dan efisiensi yang optimal.

Contoh: Error penguraian dan latensi DAG karena kode DAG yang kompleks

Dalam contoh ini, Anda menyelidiki latensi penguraian contoh DAG yang meniru kelebihan variabel Airflow.

Membuat variabel Airflow baru

Sebelum mengupload kode contoh, buat variabel Airflow baru.

  1. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Di kolom server web Airflow, ikuti link Airflow untuk lingkungan fleksibel App Engine.

  3. Buka Admin &gt; Variabel &gt; Tambahkan data baru.

  4. Tetapkan nilai berikut:

    • kunci: example_var
    • val: test_airflow_variable

Mengupload DAG sampel ke lingkungan Anda

Upload contoh DAG berikut ke lingkungan yang telah dibuat pada langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama dag_for_loop_airflow_variable.

DAG ini berisi loop for yang berjalan 1.000 kali dan meniru kelebihan Variabel Airflow. Setiap iterasi membaca variabel example_var dan akan menghasilkan tugas. Setiap tugas berisi satu perintah yang mencetak alamat dengan sejumlah nilai.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Mendiagnosis masalah penguraian

Waktu penguraian DAG adalah jumlah waktu yang diperlukan penjadwal Airflow untuk membaca file DAG dan mengurainya. Sebelum penjadwal Airflow dapat menjadwalkan tugas apa pun dari DAG, penjadwal harus mengurai file DAG untuk menemukan struktur DAG dan tugas yang ditentukan.

Jika DAG membutuhkan waktu lama untuk diurai, ini akan menghabiskan kapasitas penjadwal dan dapat mengurangi performa operasi DAG.

Untuk memantau waktu penguraian DAG:

  1. Jalankan dags report perintah Airflow CLI di gcloud CLI untuk melihat waktu penguraian untuk semua DAG Anda:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Ganti kode berikut:

    • ENVIRONMENT_NAME: nama lingkungan Anda.
    • LOCATION: wilayah tempat lingkungan berada.
  2. Di output perintah, cari nilai durasi untuk DAG dag_for_loop_airflow_variables. Nilai yang besar mungkin menunjukkan bahwa DAG ini tidak diimplementasikan secara optimal. Jika memiliki beberapa DAG, Dari tabel output, Anda dapat mengidentifikasi DAG yang memiliki waktu penguraian yang lama.

    Contoh:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Periksa waktu penguraian DAG di Konsol Google Cloud:

    1. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  4. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  5. Buka tab Logs, lalu buka All logs &gt; Pengelola prosesor DAG.

  6. Tinjau log dag-processor-manager dan identifikasi kemungkinan masalah.

    Entri log untuk contoh DAG menunjukkan bahwa waktu penguraian DAG adalah 46,3 detik
    Gambar 13. Log pengelola prosesor DAG menampilkan DAG waktu penguraian (klik untuk memperbesar)

Jika total waktu penguraian DAG melebihi sekitar 10 detik, penjadwal Anda mungkin kelebihan beban dengan penguraian DAG dan tidak dapat menjalankan DAG secara efektif.

Mengoptimalkan kode DAG

Penting direkomendasikan untuk menghindari "tingkat tinggi" yang tidak perlu kode Python di DAG. DAG dengan banyak impor, variabel, dan fungsi di luar DAG memperkenalkan penguraian yang lebih besar untuk scheduler Airflow. Hal ini akan mengurangi performa dan skalabilitas Cloud Composer dan Airflow. Kelebihan pembacaan variabel Airflow menyebabkan waktu penguraian yang lama dan pemuatan database yang tinggi. Jika kode ini ada di DAG fungsi ini dieksekusi pada setiap detak jantung penjadwal, yang mungkin lambat.

Kolom template Airflow memungkinkan Anda menyertakan nilai dari Airflow variabel dan template Jinja ke DAG. Hal ini mencegah tindakan yang tidak diperlukan dan menjalankan fungsi selama detak jantung scheduler.

Untuk menerapkan contoh DAG dengan cara yang lebih baik, hindari penggunaan variabel Airflow di kode Python tingkat atas dari DAG. Sebagai gantinya, teruskan variabel Airflow ke operator akan menggunakan template Jinja, yang akan menunda pembacaan nilai hingga dalam pelaksanaan tugas.

Upload versi baru DAG sampel ke lingkungan fleksibel App Engine. Dalam tutorial ini, DAG ini diberi nama dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Periksa waktu penguraian DAG baru:

  1. Tunggu hingga operasi DAG selesai.

  2. Jalankan perintah dags report lagi untuk melihat waktu penguraian untuk semua DAG:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Tinjau dag-processor-manager log lagi dan menganalisis durasi penguraian.

    Entri log untuk contoh DAG menunjukkan bahwa waktu penguraian DAG adalah 4,21
    detik
    Gambar 14. Log pengelola prosesor DAG menampilkan DAG waktu penguraian setelah kode DAG dioptimalkan (klik untuk memperbesar)

Dengan mengganti variabel lingkungan dengan template Airflow, Anda menyederhanakan kode DAG dan mengurangi latensi penguraian sekitar sepuluh kali lipat.

Mengoptimalkan konfigurasi lingkungan Airflow

Penjadwal Airflow terus mencoba memicu tugas baru dan mengurai semua DAG dalam bucket lingkungan Anda. Jika DAG memiliki waktu penguraian yang lama dan Scheduler menghabiskan banyak resource, Anda dapat mengoptimalkan scheduler Airflow konfigurasi agar penjadwal dapat menggunakan sumber daya secara lebih efisien.

Dalam tutorial ini, file DAG memerlukan banyak waktu untuk mengurai dan mengurai siklus tumpang tindih, yang kemudian menghabiskan kapasitas penjadwal. Dalam contoh ini, contoh pertama DAG membutuhkan waktu lebih dari 5 detik untuk diurai, jadi Anda akan mengonfigurasi penjadwal untuk berjalan lebih jarang untuk menggunakan sumber daya dengan lebih efisien. Anda akan mengganti scheduler_heartbeat_sec Opsi konfigurasi Airflow. Konfigurasi ini menentukan seberapa sering penjadwal harus berjalan (dalam detik). Secara default, nilai ditetapkan ke 5 detik. Anda dapat mengubah opsi konfigurasi Airflow ini dengan menggantikannya.

Ganti opsi konfigurasi Airflow scheduler_heartbeat_sec:

  1. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Airflow Configuration Overrides.

  4. Klik Edit, lalu klik Add Airflow Configuration Override.

  5. Ganti opsi konfigurasi Airflow:

    Bagian Kunci Nilai
    scheduler scheduler_heartbeat_sec 10
  6. Klik Simpan dan tunggu hingga lingkungan memperbarui konfigurasinya.

Periksa metrik penjadwal:

  1. Buka tab Monitoring lalu pilih Scheduler.

  2. Pada grafik Schedulerbeat, klik tombol More options (tiga titik), lalu klik View in the Metrics Explorer.

Grafik detak jantung penjadwal menunjukkan bahwa detak jantung lebih jarang terjadi
Gambar 15. Grafik detak jantung penjadwal (klik untuk memperbesar)

Pada grafik, Anda akan melihat penjadwal berjalan dua kali lebih jarang setelah Anda mengubah konfigurasi {i>default<i} dari 5 detik menjadi 10 detik. Dengan mengurangi frekuensi detak jantung, Anda memastikan bahwa penjadwal tidak dimulai sedang berlangsung saat siklus penguraian sebelumnya sedang berlangsung dan jadwal kapasitas resource tidak habis.

Memberikan lebih banyak sumber daya ke penjadwal

Di Cloud Composer 2, Anda dapat mengalokasikan lebih banyak resource CPU dan memori ke {i>scheduler<i} (penjadwal). Dengan cara ini, Anda dapat meningkatkan kinerja penjadwal dan mempercepat waktu penguraian untuk DAG Anda.

Alokasikan CPU dan memori tambahan ke penjadwal:

  1. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Konfigurasi lingkungan.

  4. Temukan konfigurasi Resource &gt; Workloads dan Klik Edit.

  5. Di bagian Scheduler, di kolom Memory, tentukan memori baru batas tersebut. Dalam tutorial ini, gunakan 4 GB.

  6. Di kolom CPU, tentukan batas CPU baru. Di sini menggunakan 2 vCPU.

  7. Simpan perubahan dan tunggu beberapa menit agar penjadwal Airflow Anda mulai ulang.

  8. Buka tab Logs, lalu buka All logs &gt; Pengelola prosesor DAG.

  9. Tinjau log dag-processor-manager dan bandingkan durasi penguraian untuk contoh DAG:

    Entri log untuk contoh DAG menunjukkan bahwa waktu penguraian DAG untuk DAG yang dioptimalkan adalah 1,5 detik. Untuk DAG yang tidak dioptimalkan, waktu penguraian adalah 28,71 detik
    Gambar 16. Log pengelola prosesor DAG menampilkan DAG mengurai waktu setelah lebih banyak sumber daya ditugaskan ke penjadwal (klik untuk memperbesar)

Dengan menetapkan lebih banyak sumber daya ke {i>scheduler<i}, Anda meningkatkan dan mengurangi latensi penguraian secara signifikan dibandingkan dengan default konfigurasi lingkungannya sendiri. Dengan sumber daya yang lebih banyak, penjadwal dapat mengurai DAG lebih cepat, tetapi biaya yang terkait dengan Cloud Composer sumber daya juga akan meningkat. Selain itu, tidak mungkin meningkatkan resource di luar batas tertentu.

Sebaiknya alokasikan resource hanya setelah kode DAG yang memungkinkan dan Pengoptimalan konfigurasi Airflow telah diterapkan.

Pembersihan

Agar tidak menimbulkan biaya ke akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut atau mempertahankan proyek dan menghapus sumber daya satu per satu.

Menghapus project

  1. Di konsol Google Cloud, buka halaman Manage resource.

    Buka Manage resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Menghapus resource satu per satu

Jika Anda berencana mempelajari beberapa tutorial dan panduan memulai, menggunakan kembali project dapat membantu Anda agar tidak melampaui batas kuota project.

Hapus lingkungan Cloud Composer. Anda juga menghapus bucket lingkungan selama prosedur ini.

Langkah selanjutnya