Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Tutorial ini akan memandu Anda mendiagnosis dan memecahkan masalah penjadwalan tugas dan penguraian yang menyebabkan penjadwal tidak berfungsi, error penguraian dan latensi, serta kegagalan tugas.
Pengantar
Penjadwal Airflow terutama dipengaruhi oleh dua faktor: penjadwalan tugas dan penguraian DAG. Masalah pada salah satu faktor tersebut dapat berdampak negatif pada kesehatan dan performa lingkungan.
Terkadang terlalu banyak tugas yang dijadwalkan secara bersamaan. Dalam situasi ini, antrean terisi penuh, dan tugas tetap dalam status "dijadwalkan" atau dijadwalkan ulang setelah diantrekan, yang dapat menyebabkan kegagalan tugas dan latensi performa.
Masalah umum lainnya adalah latensi penguraian dan error yang disebabkan oleh kompleksitas kode DAG. Misalnya, kode DAG yang berisi variabel Airflow di level teratas kode dapat menyebabkan penundaan penguraian, kelebihan beban database, kegagalan penjadwalan, dan waktu tunggu DAG habis.
Dalam tutorial ini, Anda akan mendiagnosis contoh DAG dan mempelajari cara memecahkan masalah penjadwalan dan penguraian, meningkatkan penjadwalan DAG, dan mengoptimalkan kode DAG dan konfigurasi lingkungan untuk meningkatkan performa.
Tujuan
Bagian ini mencantumkan tujuan untuk contoh dalam tutorial ini.
Contoh: Penjadwal tidak berfungsi dan latensi yang disebabkan oleh konkurensi tugas yang tinggi
Upload contoh DAG yang berjalan beberapa kali secara bersamaan dan diagnostik masalah latensi dan gangguan penjadwal dengan Cloud Monitoring.
Optimalkan kode DAG Anda dengan menggabungkan tugas dan mengevaluasi dampak performa.
Bagikan tugas secara lebih merata dari waktu ke waktu dan evaluasi dampak performa.
Optimalkan konfigurasi Airflow dan konfigurasi lingkungan Anda, lalu nilai dampaknya.
Contoh: Error penguraian DAG dan latensi yang disebabkan oleh kode yang kompleks
Upload contoh DAG dengan variabel Airflow dan diagnostik masalah penguraian dengan Cloud Monitoring.
Optimalkan kode DAG dengan menghindari variabel Airflow di level teratas kode dan evaluasi dampaknya terhadap waktu penguraian.
Optimalkan konfigurasi Airflow dan konfigurasi lingkungan serta evaluasi dampaknya terhadap waktu penguraian.
Biaya
Tutorial ini menggunakan komponen Google Cloudyang dapat ditagih berikut:
Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk detail selengkapnya, lihat Pembersihan.
Sebelum memulai
Bagian ini menjelaskan tindakan yang diperlukan sebelum Anda memulai tutorial.
Membuat dan mengonfigurasi project
Untuk tutorial ini, Anda memerlukan project Google Cloud. Konfigurasikan project dengan cara berikut:
Di konsol Google Cloud, pilih atau buat project:
Pastikan penagihan diaktifkan untuk project Anda. Pelajari cara memeriksa apakah penagihan diaktifkan pada project.
Pastikan Google Cloud pengguna project Anda memiliki peran berikut untuk membuat resource yang diperlukan:
- Environment and Storage Object Administrator
(
roles/composer.environmentAndStorageObjectAdmin
) - Compute Admin (
roles/compute.admin
)
- Environment and Storage Object Administrator
(
Mengaktifkan API untuk project Anda
Enable the Cloud Composer API.
Membuat lingkungan Cloud Composer
Buat lingkungan Cloud Composer 2.
Sebagai bagian dari pembuatan lingkungan, Anda memberikan peran Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext
) ke akun Composer Service Agent. Cloud Composer menggunakan akun ini untuk melakukan operasi
di project Google Cloud Anda.
Contoh: Penjadwal tidak berfungsi dan tugas gagal karena masalah penjadwalan tugas
Contoh ini menunjukkan penjadwal proses debug yang tidak berfungsi dan latensi yang disebabkan oleh konkurensi tugas yang tinggi.
Mengupload DAG contoh ke lingkungan Anda
Upload contoh DAG berikut ke lingkungan
yang Anda buat di langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama
dag_10_tasks_200_seconds_1
.
DAG ini memiliki 200 tugas. Setiap tugas menunggu selama 1 detik dan mencetak "Selesai!". DAG dipicu secara otomatis setelah diupload. Cloud Composer menjalankan DAG ini 10 kali, dan semua DAG berjalan secara paralel.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Mendiagnosis masalah kegagalan tugas dan malfungsi penjadwal
Setelah DAG selesai berjalan, buka UI Airflow, lalu klik
dag_10_tasks_200_seconds_1
DAG. Anda akan melihat bahwa total 10 operasi DAG berhasil, dan masing-masing memiliki 200 tugas yang berhasil.
Tinjau log tugas Airflow:
Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Logs, lalu buka All logs > Airflow logs > Workers > View in Logs Explorer.
Pada histogram log, Anda dapat melihat error dan peringatan yang ditunjukkan dengan warna merah dan oranye:

