Menulis DAG Airflow

Cloud Composer 1 | Cloud Composer 2

Panduan ini menunjukkan cara menulis directed acyclic graph (DAG) Apache Airflow yang berjalan di lingkungan Cloud Composer.

Karena Apache Airflow tidak menyediakan DAG dan isolasi tugas yang kuat, sebaiknya gunakan lingkungan produksi dan pengujian yang terpisah untuk mencegah interferensi DAG. Untuk informasi selengkapnya, lihat Menguji DAG.

Menyusun DAG Aliran Udara

DAG Airflow ditentukan dalam file Python dan terdiri dari komponen berikut:

  • Definisi DAG
  • Operator Airflow
  • Hubungan operator

Cuplikan kode berikut menunjukkan contoh setiap komponen di luar konteks.

Definisi DAG

Contoh berikut menunjukkan definisi DAG:

Aliran udara 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Aliran udara 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Operator dan tugas

Operator menjelaskan pekerjaan yang akan dilakukan. Tugas adalah instance spesifik dari operator.

Aliran udara 2

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Aliran udara 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Hubungan tugas

Hubungan tugas menjelaskan urutan penyelesaian pekerjaan.

Aliran udara 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Aliran udara 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Contoh alur kerja DAG lengkap di Python

Alur kerja berikut adalah template DAG yang berfungsi dan terdiri dari dua tugas: tugas hello_python dan tugas goodbye_bash:

Aliran udara 2


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Aliran udara 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Untuk mengetahui informasi selengkapnya tentang cara menentukan DAG Airflow, lihat tutorial Airflow dan Konsep Airflow.

Operator Airflow

Contoh berikut menunjukkan beberapa operator Airflow yang populer. Untuk referensi otoritatif operator Airflow, baca Referensi Operator dan Hook serta indeks Penyedia.

BashOperator

Gunakan BashOperator untuk menjalankan program command line.

Aliran udara 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Aliran udara 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer menjalankan perintah yang diberikan dalam skrip Bash pada pekerja Airflow. Pekerja adalah container Docker berbasis Debian dan mencakup beberapa paket.

PythonOperator

Gunakan PythonOperator untuk menjalankan kode Python arbitrer.

Cloud Composer menjalankan kode Python dalam container yang menyertakan paket untuk versi image Cloud Composer yang digunakan di lingkungan Anda.

Untuk menginstal paket Python tambahan, baca artikel Menginstal Dependensi Python.

Operator Google Cloud

Untuk menjalankan tugas yang menggunakan produk Google Cloud, gunakan operator Google Cloud Airflow. Misalnya, operator BigQuery mengkueri dan memproses data di BigQuery.

Ada banyak operator Airflow lainnya untuk Google Cloud dan masing-masing layanan yang disediakan oleh Google Cloud. Baca bagian Operator Google Cloud untuk mengetahui daftar lengkapnya.

Aliran udara 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Aliran udara 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

Gunakan EmailOperator untuk mengirim email dari DAG. Untuk mengirim email dari lingkungan Cloud Composer, Anda harus mengonfigurasi lingkungan Anda untuk menggunakan SendGrid.

Aliran udara 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Aliran udara 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Notifikasi tentang kegagalan operator

Tetapkan email_on_failure ke True untuk mengirim notifikasi email saat operator dalam DAG gagal. Untuk mengirim notifikasi email dari lingkungan Cloud Composer, Anda harus mengonfigurasi lingkungan untuk menggunakan SendGrid.

