Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在環境中啟用對可延遲運算子的支援,以及如何在 DAG 中使用可延遲運算子。 Google Cloud
關於 Cloud Composer 中的可延遲執行運算子
如果您有至少一個觸發器執行個體 (或至少兩個高復原力環境),即可在 DAG 中使用可延遲運算子和觸發條件。
對於可延遲執行的運算子,Airflow 會將工作執行作業分成下列階段:
啟動作業。在這個階段,工作會佔用 Airflow 工作站的其中一個位置。這項工作會執行作業,將工作委派給其他服務。
舉例來說,執行 BigQuery 工作可能需要幾秒到幾小時不等。建立工作後,作業會將工作 ID (BigQuery 工作 ID) 傳遞至 Airflow 觸發程序。
觸發程序會監控工作,直到工作完成為止。在這個階段,工作人員時段不會被占用。Airflow 觸發器採用非同步架構,可處理數百個這類工作。觸發程序偵測到工作完成時,會傳送事件來觸發最後一個階段。
在最後階段,Airflow 工作站會執行回呼。舉例來說,這個回呼可以將工作標示為成功,或執行其他作業,並將工作設為再次由觸發器監控。
觸發器為無狀態,因此不會因中斷或重新啟動而受到影響。因此,除非重新啟動發生在最後一個階段 (預計時間很短),否則長時間執行的工作不會受到 Pod 重新啟動的影響。
事前準備
啟用可延遲執行的運算子支援功能
環境元件「Airflow 觸發器」會以非同步方式監控環境中所有延後的工作。這類工作中的延遲作業完成後,觸發器會將工作傳遞給 Airflow 工作站。
如要在 DAG 中使用可延遲模式,環境中必須至少有一個觸發器執行個體 (或在高度彈性環境中至少有兩個)。您可以在建立環境時設定觸發器,或調整現有環境的觸發器數量和效能參數。
Google Cloud 支援可延遲模式的運算子
只有部分 Airflow 運算子已擴充,可支援可延遲模型。以下清單列出 apache-airflow-providers-google
套件中支援可延遲模式的運算子。最低必要apache-airflow-providers-google
套件版本資料欄代表最早的套件版本,該版本支援可延遲模式。
BigQuery 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
BigQueryCheckOperator | 8.4.0 |
BigQueryValueCheckOperator | 8.4.0 |
BigQueryIntervalCheckOperator | 8.4.0 |
BigQueryGetDataOperator | 8.4.0 |
BigQueryInsertJobOperator | 8.4.0 |
BigQuery 資料移轉服務運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
BigQueryDataTransferServiceStartTransferRunsOperator | 8.9.0 |
批次運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
CloudBatchSubmitJobOperator | 10.7.0 |
Cloud Build 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
CloudBuildCreateBuildOperator | 8.7.0 |
Cloud Composer 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
CloudComposerCreateEnvironmentOperator | 6.4.0 |
CloudComposerDeleteEnvironmentOperator | 6.4.0 |
CloudComposerUpdateEnvironmentOperator | 6.4.0 |
CloudComposerRunAirflowCLICommandOperator | 11.0.0 |
Cloud Run 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
CloudRunExecuteJobOperator | 10.7.0 |
Cloud SQL 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
CloudSQLExportInstanceOperator | 10.3.0 |
Storage 移轉服務運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
CloudDataTransferServiceS3ToGCSOperator | 14.0.0 |
CloudDataTransferServiceGCSToGCSOperator | 14.0.0 |
Dataflow 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
DataflowTemplatedJobStartOperator | 8.9.0 |
DataflowStartFlexTemplateOperator | 8.9.0 |
DataflowStartYamlJobOperator | 11.0.0 |
Cloud Data Fusion 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
CloudDataFusionStartPipelineOperator | 8.9.0 |
Dataplex Universal Catalog 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
DataplexRunDataQualityScanOperator | 10.8.0 |
DataplexGetDataQualityScanResultOperator | 10.8.0 |
DataplexRunDataProfileScanOperator | 11.0.0 |
Dataproc 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
DataprocCreateClusterOperator | 8.9.0 |
DataprocDeleteClusterOperator | 8.9.0 |
DataprocJobBaseOperator | 8.4.0 |
DataprocInstantiateWorkflowTemplateOperator | 9.0.0 |
DataprocInstantiateInlineWorkflowTemplateOperator | 10.1.0 |
DataprocSubmitJobOperator | 8.4.0 |
DataprocUpdateClusterOperator | 8.9.0 |
DataprocDiagnoseClusterOperator | 11.0.0 |
DataprocCreateBatchOperator | 8.9.0 |
Google Kubernetes Engine 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
GKEDeleteClusterOperator | 9.0.0 |
GKECreateClusterOperator | 9.0.0 |
GKEStartPodOperator | 12.0.0 |
GKEStartJobOperator | 11.0.0 |
Pub/Sub 運算子
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
PubSubPullOperator | 14.0.0 |
AI 平台營運人員
運算子名稱 | 必要 apache-airflow-providers-google 版本 |
---|---|
MLEngineStartTrainingJobOperator | 8.9.0 |
在 DAG 中使用可延遲運算子
所有 Google Cloud 運算子的常見慣例,是使用 deferrable
布林參數啟用可延遲模式。如果運算子沒有這個參數,就無法在可延遲模式中執行。 Google Cloud其他運算子可能有不同的慣例。舉例來說,部分社群營運人員的類別名稱會加上 Async
字尾。
下列範例 DAG 會在可延遲模式中使用 DataprocSubmitJobOperator
運算子:
PYSPARK_JOB = {
"reference": { "project_id": "PROJECT_ID" },
"placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
"pyspark_job": {
"main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
},
}
DataprocSubmitJobOperator(
task_id="dataproc-deferrable-example",
job=PYSPARK_JOB,
deferrable=True,
)
查看觸發器記錄
觸發器會產生記錄,這些記錄會與其他環境元件的記錄一起提供。如要進一步瞭解如何查看環境記錄,請參閱「查看記錄」。
監控觸發器
如要進一步瞭解如何監控觸發器元件,請參閱「Airflow 指標」。
除了監控觸發者,您也可以在環境的「監控」資訊主頁上,查看「未完成的工作」指標中延後的工作數量。