Menjadwalkan dan memicu DAG Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Halaman ini menjelaskan cara kerja penjadwalan dan pemicuan DAG di Airflow, cara menentukan jadwal untuk DAG, dan cara memicu DAG secara manual atau menjeda DAG.

Tentang DAG Airflow di Cloud Composer

DAG Airflow di Cloud Composer dijalankan di satu atau beberapa lingkungan Cloud Composer dalam project Anda. Anda mengupload file sumber DAG Airflow ke bucket Cloud Storage yang terkait dengan lingkungan. Instance Airflow lingkungan kemudian mengurai file ini dan menjadwalkan operasi DAG, seperti yang ditentukan oleh jadwal setiap DAG. Selama DAG berjalan, Airflow menjadwalkan dan menjalankan setiap tugas yang membentuk DAG dalam urutan yang ditentukan oleh DAG.

Untuk mempelajari lebih lanjut konsep inti Airflow seperti DAG Airflow, operasi DAG, tugas, atau operator, lihat halaman Konsep Inti dalam dokumentasi Airflow.

Tentang penjadwalan DAG di Airflow

Airflow menyediakan konsep berikut untuk mekanisme penjadwalannya:

Tanggal logis

Merepresentasikan tanggal saat DAG tertentu dijalankan.

Ini bukan tanggal sebenarnya saat Airflow menjalankan DAG, tetapi jangka waktu yang harus diproses oleh DAG tertentu. Misalnya, untuk DAG yang dijadwalkan untuk berjalan setiap hari pukul 12.00, tanggal logisnya juga akan menjadi 12.00 pada hari tertentu. Karena berjalan dua kali per hari, periode waktu yang harus diproses adalah 12 jam terakhir. Pada saat yang sama, logika yang ditentukan dalam DAG itu sendiri mungkin tidak menggunakan tanggal logis atau interval waktu sama sekali. Misalnya, DAG mungkin menjalankan skrip yang sama sekali per hari tanpa menggunakan nilai tanggal logis.

Pada versi Airflow sebelum 2.2, tanggal ini disebut tanggal eksekusi.

Tanggal dijalankan

Merepresentasikan tanggal saat DAG tertentu dijalankan.

Misalnya, untuk DAG yang dijadwalkan untuk berjalan setiap hari pukul 12.00, eksekusi DAG yang sebenarnya mungkin terjadi pada pukul 12.05, beberapa saat setelah tanggal logis berlalu.

Interval jadwal

Menampilkan kapan dan seberapa sering DAG harus dieksekusi, dalam hal tanggal logika.

Misalnya, jadwal harian berarti DAG dijalankan sekali per hari, dan tanggal logis untuk DAG-nya berjalan dengan interval 24 jam.

Tanggal mulai

Menentukan kapan Anda ingin Airflow mulai menjadwalkan DAG.

Tugas dalam DAG dapat memiliki tanggal mulai individual, atau Anda dapat menentukan satu tanggal mulai untuk semua tugas. Berdasarkan tanggal mulai minimum untuk tugas di DAG dan interval jadwal, Airflow menjadwalkan DAG berjalan.

Penangkapan, pengisian ulang, dan percobaan ulang

Mekanisme untuk menjalankan DAG untuk tanggal yang lalu.

Catchup menjalankan operasi DAG yang belum berjalan, misalnya, jika DAG dijeda selama jangka waktu yang lama, lalu dijeda lagi. Anda dapat menggunakan pengisian ulang untuk menjalankan DAG untuk rentang tanggal tertentu. Percobaan ulang menentukan jumlah percobaan yang harus dilakukan Airflow saat menjalankan tugas dari DAG.

Penjadwalan berfungsi dengan cara berikut:

  1. Setelah tanggal mulai berlalu, Airflow akan menunggu kemunculan berikutnya dari interval jadwal.

  2. Airflow menjadwalkan operasi DAG pertama agar terjadi di akhir interval jadwal ini.

    Misalnya, jika DAG dijadwalkan untuk berjalan setiap jam dan tanggal mulainya pukul 12.00 hari ini, maka DAG pertama akan berjalan pukul 13.00 hari ini.

Bagian Menjadwalkan DAG Airflow dalam dokumen ini menjelaskan cara menyiapkan penjadwalan untuk DAG menggunakan konsep ini. Untuk mengetahui informasi selengkapnya tentang pengoperasian dan penjadwalan DAG, lihat Pengoperasian DAG dalam dokumentasi Airflow.

Tentang cara memicu DAG