Contoh DAG menghasilkan sekitar 130 peringatan dan 60 error. Klik kolom apa pun yang berisi batang kuning dan merah. Anda akan melihat beberapa peringatan dan error berikut dalam log:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Log ini mungkin menunjukkan bahwa penggunaan resource melampaui batas dan pekerja memulai ulang dirinya sendiri.
Jika tugas Airflow disimpan dalam antrean terlalu lama, penjadwal akan menandainya sebagai gagal dan up_for_retry, lalu akan menjadwalkannya ulang lagi untuk eksekusi. Salah satu cara untuk mengamati gejala situasi ini adalah dengan melihat diagram yang menampilkan jumlah tugas dalam antrean dan jika lonjakan dalam diagram ini tidak menurun dalam waktu sekitar 10 menit, kemungkinan akan ada kegagalan tugas (tanpa log).
Tinjau informasi pemantauan:
Buka tab Monitoring, lalu pilih Overview.
Tinjau grafik Tugas Airflow.
Gambar 2. Grafik tugas Airflow (klik untuk memperbesar) Dalam grafik tugas Airflow, ada lonjakan tugas dalam antrean yang berlangsung selama lebih dari 10 menit, yang mungkin berarti tidak ada cukup resource di lingkungan Anda untuk memproses semua tugas terjadwal.
Tinjau grafik Pekerja aktif:
Gambar 3. Grafik pekerja aktif (klik untuk memperbesar) Grafik Pekerja aktif menunjukkan bahwa DAG memicu penskalaan otomatis ke batas maksimum tiga pekerja yang diizinkan selama DAG berjalan.
Grafik penggunaan resource dapat menunjukkan kurangnya kapasitas di pekerja Airflow untuk menjalankan tugas yang diantrekan. Di tab Monitoring, pilih Workers dan tinjau grafik Total worker CPU usage dan Total worker memory usage.
Gambar 4. Grafik penggunaan CPU pekerja total (klik untuk memperbesar) Gambar 5. Total grafik penggunaan memori pekerja (klik untuk memperbesar) Grafik menunjukkan bahwa eksekusi terlalu banyak tugas secara bersamaan mengakibatkan batas CPU tercapai. Resource telah digunakan selama lebih dari 30 menit, yang bahkan lebih lama dari total durasi 200 tugas dalam 10 DAG yang berjalan satu per satu.
Ini adalah indikator antrean yang terisi penuh dan kurangnya resource untuk memproses semua tugas terjadwal.
Menggabungkan tugas
Kode saat ini membuat banyak DAG dan tugas tanpa resource yang memadai untuk memproses semua tugas secara paralel, yang menyebabkan antrean terisi. Membiarkan tugas dalam antrean terlalu lama dapat menyebabkan tugas dijadwalkan ulang atau gagal. Dalam situasi seperti itu, Anda harus memilih tugas konsolidasi yang lebih sedikit.
Contoh DAG berikut mengubah jumlah tugas dalam contoh awal dari 200 menjadi 20 dan meningkatkan waktu tunggu dari 1 menjadi 10 detik untuk meniru lebih banyak tugas gabungan yang melakukan jumlah pekerjaan yang sama.
Upload contoh DAG berikut ke lingkungan
yang Anda buat. Dalam tutorial ini, DAG ini diberi nama
dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Evaluasi dampak tugas yang lebih terpadu terhadap proses penjadwalan:
Tunggu hingga DAG selesai dijalankan.
Di UI Airflow, pada halaman DAG, klik
dag_10_tasks_20_seconds_10
DAG. Anda akan melihat 10 DAG berjalan, masing-masing memiliki 20 tugas yang berhasil.Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Logs, lalu buka All logs > Airflow logs > Workers > View in Logs Explorer.
Contoh kedua dengan lebih banyak tugas gabungan menghasilkan sekitar 10 peringatan dan 7 error. Pada histogram, Anda dapat membandingkan jumlah error dan peringatan dalam contoh awal (nilai sebelumnya) dan contoh kedua (nilai berikutnya).
Gambar 6. Histogram log pekerja Airflow setelah tugas digabungkan (klik untuk memperbesar) Saat membandingkan contoh pertama dengan contoh yang lebih terkonsolidasi, Anda dapat melihat bahwa ada lebih sedikit error dan peringatan dalam contoh kedua. Namun, error yang sama terkait warm shutdown masih muncul dalam log karena kelebihan resource.
Di tab Monitoring, pilih Workers dan tinjau grafik.
Saat membandingkan grafik Tugas Airflow untuk contoh pertama (nilai sebelumnya) dengan grafik untuk contoh kedua dengan tugas yang lebih terkonsolidasi, Anda dapat melihat bahwa lonjakan tugas dalam antrean berlangsung selama jangka waktu yang lebih singkat saat tugas lebih terkonsolidasi. Namun, proses ini berlangsung hampir 10 menit, yang masih kurang optimal.
Gambar 7. Grafik tugas Airflow setelah tugas digabungkan (klik untuk memperbesar) Pada grafik Pekerja aktif, Anda dapat melihat contoh pertama (di sisi kiri grafik) menggunakan resource selama jangka waktu yang jauh lebih lama daripada contoh kedua, meskipun kedua contoh meniru jumlah pekerjaan yang sama.
Gambar 8. Grafik pekerja aktif setelah tugas digabungkan (klik untuk memperbesar) Tinjau grafik konsumsi resource pekerja. Meskipun perbedaan antara resource yang digunakan dalam contoh dengan tugas yang lebih terkonsolidasi dan contoh awal cukup signifikan, penggunaan CPU masih melonjak hingga 70% dari batas.
Gambar 9. Grafik penggunaan CPU pekerja total setelah tugas digabungkan (klik untuk memperbesar) Gambar 10. Total grafik penggunaan memori pekerja setelah tugas digabungkan (klik untuk memperbesar)
Mendistribusikan tugas secara lebih merata dari waktu ke waktu
Terlalu banyak tugas serentak menyebabkan antrean terisi penuh, yang menyebabkan tugas macet di antrean atau dijadwalkan ulang. Pada langkah sebelumnya, Anda mengurangi jumlah tugas dengan menggabungkan tugas tersebut, tetapi log output dan pemantauan menunjukkan bahwa jumlah tugas serentak masih sub-optimal.
Anda dapat mengontrol jumlah tugas serentak yang berjalan dengan menerapkan jadwal atau menetapkan batas jumlah tugas yang dapat dijalankan secara bersamaan.
Dalam tutorial ini, Anda akan mendistribusikan tugas secara lebih merata dari waktu ke waktu dengan menambahkan
parameter tingkat DAG ke DAG dag_10_tasks_20_seconds_10
:
Tambahkan argumen
max_active_runs=1
ke pengelola konteks DAG. Argumen ini menetapkan batas hanya satu instance DAG yang berjalan pada waktu tertentu.Tambahkan argumen
max_active_tasks=5
ke pengelola konteks DAG. Argumen ini mengontrol jumlah maksimum instance tugas yang dapat berjalan secara serentak di setiap DAG.
Upload contoh DAG berikut ke lingkungan
yang Anda buat. Dalam tutorial ini, DAG ini diberi nama
dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Evaluasi dampak pendistribusian tugas dari waktu ke waktu terhadap proses penjadwalan:
Tunggu hingga DAG selesai dijalankan.
Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Logs, lalu buka All logs > Airflow logs > Workers > View in Logs Explorer.
Pada histogram, Anda dapat melihat bahwa DAG ketiga dengan jumlah tugas aktif dan berjalan yang terbatas tidak menghasilkan peringatan atau error apa pun dan distribusi log terlihat lebih merata dibandingkan dengan nilai sebelumnya.
Gambar 11. Histogram log pekerja Airflow setelah tugas digabungkan dan didistribusikan dari waktu ke waktu (klik untuk memperbesar)
Tugas dalam contoh dag_10_tasks_20_seconds_10_scheduled
yang memiliki
jumlah tugas aktif dan berjalan yang terbatas tidak menyebabkan tekanan resource karena
tugas diantrekan secara merata.
Setelah melakukan langkah-langkah yang dijelaskan, Anda telah mengoptimalkan penggunaan resource dengan menggabungkan tugas kecil dan mendistribusikannya secara lebih merata dari waktu ke waktu.
Mengoptimalkan konfigurasi lingkungan
Anda dapat menyesuaikan konfigurasi lingkungan untuk memastikan selalu ada kapasitas di pekerja Airflow untuk menjalankan tugas yang diantrekan.
Jumlah pekerja dan konkurensi pekerja
Anda dapat menyesuaikan jumlah maksimum pekerja agar Cloud Composer otomatis menskalakan lingkungan Anda dalam batas yang ditetapkan.
Parameter [celery]worker_concurrency
menentukan jumlah maksimum tugas
yang dapat diambil oleh satu pekerja dari task queue. Mengubah parameter ini
akan menyesuaikan jumlah tugas yang dapat dijalankan oleh satu pekerja secara bersamaan.
Anda dapat mengubah opsi konfigurasi Airflow ini dengan
menggantikannya. Secara default, konkurensi pekerja ditetapkan ke minimum dari hal berikut: 32, 12 * worker_CPU, 8 * worker_memory
, yang berarti bergantung pada batas resource pekerja. Lihat
Mengoptimalkan lingkungan untuk mengetahui informasi selengkapnya tentang nilai
konkurensi pekerja default.
Jumlah pekerja dan konkurensi pekerja bekerja secara bersamaan, dan performa lingkungan Anda sangat bergantung pada kedua parameter tersebut. Anda dapat menggunakan pertimbangan berikut untuk memilih kombinasi yang tepat:
Beberapa tugas cepat yang berjalan secara paralel. Anda dapat meningkatkan konkurensi pekerja saat ada tugas yang menunggu dalam antrean, dan pekerja menggunakan persentase CPU dan memori yang rendah secara bersamaan. Namun, dalam keadaan tertentu, antrean mungkin tidak pernah terisi penuh, sehingga penskalaan otomatis tidak pernah terpicu. Jika tugas kecil selesai dieksekusi pada saat pekerja baru sudah siap, pekerja yang ada dapat mengambil tugas yang tersisa, dan tidak akan ada tugas untuk pekerja yang baru dibuat.
Dalam situasi ini, sebaiknya tingkatkan jumlah minimum pekerja dan tingkatkan konkurensi pekerja untuk menghindari penskalaan yang berlebihan.
Beberapa tugas panjang yang berjalan secara paralel. Serentak pekerja yang tinggi mencegah sistem menskalakan jumlah pekerja. Jika beberapa tugas memerlukan banyak resource dan memerlukan waktu lama untuk diselesaikan, konkurensi pekerja yang tinggi dapat menyebabkan antrean tidak pernah terisi dan semua tugas diambil hanya oleh satu pekerja, yang mengakibatkan masalah performa. Dalam situasi ini, sebaiknya tingkatkan jumlah maksimum pekerja dan kurangi konkurensi pekerja.
Pentingnya paralelisme
Penjadwal Airflow mengontrol penjadwalan operasi DAG dan setiap tugas dari
DAG. Opsi konfigurasi Airflow [core]parallelism
mengontrol jumlah
tugas yang dapat diantrekan oleh penjadwal Airflow dalam antrean eksekutor setelah semua
dependensi untuk tugas ini terpenuhi.
Paralelisme adalah mekanisme perlindungan Airflow yang menentukan jumlah tugas yang dapat dijalankan secara bersamaan per setiap penjadwal, terlepas dari jumlah pekerja. Nilai paralelisme, yang dikalikan dengan jumlah penjadwal di cluster Anda, adalah jumlah maksimum instance tugas yang dapat diantrekan oleh lingkungan Anda.
Biasanya, [core]parallelism
ditetapkan sebagai produk dari jumlah maksimum pekerja
dan [celery]worker_concurrency
. Hal ini juga dipengaruhi oleh
kumpulan.
Anda dapat mengubah opsi konfigurasi Airflow ini dengan
menggantikannya. Untuk informasi selengkapnya tentang cara menyesuaikan konfigurasi Airflow
yang terkait dengan penskalaan, lihat
Menskalakan konfigurasi Airflow.
Menemukan konfigurasi lingkungan yang optimal
Cara yang direkomendasikan untuk memperbaiki masalah penjadwalan adalah menggabungkan tugas kecil menjadi tugas yang lebih besar dan mendistribusikan tugas secara lebih merata dari waktu ke waktu. Selain mengoptimalkan kode DAG, Anda juga dapat mengoptimalkan konfigurasi lingkungan agar memiliki kapasitas yang memadai untuk menjalankan beberapa tugas secara serentak.
Misalnya, Anda menggabungkan tugas dalam DAG sebanyak mungkin, tetapi membatasi tugas aktif untuk menyebarkannya secara lebih merata dari waktu ke waktu bukan solusi yang lebih disukai untuk kasus penggunaan tertentu.
Anda dapat menyesuaikan paralelisme, jumlah pekerja, dan parameter serentak pekerja
untuk menjalankan DAG dag_10_tasks_20_seconds_10
tanpa membatasi tugas
aktif. Dalam contoh ini, DAG berjalan 10 kali dan setiap operasinya berisi 20 tugas kecil.
Jika Anda ingin menjalankan semuanya secara bersamaan:
Anda memerlukan ukuran lingkungan yang lebih besar, karena ukuran tersebut mengontrol parameter performa infrastruktur Cloud Composer terkelola di lingkungan Anda.
Pekerja Airflow harus dapat menjalankan 20 tugas secara bersamaan, yang berarti Anda perlu menetapkan konkurensi pekerja ke 20.
Pekerja memerlukan CPU dan memori yang memadai untuk menangani semua tugas. Serentak pekerja dipengaruhi oleh CPU dan memori pekerja, sehingga Anda memerlukan setidaknya
worker_concurrency / 12
dalam CPU danleast worker_concurrency / 8
dalam memori.Anda harus meningkatkan paralelisme agar sesuai dengan konkurensi pekerja yang lebih tinggi. Agar pekerja dapat mengambil 20 tugas dari antrean, penjadwal harus menjadwalkan 20 tugas tersebut terlebih dahulu.
Sesuaikan konfigurasi lingkungan Anda dengan cara berikut:
Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Konfigurasi lingkungan.
Temukan konfigurasi Resources > Workloads, lalu klik Edit.
Di bagian Worker, di kolom Memory, tentukan batas memori baru untuk pekerja Airflow. Dalam tutorial ini, gunakan 4 GB.
Di kolom CPU, tentukan batas CPU baru untuk pekerja Airflow. Dalam tutorial ini, gunakan 2 vCPU.
Simpan perubahan dan tunggu beberapa menit hingga pekerja Airflow Anda memulai ulang.
Selanjutnya, ganti opsi konfigurasi Airflow paralelisme dan konkurensi pekerja:
Buka tab Penggantian Konfigurasi Airflow.
Klik Edit, lalu klik Tambahkan Penggantian Konfigurasi Airflow.
Ganti konfigurasi paralelisme:
Bagian Kunci Nilai core
parallelism
20
Klik Add Airflow Configuration Override dan ganti konfigurasi konkurensi pekerja:
Bagian Kunci Nilai celery
worker_concurrency
20
Klik Save dan tunggu hingga lingkungan memperbarui konfigurasinya.
Picu contoh DAG yang sama lagi dengan konfigurasi yang disesuaikan:
Di UI Airflow, buka halaman DAGs.
Temukan DAG
dag_10_tasks_20_seconds_10
dan hapus.Setelah DAG dihapus, Airflow akan memeriksa folder DAG di bucket lingkungan Anda dan otomatis menjalankan DAG lagi.
Setelah DAG selesai berjalan, tinjau histogram Log lagi. Pada diagram,
Anda dapat melihat bahwa contoh dag_10_tasks_20_seconds_10
dengan lebih banyak
tugas gabungan tidak menghasilkan error dan peringatan saat dijalankan dengan
konfigurasi lingkungan yang disesuaikan. Bandingkan hasilnya dengan data sebelumnya
pada diagram, dengan contoh yang sama menghasilkan error dan peringatan saat
berjalan dengan konfigurasi lingkungan default.