Aliran udara 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Aliran udara 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Panduan alur kerja DAG

  1. Tempatkan library Python kustom apa pun di arsip ZIP DAG di direktori bertingkat. Jangan menempatkan library di tingkat atas direktori DAG.

    Saat Airflow memindai folder dags/, Airflow hanya memeriksa DAG di modul Python yang ada di level teratas folder DAG dan di tingkat atas arsip ZIP yang juga terletak di folder dags/ level atas. Jika Airflow menemukan modul Python dalam arsip ZIP yang tidak berisi substring airflow dan DAG, Airflow akan berhenti memproses arsip ZIP. Airflow hanya menampilkan DAG yang ditemukan hingga saat itu.

  2. Gunakan Airflow 2, bukan Airflow 1.

    Komunitas Airflow tidak lagi memublikasikan rilis kecil atau patch baru untuk Airflow 1.

  3. Untuk fault tolerance, jangan menentukan beberapa objek DAG dalam modul Python yang sama.

  4. Jangan gunakan SubDAG. Sebagai gantinya, kelompokkan tugas di dalam DAG.

  5. Tempatkan file yang diperlukan pada waktu penguraian DAG ke dalam folder dags/, bukan di folder data/.

  6. Mengimplementasikan pengujian unit untuk DAG Anda

  7. Uji DAG yang dikembangkan atau diubah seperti yang direkomendasikan dalam petunjuk untuk menguji DAG.

  8. Pastikan DAG yang dikembangkan tidak meningkatkan waktu penguraian DAG terlalu banyak.

  9. Tugas Airflow bisa gagal karena beberapa alasan. Untuk menghindari kegagalan seluruh DAG berjalan, sebaiknya aktifkan percobaan ulang tugas. Menyetel percobaan ulang maksimum ke 0 berarti tidak ada percobaan ulang yang dilakukan.

    Sebaiknya ganti opsi default_task_retries dengan nilai untuk menghentikan tugas selain 0. Selain itu, Anda dapat menetapkan parameter retries pada level tugas.

  10. Jika Anda ingin menggunakan GPU dalam tugas Airflow, buat cluster GKE terpisah berdasarkan node menggunakan mesin dengan GPU. Gunakan GKEStartPodOperator untuk menjalankan tugas Anda.

  11. Hindari menjalankan tugas yang menggunakan CPU dan memori berat di kumpulan node cluster tempat komponen Airflow lainnya (penjadwal, pekerja, server web) dijalankan. Sebagai gantinya, gunakan KubernetesPodOperator atau GKEStartPodOperator.

  12. Saat men-deploy DAG ke lingkungan, hanya upload file yang benar-benar diperlukan untuk menafsirkan dan menjalankan DAG ke dalam folder /dags.

  13. Batasi jumlah file DAG di folder /dags.

    Airflow terus mengurai DAG di folder /dags. Penguraian adalah proses yang melakukan loop melalui folder DAG dan jumlah file yang perlu dimuat (dengan dependensinya) berdampak pada performa penguraian DAG dan penjadwalan tugas. Jauh lebih efisien untuk menggunakan 100 file dengan 100 DAG daripada 10.000 file dengan masing-masing 1 DAG, sehingga pengoptimalan seperti itu direkomendasikan. Pengoptimalan ini adalah keseimbangan antara waktu penguraian dan efisiensi pembuatan dan pengelolaan DAG.

    Anda juga dapat mempertimbangkan, misalnya, untuk men-deploy 10.000 file DAG, Anda dapat membuat 100 file zip yang masing-masing berisi 100 file DAG.

    Selain petunjuk di atas, jika Anda memiliki lebih dari 10.000 file DAG, pembuatan DAG dengan cara terprogram mungkin merupakan opsi yang baik. Misalnya, Anda dapat mengimplementasikan satu file DAG Python yang menghasilkan sejumlah objek DAG (misalnya, 20.100 objek DAG).

Hindari penggunaan operator Airflow yang tidak digunakan lagi

Operator yang tercantum dalam tabel berikut tidak digunakan lagi. Hindari menggunakannya di DAG Anda. Sebagai gantinya, gunakan alternatif terbaru yang disediakan.

Operator yang Tidak Digunakan Lagi Operator untuk Digunakan
BigQueryExecuteQueryOperator BigQueryInsertJobOperator
BigQueryPatchDatasetOperator BigQueryUpdateTableOperator
DataflowCreateJavaJobOperator BeamRunJavaPipelineOperator
DataflowCreatePythonJobOperator BeamRunPythonPipelineOperator
DataprocScaleClusterOperator DataprocUpdateClusterOperator
DataprocSubmitPigJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkSqlJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkJobOperator DataprocSubmitJobOperator
DataprocSubmitHadoopJobOperator DataprocSubmitJobOperator
DataprocSubmitPySparkJobOperator DataprocSubmitJobOperator
MLEngineManageModelOperator MLEngineCreateModelOperator, MLEngineGetModelOperator
MLEngineManageVersionOperator MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion
GCSObjectsWtihPrefixExistenceSensor GCSObjectsWithPrefixExistenceSensor

FAQ untuk menulis DAG

Bagaimana cara meminimalkan pengulangan kode jika saya ingin menjalankan tugas yang sama atau serupa di beberapa DAG?

Sebaiknya tentukan library dan wrapper untuk meminimalkan pengulangan kode.

Bagaimana cara menggunakan kembali kode antar-file DAG?

Tempatkan fungsi utilitas Anda di library Python lokal, lalu impor fungsi tersebut. Anda dapat mereferensikan fungsi ini di DAG mana pun yang berada di folder dags/ di bucket lingkungan Anda.

Bagaimana cara meminimalkan risiko dari perbedaan definisi yang timbul?

Misalnya, Anda memiliki dua tim yang ingin menggabungkan data mentah ke dalam metrik pendapatan. Tim menulis dua tugas yang sedikit berbeda yang menyelesaikan hal yang sama. Tentukan library untuk menangani data pendapatan sehingga pengimplementasi DAG harus mengklarifikasi definisi pendapatan yang digabungkan.

Bagaimana cara menetapkan dependensi antar-DAG?

Ini bergantung pada cara Anda ingin mendefinisikan dependensi.

Jika memiliki dua DAG (DAG A dan DAG B) dan ingin DAG B dipicu setelah DAG A, Anda dapat menempatkan TriggerDagRunOperator di akhir Dag A.

Jika DAG B hanya bergantung pada artefak yang dihasilkan DAG A, seperti pesan Pub/Sub, sensor mungkin akan berfungsi lebih baik.