Airflow menyediakan cara berikut untuk memicu DAG:

  • Pemicu sesuai jadwal. Airflow memicu DAG secara otomatis berdasarkan jadwal yang ditentukan untuknya dalam file DAG.

  • Pemicu secara manual. Anda dapat memicu DAG secara manual dari Konsol Google Cloud, UI Airflow, atau dengan menjalankan perintah Airflow CLI dari Google Cloud CLI.

  • Memicu sebagai respons terhadap peristiwa. Cara standar untuk memicu DAG sebagai respons terhadap peristiwa adalah dengan menggunakan sensor.

Cara lain untuk memicu DAG:

Sebelum memulai

  • Pastikan akun Anda memiliki peran yang dapat mengelola objek di bucket lingkungan serta melihat dan memicu DAG. Untuk informasi selengkapnya, lihat Kontrol akses.

Menjadwalkan DAG Airflow

Anda menentukan jadwal untuk DAG dalam file DAG. Edit definisi DAG dengan cara berikut:

  1. Temukan dan edit file DAG di komputer Anda. Jika tidak memiliki file DAG, Anda dapat mendownload salinannya dari bucket lingkungan. Untuk DAG baru, Anda dapat menentukan semua parameter saat membuat file DAG.

  2. Di parameter schedule_interval, tentukan jadwal. Anda dapat menggunakan ekspresi Cron, seperti 0 0 * * *, atau preset, seperti @daily. Untuk mengetahui informasi selengkapnya, lihat Cron dan Interval Waktu dalam dokumentasi Airflow.

    Airflow menentukan tanggal logis untuk pengoperasian DAG berdasarkan jadwal yang Anda tetapkan.

  3. Di parameter start_date, tentukan tanggal mulai.

    Airflow menentukan tanggal logis dari DAG pertama yang dijalankan menggunakan parameter ini.

  4. (Opsional) Di parameter catchup, tentukan apakah Airflow harus mengeksekusi semua operasi DAG sebelumnya dari tanggal mulai hingga tanggal saat ini yang belum dieksekusi.

    Operasi DAG yang dijalankan selama proses mengejar akan memiliki tanggal logis di masa lalu dan tanggal operasinya akan mencerminkan waktu saat operasi DAG benar-benar dijalankan.

  5. (Opsional) Di parameter retries, tentukan berapa kali Airflow harus mencoba kembali tugas yang gagal (setiap DAG terdiri dari satu atau beberapa tugas individual). Secara default, tugas di Cloud Composer dicoba ulang dua kali.

  6. Upload DAG versi baru ke bucket lingkungan.

  7. Tunggu hingga Airflow berhasil mengurai DAG. Misalnya, Anda dapat memeriksa daftar DAG di lingkungan Anda di konsol Google Cloud atau di UI Airflow.

Contoh definisi DAG berikut berjalan dua kali sehari pada pukul 00.00 dan 12.00. Tanggal mulainya ditetapkan ke 1 Januari 2024, tetapi Airflow tidak menjalankannya untuk tanggal sebelumnya setelah Anda mengupload atau menjeda karena fitur mengejar ketinggalan dinonaktifkan.

DAG berisi satu tugas bernama insert_query_job, yang menyisipkan baris ke dalam tabel dengan operator BigQueryInsertJobOperator. Operator ini adalah salah satu Operator BigQuery Google Cloud yang dapat Anda gunakan untuk mengelola set data dan tabel, menjalankan kueri, serta memvalidasi data. Jika eksekusi tertentu dari tugas ini gagal, Airflow akan mencobanya lagi empat kali lagi dengan interval percobaan ulang default. Tanggal logis untuk percobaan ulang ini tetap sama.

Kueri SQL untuk baris ini menggunakan template Airflow untuk menulis tanggal dan nama logis DAG ke baris.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

Untuk menguji DAG ini, Anda dapat memicunya secara manual, lalu melihat log eksekusi tugas.

Contoh parameter penjadwalan lainnya

