Menggunakan operator yang dapat ditangguhkan di DAG

Cloud Composer 1 | Cloud Composer 2

Halaman ini menjelaskan cara mengaktifkan dukungan untuk Operator yang Dapat Ditangguhkan di lingkungan Anda dan menggunakan operator Google Cloud yang dapat ditangguhkan di DAG.

Tentang Operator yang Dapat Ditangguhkan di Cloud Composer

Jika Anda memiliki setidaknya satu instance pemicu (atau setidaknya dua instance di lingkungan yang sangat tangguh), Anda dapat menggunakan Operator dan Pemicu yang Dapat Ditangguhkan di DAG.

Untuk operator yang dapat ditangguhkan, Airflow membagi eksekusi tugas ke dalam tahap berikut:

  1. Mulai operasi. Pada tahap ini, tugas menempati slot pekerja Airflow. Tugas tersebut menjalankan operasi yang mendelegasikan tugas ke layanan yang berbeda.

    Misalnya, menjalankan tugas BigQuery dapat memerlukan waktu beberapa detik hingga beberapa jam. Setelah membuat tugas, operasi tersebut akan meneruskan ID pekerjaan (ID tugas BigQuery) ke pemicu Airflow.

  2. Pemicu memantau tugas hingga selesai. Pada tahap ini, slot pekerja tidak terisi. Pemicu Airflow memiliki arsitektur asinkron dan mampu menangani ratusan tugas semacam itu. Saat mendeteksi bahwa tugas selesai, pemicu akan mengirimkan peristiwa yang memicu tahap terakhir.

  3. Pada tahap terakhir, pekerja Airflow akan menjalankan callback. Callback ini, misalnya, dapat menandai tugas sebagai berhasil, atau menjalankan operasi lain dan menetapkan tugas untuk dipantau kembali oleh pemicu.

Pemicu bersifat stateless sehingga tidak akan terganggu atau dimulai ulang. Oleh karena itu, tugas yang berjalan lama tidak akan dapat dimulai ulang oleh pod, kecuali jika mulai ulang terjadi pada tahap terakhir, yang diperkirakan akan berlangsung singkat.

Sebelum memulai

  • Operator dan Sensor yang Dapat Ditangguhkan tersedia di lingkungan Cloud Composer 2 dan memerlukan hal berikut:
    • Cloud Composer 2.0.31 dan versi yang lebih baru
    • Airflow 2.2.5, 2.3.3, dan versi yang lebih baru

Mengaktifkan dukungan untuk operator yang dapat ditangguhkan

Komponen lingkungan bernama Airflow triggerer memantau semua tugas yang ditangguhkan di lingkungan Anda secara asinkron. Setelah operasi yang ditangguhkan dari tugas tersebut selesai, pemicu akan meneruskan tugas ke pekerja Airflow.

Anda memerlukan setidaknya satu instance pemicu di lingkungan Anda (atau setidaknya dua di lingkungan yang sangat tangguh) untuk menggunakan mode yang dapat ditangguhkan di DAG. Anda dapat mengonfigurasi pemicu saat membuat lingkungan atau menyesuaikan jumlah pemicu dan parameter performa untuk lingkungan yang ada.

Operator Google Cloud yang mendukung mode yang dapat ditangguhkan

Hanya beberapa operator Airflow yang telah diperpanjang untuk mendukung model yang dapat ditangguhkan. Daftar berikut adalah referensi untuk operator dalam paket airflow.providers.google.operators.cloud yang mendukung mode yang dapat ditangguhkan. Kolom dengan versi paket airflow.providers.google.operators.cloud minimum yang diperlukan mewakili versi paket paling awal ketika operator tersebut mendukung mode yang dapat ditangguhkan.

Operator Cloud Composer

Nama operatorVersi apache-airflow-providers-google yang diperlukan
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0

Operator BigQuery

Nama operatorVersi apache-airflow-providers-google yang diperlukan
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

Operator BigQuery Data Transfer Service

Nama operatorVersi apache-airflow-providers-google yang diperlukan
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Operator Cloud Build

Nama operatorVersi apache-airflow-providers-google yang diperlukan
CloudBuildCreateBuildOperator 8.7.0

Operator Cloud SQL

Nama operatorVersi apache-airflow-providers-google yang diperlukan
CloudSQLExportInstanceOperator 10.3.0

Operator Dataflow

Nama operatorVersi apache-airflow-providers-google yang diperlukan
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0

Operator Cloud Data Fusion

Nama operatorVersi apache-airflow-providers-google yang diperlukan
CloudDataFusionStartPipelineOperator 8.9.0

Operator Google Kubernetes Engine

Nama operatorVersi apache-airflow-providers-google yang diperlukan
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0

Operator AI Platform

Nama operatorVersi apache-airflow-providers-google yang diperlukan
MLEngineStartTrainingJobOperator 8.9.0

Menggunakan operator yang dapat ditangguhkan di DAG

Konvensi umum untuk semua operator Google Cloud adalah mengaktifkan mode yang dapat ditangguhkan dengan parameter boolean deferrable. Jika operator Google Cloud tidak memiliki parameter ini, maka operator tersebut tidak dapat berjalan dalam mode yang dapat ditangguhkan. Operator lain dapat memiliki konvensi yang berbeda. Misalnya, beberapa operator komunitas memiliki class terpisah dengan akhiran Async dalam namanya.

Contoh DAG berikut menggunakan operator DataprocSubmitJobOperator dalam mode yang dapat ditangguhkan:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

Lihat log pemicu

Pemicu menghasilkan log yang tersedia bersama dengan log komponen lingkungan lainnya. Untuk mengetahui informasi selengkapnya tentang cara melihat log lingkungan, lihat Melihat log.

Pemicu pemantauan

Untuk mengetahui informasi selengkapnya tentang pemantauan komponen pemicu, lihat Metrik Airflow.

Selain memantau pemicu, Anda dapat memeriksa jumlah tugas yang ditangguhkan di metrik Unfinished Task pada dasbor Monitoring di lingkungan Anda.

Langkah selanjutnya