Konfigurasi lingkungan dan konfigurasi Airflow memainkan peran penting dalam penjadwalan tugas, tetapi Anda tidak dapat meningkatkan konfigurasi di luar batas tertentu.
Sebaiknya optimalkan kode DAG, gabungkan tugas, dan gunakan penjadwalan untuk performa dan efisiensi yang dioptimalkan.
Contoh: Error penguraian DAG dan latensi karena kode DAG yang kompleks
Dalam contoh ini, Anda akan menyelidiki latensi penguraian contoh DAG yang meniru kelebihan variabel Airflow.
Membuat variabel Airflow baru
Sebelum mengupload kode contoh, buat variabel Airflow baru.
Di konsol Google Cloud, buka halaman Environments.
Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.
Buka Admin > Variabel > Tambahkan data baru.
Tetapkan nilai berikut:
- kunci:
example_var
- val:
test_airflow_variable
- kunci:
Mengupload DAG contoh ke lingkungan Anda
Upload contoh DAG berikut ke lingkungan
yang Anda buat di langkah sebelumnya. Dalam tutorial ini, DAG ini diberi nama
dag_for_loop_airflow_variable
.
DAG ini berisi loop for yang berjalan 1.000 kali dan meniru kelebihan
variabel Airflow. Setiap iterasi membaca variabel example_var
dan
membuat tugas. Setiap tugas berisi satu perintah yang mencetak nilai variabel.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Mendiagnosis masalah penguraian
Waktu penguraian DAG adalah jumlah waktu yang diperlukan oleh penjadwal Airflow untuk membaca file DAG dan mengurainya. Sebelum penjadwal Airflow dapat menjadwalkan tugas apa pun dari DAG, penjadwal harus mengurai file DAG untuk menemukan struktur DAG dan tugas yang ditentukan.
Jika DAG memerlukan waktu lama untuk diuraikan, hal ini akan menghabiskan kapasitas penjadwal dan mungkin mengurangi performa DAG yang berjalan.
Untuk memantau waktu penguraian DAG:
Jalankan perintah Airflow CLI
dags report
di gcloud CLI untuk melihat waktu penguraian untuk semua DAG Anda:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Ganti kode berikut:
ENVIRONMENT_NAME
: nama lingkungan Anda.LOCATION
: region tempat lingkungan berada.
Di output perintah, cari nilai durasi untuk DAG
dag_for_loop_airflow_variables
. Nilai yang besar mungkin menunjukkan bahwa DAG ini tidak diterapkan dengan cara yang optimal. Jika memiliki beberapa DAG, dari tabel output, Anda dapat mengidentifikasi DAG mana yang memiliki waktu penguraian yang lama.Contoh:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Periksa waktu penguraian DAG di konsol Google Cloud:
- Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Logs, lalu buka All logs > DAG processor manager.
Tinjau log
dag-processor-manager
dan identifikasi kemungkinan masalah.Gambar 13. Log pengelola pemroses DAG menampilkan waktu penguraian DAG (klik untuk memperbesar)
Jika total waktu penguraian DAG melebihi sekitar 10 detik, penjadwal Anda mungkin dibebani dengan penguraian DAG dan tidak dapat menjalankan DAG secara efektif.
Mengoptimalkan kode DAG
Sebaiknya hindari kode Python "tingkat teratas" yang tidak perlu di DAG Anda. DAG dengan banyak impor, variabel, dan fungsi di luar DAG akan menyebabkan waktu penguraian yang lebih lama untuk penjadwal Airflow. Hal ini mengurangi performa dan skalabilitas Cloud Composer dan Airflow. Kelebihan pembacaan variabel Airflow menyebabkan waktu penguraian yang lama dan beban database yang tinggi. Jika kode ini berada dalam file DAG, fungsi ini akan dijalankan pada setiap heartbeat penjadwal, yang mungkin lambat.
Kolom template Airflow memungkinkan Anda menggabungkan nilai dari variabel Airflow dan template Jinja ke dalam DAG. Hal ini mencegah eksekusi fungsi yang tidak perlu selama heartbeat penjadwal.
Untuk menerapkan contoh DAG dengan cara yang lebih baik, hindari penggunaan variabel Airflow di kode Python tingkat atas DAG. Sebagai gantinya, teruskan variabel Airflow ke operator yang ada melalui template Jinja, yang akan menunda pembacaan nilai hingga eksekusi tugas.
Upload DAG contoh versi baru ke lingkungan Anda. Dalam tutorial ini, DAG ini diberi nama
dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Periksa waktu penguraian DAG baru:
Tunggu hingga DAG selesai dijalankan.
Jalankan perintah
dags report
lagi untuk melihat waktu penguraian untuk semua DAG Anda:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Tinjau log
dag-processor-manager
lagi dan analisis durasi penguraian.Gambar 14. Log pengelola pemroses DAG menunjukkan waktu penguraian DAG setelah kode DAG dioptimalkan (klik untuk memperbesar)
Dengan mengganti variabel lingkungan dengan template Airflow, Anda menyederhanakan kode DAG dan mengurangi latensi penguraian sekitar sepuluh kali lipat.
Mengoptimalkan konfigurasi lingkungan Airflow
Penjadwal Airflow terus mencoba memicu tugas baru dan mengurai semua DAG di bucket lingkungan Anda. Jika DAG Anda memiliki waktu penguraian yang lama dan penjadwal menghabiskan banyak resource, Anda dapat mengoptimalkan konfigurasi penjadwal Airflow agar penjadwal dapat menggunakan resource secara lebih efisien.
Dalam tutorial ini, file DAG memerlukan banyak waktu untuk diuraikan, dan siklus penguraian
mulai tumpang-tindih, yang kemudian menghabiskan kapasitas penjadwal. Dalam contoh kami,
contoh DAG pertama memerlukan waktu lebih dari 5 detik untuk diuraikan, sehingga Anda akan mengonfigurasi
penjadwal agar berjalan lebih jarang untuk menggunakan resource secara lebih efisien. Anda
akan mengganti
opsi konfigurasi Airflow
scheduler_heartbeat_sec
. Konfigurasi ini menentukan frekuensi penjadwal harus berjalan (dalam detik). Secara default, nilai ditetapkan ke 5 detik.
Anda dapat mengubah opsi konfigurasi Airflow ini dengan
menggantikannya.
Ganti opsi konfigurasi Airflow scheduler_heartbeat_sec
:
Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Penggantian Konfigurasi Airflow.
Klik Edit, lalu klik Tambahkan Penggantian Konfigurasi Airflow.
Ganti opsi konfigurasi Airflow:
Bagian Kunci Nilai scheduler
scheduler_heartbeat_sec
10
Klik Save dan tunggu hingga lingkungan memperbarui konfigurasinya.
Periksa metrik penjadwal:
Buka tab Monitoring, lalu pilih Schedulers.
Pada grafik Heartbeat penjadwal, klik tombol Opsi lainnya (tiga titik), lalu klik Lihat di Metrics Explorer.