Contoh parameter penjadwalan berikut menggambarkan cara kerja penjadwalan dengan berbagai kombinasi parameter:

  • Jika start_date adalah datetime(2024, 4, 4, 16, 25) dan schedule_interval adalah 30 16 * * *, maka DAG pertama akan dijalankan pada pukul 16.30 pada 5 April 2024.

  • Jika start_date adalah datetime(2024, 4, 4, 16, 35) dan schedule_interval adalah 30 16 * * *, maka DAG pertama akan dijalankan pada pukul 16.30 pada 6 April 2024. Karena tanggal mulainya setelah interval jadwal pada 4 April 2024, eksekusi DAG tidak terjadi pada 5 April 2024. Sebagai gantinya, interval jadwal berakhir pada pukul 16.35 pada 5 April 2024, sehingga DAG berikutnya akan dijalankan pada pukul 16.30 pada hari berikutnya.

  • Jika start_date adalah datetime(2024, 4, 4), dan schedule_interval adalah @daily, operasi DAG pertama dijadwalkan pada pukul 00.00 pada 5 April 2024.

  • Jika start_date adalah datetime(2024, 4, 4, 16, 30), dan schedule_interval adalah 0 * * * *, maka DAG pertama akan dijalankan pada pukul 18.00 pada 4 April 2024. Setelah tanggal dan waktu yang ditentukan berlalu, Airflow menjadwalkan DAG berjalan pada menit 0 setiap jam. Titik waktu terdekat saat hal ini terjadi adalah pukul 17.00. Pada saat ini, Airflow menjadwalkan operasi DAG untuk terjadi di akhir interval jadwal, yaitu pukul 18.00.

Memicu DAG secara manual

Saat Anda memicu DAG Airflow secara manual, Airflow akan menjalankan DAG satu kali, secara independen dari jadwal yang ditentukan dalam file DAG.

Konsol

UI DAG didukung di Cloud Composer 1.17.8 dan versi yang lebih baru.

Untuk memicu DAG dari konsol Google Cloud:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pilih lingkungan untuk melihat detailnya.

  3. Di halaman Environment details, buka tab DAGs.

  4. Klik nama DAG.

  5. Di halaman Detail DAG, klik Pemicu DAG. Operasi DAG baru dibuat.

UI Airflow

Untuk memicu DAG dari UI Airflow:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.

  3. Login dengan Akun Google yang memiliki izin yang sesuai.

  4. Di antarmuka web Airflow, pada halaman DAG, di kolom Link untuk DAG Anda, klik tombol Trigger Dag.

  5. (Opsional) Tentukan konfigurasi run DAG.

  6. Klik Pemicu.

gcloud

Di Airflow 1.10.12 atau yang lebih lama, jalankan perintah Airflow CLI trigger_dag:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

Di Airflow 1.10.14 atau yang lebih baru, termasuk Airflow 2, jalankan perintah dags trigger Airflow CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Ganti kode berikut:

  • ENVIRONMENT_NAME: nama lingkungan Anda.
  • LOCATION: region tempat lingkungan berada.
  • DAG_ID: nama DAG.

Untuk mengetahui informasi selengkapnya tentang cara menjalankan perintah Airflow CLI di lingkungan Cloud Composer, lihat Menjalankan perintah Airflow CLI.

Untuk mengetahui informasi selengkapnya tentang perintah Airflow CLI yang tersedia, lihat referensi perintah gcloud composer environments run.

Melihat log dan detail operasi DAG

Di konsol Google Cloud, Anda dapat:

Selain itu, Cloud Composer menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow sendiri.

Menjeda DAG

Konsol

UI DAG didukung di Cloud Composer 1.17.8 dan versi yang lebih baru.

Untuk menjeda DAG dari konsol Google Cloud:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pilih lingkungan untuk melihat detailnya.

  3. Di halaman Environment details, buka tab DAGs.

  4. Klik nama DAG.

  5. Di halaman Detail DAG, klik Jeda DAG.

UI Airflow

Untuk menjeda DAG dari UI Airflow:

  1. Di konsol Google Cloud, buka halaman Environments.

Buka Lingkungan

  1. Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.

  2. Login dengan Akun Google yang memiliki izin yang sesuai.

  3. Di antarmuka web Airflow, di halaman DAG, klik tombol di samping nama DAG.

gcloud

Di Airflow 1.10.12 atau yang lebih lama, jalankan perintah Airflow CLI pause:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    pause -- DAG_ID

Di Airflow 1.10.14 atau yang lebih baru, termasuk Airflow 2, jalankan perintah dags pause Airflow CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

Ganti kode berikut:

  • ENVIRONMENT_NAME: nama lingkungan Anda.
  • LOCATION: region tempat lingkungan berada.
  • DAG_ID: nama DAG.

Untuk mengetahui informasi selengkapnya tentang cara menjalankan perintah Airflow CLI di lingkungan Cloud Composer, lihat Menjalankan perintah Airflow CLI.

Untuk mengetahui informasi selengkapnya tentang perintah Airflow CLI yang tersedia, lihat referensi perintah gcloud composer environments run.

Langkah selanjutnya