Picu DAG

Cloud Composer 1 | Cloud Composer 2

Halaman ini menjelaskan berbagai cara untuk memicu DAG di lingkungan Cloud Composer.

Airflow menyediakan cara berikut untuk memicu DAG:

  • Picu jadwal. Saat membuat DAG, Anda menentukan jadwal untuk DAG. Airflow memicu DAG secara otomatis berdasarkan parameter penjadwalan yang ditentukan.

  • Picu secara manual. Anda dapat memicu DAG secara manual dari UI Airflow, atau dengan menjalankan perintah Airflow CLI dari gcloud.

  • Dipicu sebagai respons terhadap peristiwa. Cara standar untuk memicu DAG sebagai respons terhadap peristiwa adalah dengan menggunakan sensor.

Cara lain untuk memicu DAG:

Memicu DAG sesuai jadwal

Untuk memicu DAG sesuai jadwal:

  1. Tentukan parameter start_date dan schedule_interval dalam file DAG, seperti yang akan dijelaskan nanti di bagian ini.
  2. Upload file DAG ke lingkungan Anda.

Menentukan parameter penjadwalan

Saat menentukan DAG, dalam parameter schedule_interval, Anda menentukan seberapa sering Anda ingin menjalankan DAG. Dalam parameter start_date, tentukan kapan Anda ingin Airflow mulai menjadwalkan DAG. Tugas di DAG dapat memiliki tanggal mulai individual, atau Anda dapat menentukan satu tanggal mulai untuk semua tugas. Berdasarkan tanggal mulai minimum untuk tugas di DAG dan interval jadwal, Airflow menjadwalkan DAG berjalan.

Penjadwalan berfungsi dengan cara berikut. Setelah start_date berlalu, Airflow akan menunggu kemunculan schedule_interval berikut. Kemudian, kode ini akan menjadwalkan operasi DAG pertama yang akan dilakukan di akhir interval jadwal ini. Misalnya, jika DAG dijadwalkan untuk berjalan setiap jam (schedule_interval adalah 1 jam) dan tanggal mulai adalah pukul 12.00 hari ini, operasi DAG pertama terjadi pada pukul 13.00 hari ini.

Contoh berikut menunjukkan DAG yang berjalan setiap jam mulai pukul 15.00 pada 5 April 2021. Dengan parameter yang digunakan dalam contoh, Airflow menjadwalkan operasi DAG pertama yang akan dilakukan pada pukul 16.00 pada 5 April 2021.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id='example_dag_schedule',
    # At 15:00 on 5 April, 2021
    start_date=datetime(2021, 4, 5, 15, 0),
    # At minute 0 of every hour
    schedule_interval='0 * * * *') as dag:

    # Output the current date and time
    t1 = BashOperator(
        task_id='date',
        bash_command='date',
        dag=dag)

    t1

Untuk mengetahui informasi selengkapnya tentang parameter penjadwalan, lihat DAG Run dalam dokumentasi Airflow.

Contoh parameter penjadwalan lainnya

Contoh parameter penjadwalan berikut menggambarkan cara kerja penjadwalan dengan berbagai kombinasi parameter:

  • Jika start_date adalah datetime(2021, 4, 4, 16, 25) dan schedule_interval adalah 30 16 * * *, operasi DAG pertama akan terjadi pada pukul 16.30 pada 5 April 2021.
  • Jika start_date adalah datetime(2021, 4, 4, 16, 35) dan schedule_interval adalah 30 16 * * *, operasi DAG pertama akan terjadi pada pukul 16.30 pada 6 April 2021. Karena tanggal mulainya jatuh setelah interval jadwal pada 4 April 2021, operasi DAG tidak terjadi pada 5 April 2021. Sebagai gantinya, interval jadwal berakhir pada pukul 16.35 pada 5 April 2021, sehingga proses DAG berikutnya dijadwalkan pada pukul 16.30 pada hari berikutnya.
  • Jika start_date adalah datetime(2021, 4, 4), dan schedule_interval adalah @daily, operasi DAG pertama akan dijadwalkan pada pukul 00.00 pada 5 April 2021.
  • Jika start_date adalah datetime(2021, 4, 4, 16, 30), dan schedule_interval adalah 0 * * * *, operasi DAG pertama akan dijadwalkan pada pukul 18.00 pada 4 April 2021. Setelah tanggal dan waktu yang ditentukan berlalu, Airflow menjadwalkan operasi DAG agar terjadi pada menit 0 setiap jam. Titik terdekat saat ini terjadi adalah pukul 17.00. Saat ini, Airflow menjadwalkan operasi DAG agar dilakukan di akhir interval jadwal, yaitu pada pukul 18.00.

Memicu DAG secara manual

Saat Anda memicu DAG secara manual, Airflow akan menjalankan DAG. Misalnya, jika Anda memiliki DAG yang sudah berjalan sesuai jadwal, dan Anda memicu DAG ini secara manual, Airflow akan menjalankan DAG satu kali, secara terpisah dari jadwal sebenarnya yang ditentukan untuk DAG.

Konsol

DAG UI didukung di Cloud Composer 1.17.8 dan versi yang lebih baru.

Untuk memicu DAG dari Konsol Google Cloud:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pilih lingkungan untuk melihat detailnya.

  3. Di halaman Environment details, buka tab DAGs.

  4. Klik nama DAG.

  5. Di halaman Detail DAG, klik Pemicu DAG. Operasi DAG baru dibuat.

UI Airflow

Untuk memicu DAG dari antarmuka web Airflow:

  1. Di konsol Google Cloud, buka halaman Environments.

Buka Lingkungan

  1. Di kolom webserver Airflow, ikuti link Airflow untuk lingkungan Anda.

  2. Login dengan akun Google yang memiliki izin yang sesuai.

  3. Di antarmuka web Airflow, di halaman DAGs, di kolom Links untuk DAG Anda, klik tombol Trigger Dag.

  4. (Opsional) Tentukan konfigurasi run DAG.

  5. Klik Pemicu.

gcloud

Di Airflow 1.10.12 atau yang lebih lama, jalankan perintah CLI Airflow trigger_dag:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

Di Airflow 1.10.14 atau yang lebih baru, termasuk Airflow 2, jalankan perintah dags trigger Airflow CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Ganti:

  • ENVIRONMENT_NAME dengan nama lingkungan.
  • LOCATION dengan wilayah tempat lingkungan berada.
  • DAG_ID dengan nama DAG.

Untuk mengetahui informasi selengkapnya tentang cara menjalankan perintah Airflow CLI di lingkungan Cloud Composer, lihat Menjalankan perintah CLI Airflow.

Untuk mengetahui informasi selengkapnya tentang perintah CLI Airflow yang tersedia, lihat referensi perintah gcloud composer environments run.

Langkah selanjutnya