Jika DAG B terintegrasi erat dengan DAG A, Anda mungkin dapat menggabungkan kedua DAG menjadi satu DAG.

Bagaimana cara meneruskan ID operasi unik ke DAG dan tugasnya?

Misalnya, Anda ingin meneruskan nama cluster Dataproc dan jalur file.

Anda dapat membuat ID unik acak dengan menampilkan str(uuid.uuid4()) dalam PythonOperator. Tindakan ini akan menempatkan ID ke XComs, sehingga Anda dapat merujuk ke ID tersebut di operator lain melalui kolom template.

Sebelum membuat uuid, pertimbangkan apakah ID khusus DagRun akan lebih berharga. Anda juga dapat mereferensikan ID ini dalam penggantian Jinja dengan menggunakan makro.

Bagaimana cara memisahkan tugas di DAG?

Setiap tugas harus menjadi unit kerja idempoten. Akibatnya, Anda harus menghindari enkapsulasi alur kerja multi-langkah dalam satu tugas, seperti program kompleks yang berjalan di PythonOperator.

Haruskah saya menentukan beberapa tugas dalam satu DAG untuk menggabungkan data dari berbagai sumber?

Misalnya, Anda memiliki beberapa tabel dengan data mentah dan ingin membuat agregat harian untuk setiap tabel. Tugas-tugas tersebut tidak saling bergantung. Haruskah Anda membuat satu tugas dan DAG untuk setiap tabel atau membuat satu DAG umum?

Jika Anda tidak keberatan dengan setiap tugas yang menggunakan properti level DAG yang sama, misalnya schedule_interval, sebaiknya tentukan beberapa tugas dalam satu DAG. Jika tidak, untuk meminimalkan pengulangan kode, beberapa DAG dapat dibuat dari satu modul Python dengan menempatkannya ke dalam globals() modul.

Bagaimana cara membatasi jumlah tugas serentak yang berjalan di DAG?

Misalnya, Anda tidak ingin melebihi batas penggunaan/kuota API atau tidak menjalankan terlalu banyak proses bersamaan.

Anda dapat menentukan kumpulan Airflow di UI web Airflow dan mengaitkan tugas dengan kumpulan yang ada di DAG.

FAQ terkait penggunaan operator

Haruskah saya menggunakan DockerOperator?

Sebaiknya jangan gunakan DockerOperator, kecuali jika digunakan untuk meluncurkan container pada penginstalan Docker jarak jauh (bukan di dalam cluster lingkungan). Di lingkungan Cloud Composer, operator tidak memiliki akses ke daemon Docker.

Sebagai gantinya, gunakan KubernetesPodOperator atau GKEStartPodOperator. Operator ini masing-masing meluncurkan pod Kubernetes ke cluster Kubernetes atau GKE. Perlu diperhatikan bahwa kami tidak merekomendasikan peluncuran pod ke cluster lingkungan karena hal ini dapat menyebabkan persaingan resource.

Haruskah saya menggunakan SubDagOperator?

Sebaiknya jangan gunakan SubDagOperator.

Gunakan alternatif seperti yang disarankan dalam Petunjuk Mengelompokkan Tugas.

Haruskah saya menjalankan kode Python hanya dalam PythonOperators untuk memisahkan operator Python sepenuhnya?

Bergantung pada sasaran, Anda memiliki beberapa opsi.

Jika satu-satunya kekhawatiran Anda adalah mempertahankan dependensi Python terpisah, Anda dapat menggunakan PythonVirtualenvOperator.

Sebaiknya gunakan KubernetesPodOperator. Operator ini dapat Anda gunakan untuk menentukan pod Kubernetes dan menjalankan pod tersebut di cluster lain.

Bagaimana cara menambahkan paket biner atau non-PyPI kustom?

Anda dapat menginstal paket yang dihosting di repositori paket pribadi.

Anda juga dapat menggunakan KubernetesPodOperator untuk menjalankan pod Kubernetes dengan image Anda sendiri yang dibangun dengan paket kustom.

Bagaimana cara meneruskan argumen secara seragam ke DAG dan tugasnya?

Anda dapat menggunakan dukungan bawaan Airflow untuk Pembuatan template Jinja guna meneruskan argumen yang dapat digunakan di kolom dengan template.

Kapan penggantian template terjadi?

Penggantian template terjadi pada pekerja Airflow tepat sebelum fungsi pre_execute operator dipanggil. Dalam praktiknya, ini berarti template tidak diganti sampai tepat sebelum tugas berjalan.

Bagaimana cara mengetahui argumen operator mana yang mendukung penggantian template?

Argumen operator yang mendukung penggantian template Jinja2 secara eksplisit ditandai seperti itu.

Cari kolom template_fields di definisi Operator, yang berisi daftar nama argumen yang mengalami penggantian template.

Misalnya, lihat BashOperator, yang mendukung pemberian template untuk argumen bash_command dan env.

Langkah selanjutnya