Pada grafik, Anda akan melihat penjadwal berjalan dua kali lebih jarang setelah Anda mengubah konfigurasi default dari 5 detik menjadi 10 detik. Dengan mengurangi frekuensi heartbeat, Anda memastikan bahwa penjadwal tidak mulai berjalan saat siklus penguraian sebelumnya sedang berlangsung dan kapasitas resource penjadwal tidak habis.
Menetapkan lebih banyak resource ke penjadwal
Di Cloud Composer 2, Anda dapat mengalokasikan lebih banyak resource CPU dan memori ke penjadwal. Dengan cara ini, Anda dapat meningkatkan performa penjadwal dan mempercepat waktu penguraian untuk DAG.
Alokasikan CPU dan memori tambahan ke penjadwal:
Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab Konfigurasi lingkungan.
Temukan konfigurasi Resources > Workloads, lalu klik Edit.
Di bagian Penjadwal, di kolom Memori, tentukan batas memori baru. Dalam tutorial ini, gunakan 4 GB.
Di kolom CPU, tentukan batas CPU baru. Dalam tutorial ini, gunakan 2 vCPU.
Simpan perubahan dan tunggu beberapa menit hingga penjadwal Airflow Anda dimulai ulang.
Buka tab Logs, lalu buka All logs > DAG processor manager.
Tinjau log
dag-processor-manager
dan bandingkan durasi penguraian untuk contoh DAG:Gambar 16. Log pengelola pemroses DAG menunjukkan waktu penguraian DAG setelah lebih banyak resource ditetapkan ke penjadwal (klik untuk memperbesar)
Dengan menetapkan lebih banyak resource ke penjadwal, Anda meningkatkan kapasitas penjadwal dan mengurangi latensi penguraian secara signifikan dibandingkan dengan konfigurasi lingkungan default. Dengan lebih banyak resource, penjadwal dapat mengurai DAG lebih cepat, tetapi biaya yang terkait dengan resource Cloud Composer juga akan meningkat. Selain itu, resource tidak dapat ditingkatkan melebihi batas tertentu.
Sebaiknya alokasikan resource hanya setelah kemungkinan kode DAG dan optimasi konfigurasi Airflow diterapkan.
Pembersihan
Agar tidak dikenai biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.
Menghapus project
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Menghapus resource satu per satu
Jika Anda berencana mempelajari beberapa tutorial dan panduan memulai, menggunakan kembali project dapat membantu Anda agar tidak melampaui batas kuota project.
Hapus lingkungan Cloud Composer. Anda juga akan menghapus bucket lingkungan selama prosedur ini.