Menggunakan operator yang dapat ditangguhkan di DAG Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Halaman ini menjelaskan cara mengaktifkan dukungan untuk Operator yang Dapat Ditangguhkan di lingkungan Anda dan menggunakan operator Google Cloud yang dapat ditangguhkan di DAG Anda.

Tentang Operator yang Dapat Ditangguhkan di Cloud Composer

Jika memiliki setidaknya satu instance pemicu (atau setidaknya dua di lingkungan yang sangat tangguh), Anda dapat menggunakan Operator dan Pemicu yang Dapat Ditunda di DAG Anda.

Untuk operator yang dapat ditangguhkan, Airflow membagi eksekusi tugas menjadi tahap berikut:

  1. Mulai operasi. Pada tahap ini, tugas menempati slot pekerja Airflow. Tugas melakukan operasi yang mendelegasikan tugas ke layanan lain.

    Misalnya, menjalankan tugas BigQuery dapat memerlukan waktu dari beberapa detik hingga beberapa jam. Setelah membuat tugas, operasi meneruskan ID tugas BigQuery ke pemicu Airflow.

  2. Pemicu memantau tugas hingga selesai. Pada tahap ini, slot pekerja tidak digunakan. Pemicu Airflow memiliki arsitektur asinkron dan mampu menangani ratusan tugas tersebut. Saat pemicu mendeteksi bahwa tugas telah selesai, pemicu akan mengirimkan peristiwa yang memicu tahap terakhir.

  3. Pada tahap terakhir, pekerja Airflow menjalankan callback. Callback ini, misalnya, dapat menandai tugas sebagai berhasil, atau menjalankan operasi lain dan menetapkan tugas untuk dipantau oleh pemicu lagi.

Pemicu bersifat stateless dan oleh karena itu tahan terhadap gangguan atau mulai ulang. Oleh karena itu, tugas yang berjalan lama tahan terhadap mulai ulang pod, kecuali jika mulai ulang terjadi selama tahap terakhir, yang diperkirakan akan singkat.

Sebelum memulai

Mengaktifkan dukungan untuk operator yang dapat ditangguhkan

Komponen lingkungan yang disebut pemicu Airflow secara asinkron memantau semua tugas yang ditangguhkan di lingkungan Anda. Setelah operasi yang ditangguhkan dari tugas tersebut selesai, pemicu akan meneruskan tugas ke worker Airflow.

Anda memerlukan setidaknya satu instance pemicu di lingkungan Anda (atau setidaknya dua di lingkungan yang sangat tangguh) untuk menggunakan mode yang dapat ditunda di DAG Anda. Anda dapat mengonfigurasi pemicu saat membuat lingkungan atau menyesuaikan jumlah pemicu dan parameter performa untuk lingkungan yang sudah ada.

OperatorGoogle Cloud yang mendukung mode yang dapat ditangguhkan

Hanya beberapa operator Airflow yang telah diperluas untuk mendukung model yang dapat ditangguhkan. Daftar berikut adalah referensi untuk operator dalam paket apache-airflow-providers-google yang mendukung mode yang dapat ditangguhkan. Kolom dengan versi paket apache-airflow-providers-google minimum yang diperlukan merepresentasikan versi paket paling awal tempat operator tersebut mendukung mode yang dapat ditangguhkan.

Operator BigQuery

Nama operator Versi apache-airflow-providers-google yang diperlukan
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

Operator BigQuery Data Transfer Service

Nama operator Versi apache-airflow-providers-google yang diperlukan
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Operator batch

Nama operator Versi apache-airflow-providers-google yang diperlukan
CloudBatchSubmitJobOperator 10.7.0

Operator Cloud Build

Nama operator Versi apache-airflow-providers-google yang diperlukan
CloudBuildCreateBuildOperator 8.7.0

Operator Cloud Composer

Nama operator Versi apache-airflow-providers-google yang diperlukan
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0
CloudComposerRunAirflowCLICommandOperator 11.0.0

Operator Cloud Run

Nama operator Versi apache-airflow-providers-google yang diperlukan
CloudRunExecuteJobOperator 10.7.0

Operator Cloud SQL

Nama operator Versi apache-airflow-providers-google yang diperlukan
CloudSQLExportInstanceOperator 10.3.0

Operator Storage Transfer Service

Nama operator Versi apache-airflow-providers-google yang diperlukan
CloudDataTransferServiceS3ToGCSOperator 14.0.0
CloudDataTransferServiceGCSToGCSOperator 14.0.0

Operator Dataflow

Nama operator Versi apache-airflow-providers-google yang diperlukan
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0
DataflowStartYamlJobOperator 11.0.0

Operator Cloud Data Fusion

Nama operator Versi apache-airflow-providers-google yang diperlukan
CloudDataFusionStartPipelineOperator 8.9.0

Operator Dataplex Universal Catalog

Nama operator Versi apache-airflow-providers-google yang diperlukan
DataplexRunDataQualityScanOperator 10.8.0
DataplexGetDataQualityScanResultOperator 10.8.0
DataplexRunDataProfileScanOperator 11.0.0

Operator Google Kubernetes Engine

Nama operator Versi apache-airflow-providers-google yang diperlukan
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0
GKEStartPodOperator 12.0.0
GKEStartJobOperator 11.0.0

Operator Pub/Sub

Nama operator Versi apache-airflow-providers-google yang diperlukan
PubSubPullOperator 14.0.0

Operator AI Platform

Nama operator Versi apache-airflow-providers-google yang diperlukan
MLEngineStartTrainingJobOperator 8.9.0

Menggunakan operator yang dapat ditangguhkan di DAG

Konvensi umum untuk semua operator Google Cloud adalah mengaktifkan mode yang dapat ditangguhkan dengan parameter boolean deferrable. Jika operator Google Cloud tidak memiliki parameter ini, maka operator tersebut tidak dapat berjalan dalam mode dapat ditangguhkan. Operator lain dapat memiliki konvensi yang berbeda. Misalnya, beberapa operator komunitas memiliki class terpisah dengan akhiran Async dalam namanya.

DAG contoh berikut menggunakan operator DataprocSubmitJobOperator dalam mode yang dapat ditangguhkan:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

Melihat log pemicu

Pemicu menghasilkan log yang tersedia bersama dengan log komponen lingkungan lainnya. Untuk mengetahui informasi selengkapnya tentang cara melihat log lingkungan, lihat Melihat log.

Pemicu monitor

Untuk mengetahui informasi selengkapnya tentang pemantauan komponen pemicu, lihat Metrik Airflow.

Selain memantau pemicu, Anda dapat memeriksa jumlah tugas yang ditangguhkan dalam metrik Unfinished Task di dasbor Monitoring lingkungan Anda.

Langkah berikutnya