在 Airflow DAG 中使用可延遲運算子

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在環境中啟用對可延遲運算子的支援,以及如何在 DAG 中使用可延遲運算子。 Google Cloud

關於 Cloud Composer 中的可延遲執行運算子

如果您有至少一個觸發器執行個體 (或至少兩個高復原力環境),即可在 DAG 中使用可延遲運算子和觸發條件

對於可延遲執行的運算子,Airflow 會將工作執行作業分成下列階段:

  1. 啟動作業。在這個階段,工作會佔用 Airflow 工作站的其中一個位置。這項工作會執行作業,將工作委派給其他服務。

    舉例來說,執行 BigQuery 工作可能需要幾秒到幾小時不等。建立工作後,作業會將工作 ID (BigQuery 工作 ID) 傳遞至 Airflow 觸發程序。

  2. 觸發程序會監控工作,直到工作完成為止。在這個階段,工作人員時段不會被占用。Airflow 觸發器採用非同步架構,可處理數百個這類工作。觸發程序偵測到工作完成時,會傳送事件來觸發最後一個階段。

  3. 在最後階段,Airflow 工作站會執行回呼。舉例來說,這個回呼可以將工作標示為成功,或執行其他作業,並將工作設為再次由觸發器監控。

觸發器為無狀態,因此不會因中斷或重新啟動而受到影響。因此,除非重新啟動發生在最後一個階段 (預計時間很短),否則長時間執行的工作不會受到 Pod 重新啟動的影響。

事前準備

啟用可延遲執行的運算子支援功能

環境元件「Airflow 觸發器」會以非同步方式監控環境中所有延後的工作。這類工作中的延遲作業完成後,觸發器會將工作傳遞給 Airflow 工作站。

如要在 DAG 中使用可延遲模式,環境中必須至少有一個觸發器執行個體 (或在高度彈性環境中至少有兩個)。您可以在建立環境時設定觸發器,或調整現有環境的觸發器數量和效能參數

Google Cloud 支援可延遲模式的運算子

只有部分 Airflow 運算子已擴充,可支援可延遲模型。以下清單列出 apache-airflow-providers-google 套件中支援可延遲模式的運算子。最低必要apache-airflow-providers-google套件版本資料欄代表最早的套件版本,該版本支援可延遲模式。

BigQuery 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

BigQuery 資料移轉服務運算子

運算子名稱 必要 apache-airflow-providers-google 版本
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

批次運算子

運算子名稱 必要 apache-airflow-providers-google 版本
CloudBatchSubmitJobOperator 10.7.0

Cloud Build 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
CloudBuildCreateBuildOperator 8.7.0

Cloud Composer 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0
CloudComposerRunAirflowCLICommandOperator 11.0.0

Cloud Run 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
CloudRunExecuteJobOperator 10.7.0

Cloud SQL 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
CloudSQLExportInstanceOperator 10.3.0

Storage 移轉服務運算子

運算子名稱 必要 apache-airflow-providers-google 版本
CloudDataTransferServiceS3ToGCSOperator 14.0.0
CloudDataTransferServiceGCSToGCSOperator 14.0.0

Dataflow 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0
DataflowStartYamlJobOperator 11.0.0

Cloud Data Fusion 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
CloudDataFusionStartPipelineOperator 8.9.0

Dataplex Universal Catalog 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
DataplexRunDataQualityScanOperator 10.8.0
DataplexGetDataQualityScanResultOperator 10.8.0
DataplexRunDataProfileScanOperator 11.0.0

Google Kubernetes Engine 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0
GKEStartPodOperator 12.0.0
GKEStartJobOperator 11.0.0

Pub/Sub 運算子

運算子名稱 必要 apache-airflow-providers-google 版本
PubSubPullOperator 14.0.0

AI 平台營運人員

運算子名稱 必要 apache-airflow-providers-google 版本
MLEngineStartTrainingJobOperator 8.9.0

在 DAG 中使用可延遲運算子

所有 Google Cloud 運算子的常見慣例,是使用 deferrable 布林參數啟用可延遲模式。如果運算子沒有這個參數,就無法在可延遲模式中執行。 Google Cloud其他運算子可能有不同的慣例。舉例來說,部分社群營運人員的類別名稱會加上 Async 字尾。

下列範例 DAG 會在可延遲模式中使用 DataprocSubmitJobOperator 運算子:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

查看觸發器記錄

觸發器會產生記錄,這些記錄會與其他環境元件的記錄一起提供。如要進一步瞭解如何查看環境記錄,請參閱「查看記錄」。

監控觸發器

如要進一步瞭解如何監控觸發器元件,請參閱「Airflow 指標」。

除了監控觸發者,您也可以在環境的「監控」資訊主頁上,查看「未完成的工作」指標中延後的工作數量。

後續步驟