Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本指南說明如何編寫在 Cloud Composer 環境中執行的 Apache Airflow 有向無環圖 (DAG)。
Apache Airflow 不提供嚴密的 DAG 和工作隔離機制,因此建議您將實際工作環境和測試環境分開,以免發生 DAG 干擾情形。詳情請參閱測試 DAG 一文。
建構 Airflow DAG
Airflow DAG 是在 Python 檔案中定義,並且是由下列元件組成:
- DAG 定義
- Airflow 運算子
- 運算子關係
下列程式碼片段會顯示去脈絡化的各元件範例。
DAG 定義
以下範例展示 Airflow DAG 定義:
運算子和工作
Airflow 運算子會描述待完成的工作。工作是運算子的特定執行個體。
工作關係
工作關係:描述工作必須完成的順序。
Python 中的完整 DAG 工作流程範例
下列工作流程是完整有效的 DAG 範本,其中包含 hello_python
工作和 goodbye_bash
工作:
如要進一步瞭解如何定義 Airflow DAG,請參閱 Airflow 教學課程和 Airflow 概念。
Airflow 運算子
以下是幾個常用 Airflow 運算子的範例。如需 Airflow 運算子的權威性參考資料,請參閱運算子和 Hook 參考資料和供應商索引。
BashOperator
使用 BashOperator 執行指令列程式。
Cloud Composer 會在 Airflow 工作站上執行以 Bash 指令碼提供的指令。工作站是 Debian 式的 Docker 容器,並包含數個套件。
gcloud
指令,包括用於處理 Cloud Storage bucket 的gcloud storage
子指令。bq
指令kubectl
指令
PythonOperator
您可以使用 PythonOperator 執行任意 Python 程式碼。
Cloud Composer 會在容器中執行 Python 程式碼,該容器包含環境中使用的 Cloud Composer 映像檔版本套件。
如要安裝其他 Python 套件,請參閱安裝 Python 依附元件一文。
Google Cloud 運算子
如要執行使用 Google Cloud 產品的工作,請使用Google Cloud Airflow 運算子。舉例來說,BigQuery 運算子會在 BigQuery 中查詢和處理資料。
Google Cloud 和 Google Cloud提供的個別服務還有許多 Airflow 運算子。如需完整清單,請參閱Google Cloud 運算子。
EmailOperator
您可以使用 EmailOperator 從 DAG 傳送電子郵件。如要從 Cloud Composer 環境傳送電子郵件,請將環境設定為使用 SendGrid。
電信業者故障通知
將 email_on_failure
設為 True
,即可在 DAG 中的運算子失敗時傳送電子郵件通知。如要從 Cloud Composer 環境傳送電子郵件,您必須將環境設定為使用 SendGrid。
DAG 工作流程指南
請將任何自訂 Python 程式庫放到巢狀目錄的 DAG ZIP 封存檔中,而不要放到 DAG 目錄頂層。
當 Airflow 掃描
dags/
資料夾時,Airflow 只會檢查 DAG 資料夾頂層和同樣位在頂層dags/
資料夾中的 ZIP 封存檔頂層,看看當中的 Python 模組是否包含 DAG。如果 Airflow 碰到的 Python 模組所屬的 ZIP 封存檔不包含airflow
和DAG
子字串,Airflow 就會停止處理該 ZIP 封存檔。Airflow 只會傳回到停止處理為止所找到的 DAG。為了容錯,請勿在相同 Python 模組中定義多個 DAG 物件。
請勿使用 SubDAG。請改為將 DAG 中的工作分組。
請將剖析 DAG 時需要的檔案放在
dags/
資料夾,而非data/
資料夾中。按照測試 DAG 的操作說明,測試開發或修改的 DAG。
確認開發的 DAG 不會大幅增加 DAG 剖析時間。
Airflow 工作可能會因多種原因而失敗。為避免整個 DAG 執行作業失敗,建議啟用工作重試功能。將重試次數上限設為
0
,表示系統不會重試。建議您覆寫
default_task_retries
選項,並為工作重試次數指定0
以外的值。此外,您可以在工作層級設定retries
參數。如要在 Airflow 工作中使用 GPU,請根據使用 GPU 的機器節點,建立獨立的 GKE 叢集。使用 GKEStartPodOperator 執行工作。
請避免在叢集的節點集區中執行耗用大量 CPU 和記憶體的作業,因為其他 Airflow 元件 (排程器、工作站、網路伺服器) 也在該處執行。請改用 KubernetesPodOperator 或 GKEStartPodOperator。
將 DAG 部署到環境時,請只上傳解譯和執行 DAG 絕對必要的檔案到
/dags
資料夾。限制
/dags
資料夾中的 DAG 檔案數量。Airflow 會持續剖析
/dags
資料夾中的 DAG。剖析作業會循環處理 DAG 資料夾,而需要載入的檔案數量 (及其依附元件) 會影響 DAG 剖析和工作排程的效能。使用 100 個檔案 (每個檔案有 100 個 DAG) 比使用 10000 個檔案 (每個檔案有 1 個 DAG) 的效率高得多,因此建議進行這類最佳化。這項最佳化作業會在剖析時間與 DAG 撰寫和管理效率之間取得平衡。舉例來說,如要部署 10,000 個 DAG 檔案,可以建立 100 個 ZIP 檔案,每個檔案包含 100 個 DAG 檔案。
除了上述提示外,如果 DAG 檔案超過 10, 000 個,以程式輔助方式產生 DAG 可能是不錯的選擇。舉例來說,您可以實作單一 Python DAG 檔案,產生一定數量的 DAG 物件 (例如 20 個、100 個 DAG 物件)。
請勿使用已淘汰的 Airflow 運算子。請改用最新替代方案。
編寫 DAG 的常見問題
如要在多個 DAG 中執行相同或類似的工作,我該如何降低程式碼重複的情形?
建議您定義程式庫和包裝函式,盡量減少程式碼重複的情形。
如何在不同 DAG 檔案之間重複使用程式碼?
請將您的公用程式函式放到本機 Python 程式庫中並匯入函式。您可以在環境的 bucket 中,參照 dags/
資料夾內任何 DAG 的函式。
如何降低產生不同定義的風險?
舉例來說,假設有兩個團隊要將原始資料匯總成收益指標。這兩個團隊編寫了兩個稍有差異但用途相同的工作。您可以定義程式庫來處理收益資料,這樣 DAG 實作者就必須提供明確的匯總收益定義。
如何設定 DAG 之間的相依性?
這取決於您要如何定義相依性。
如果您有兩個 DAG (DAG A 和 DAG B),並想讓 DAG B 在 DAG A 之後觸發,您可以在 DAG A 的結尾加上 TriggerDagRunOperator
。
如果 DAG B 只依賴 DAG A 產生的成果 (例如 Pub/Sub 訊息),則或許較適合使用感應器。
如果 DAG B 與 DAG A 緊密整合,您或許能夠將這兩個 DAG 合併為單一 DAG。
如何將專屬執行 ID 傳送至 DAG 及其工作?
舉例來說,假設您要傳送 Dataproc 叢集名稱和檔案路徑。
您可以在 PythonOperator
中傳回 str(uuid.uuid4())
,藉此產生隨機專屬 ID。這樣做會將 ID 加到
XComs
中,讓您可在其他運算子中透過範本欄位參照這個 ID。
產生 uuid
之前,請想想看 DagRun 專屬 ID 是否更有幫助。您也可以使用巨集,在 Jinja 替換作業中參照這些 ID。
如何分隔 DAG 中的工作?
每項工作都應為整體作業的一部分,並具有冪等性質,因此請避免將多步驟工作流程封裝在單一工作中,例如在 PythonOperator
中執行的複雜程式。
我該在單一 DAG 中定義多項工作,以匯總來自多個來源的資料嗎?
舉例來說,假設您有多個包含原始資料的資料表,並想為每個資料表建立每日匯總數據。這些工作彼此不相關。您應該為每個資料表分別建立一項工作和 DAG 嗎?還是要建立一個通用的 DAG?
如果您允許每個工作共用相同的 DAG 層級屬性 (例如 schedule_interval
),則在單一 DAG 中定義多個工作是合理的做法。否則,為盡量減少程式碼重複,您可以將多個 DAG 放入模組的 globals()
,從單一 Python 模組產生這些 DAG。
如何限制 DAG 中並行執行的工作數量?
舉例來說,您想避免超出 API 用量限制/配額,或避免同時執行過多程序。
您可以在 Airflow 網頁介面中定義 Airflow 集區,並將 DAG 中的工作與現有集區建立關聯。
使用運算子的常見問題
我該使用 DockerOperator
嗎?
除非用於在遠端 Docker 安裝 (而非環境的叢集內) 中啟動容器,否則不建議使用 DockerOperator
。在 Cloud Composer 環境中,運算子無法存取 Docker 精靈。
請改用 KubernetesPodOperator
或 GKEStartPodOperator
。這些運算子會分別在 Kubernetes 或 GKE 叢集中啟動 Kubernetes Pod。請注意,我們不建議將 Pod 啟動至環境的叢集,因為這可能會導致資源競爭。
我該使用 SubDagOperator
嗎?
我們不建議使用 SubDagOperator
。
請改用「分組工作」一文建議的替代方案。
我應該只在 PythonOperators
中執行 Python 程式碼,以完全區隔 Python 運算子嗎?
視您的目標而定,您有幾種選項。
如果您只是要維持獨立的 Python 相依性,則可使用 PythonVirtualenvOperator
。
建議使用KubernetesPodOperator
。這個運算子可讓您定義 Kubernetes Pod,並在其他叢集中執行這些 Pod。
如何新增自訂二進位檔或非 PyPI 套件?
您可以安裝私人套件存放區中託管的套件。
如何將引數統一傳送至 DAG 及其工作?
您可以使用 Airflow 內建的 Jinja 範本支援功能,傳送可用於範本欄位的引數。
範本替換作業的發生時機為何?
呼叫運算子的 pre_execute
函式之前,系統會在 Airflow 工作站上替換範本。從實際執行的層面來看,這表示系統要到執行工作的前一刻才會替換範本。
如何確認哪些運算子引數支援範本替換作業?
運算子引數如果支援 Jinja2 範本替換作業,就會明確標示。
請查看運算子定義中的 template_fields
欄位,其中包含會經過範本替換作業的引數名稱清單。
舉例來說,請參閱 BashOperator
,該運算子支援 bash_command
和 env
引數的範本。
已淘汰及移除的 Airflow 運算子
下表列出的 Airflow 運算子已淘汰:
請避免在 DAG 中使用這些運算子。請改用提供的最新替代運算子。
如果運算子列為「已移除」,表示該運算子已在 Cloud Composer 3 的其中一個 Airflow 版本中停用。
如果運算子列為「預計移除」,則表示該運算子已淘汰,並將在日後的 Airflow 建構版本中移除。
如果運營商列為「已在最新 Google 供應商中移除」,則表示該運營商已在最新版
apache-airflow-providers-google
套件中移除。同時,Cloud Composer 仍會使用這個套件的版本,其中尚未移除運算子。
已淘汰的運算子 | 狀態 | 替換運算子 | 換貨期限 |
---|---|---|---|
CreateAutoMLTextTrainingJobOperator | 已移除 | SupervisedFineTuningTrainOperator |
composer-3-airflow-2.9.3-build.1 composer-3-airflow-2.9.1-build.8 |
GKEDeploymentHook | 已移除 | GKEKubernetesHook |
所有版本 |
GKECustomResourceHook | 已移除 | GKEKubernetesHook |
所有版本 |
GKEPodHook | 已移除 | GKEKubernetesHook |
所有版本 |
GKEJobHook | 已移除 | GKEKubernetesHook |
所有版本 |
GKEPodAsyncHook | 已移除 | GKEKubernetesAsyncHook |
所有版本 |
SecretsManagerHook | 已移除 | GoogleCloudSecretManagerHook |
composer-3-airflow-2.7.3-build.6 |
BigQueryExecuteQueryOperator | 已移除 | BigQueryInsertJobOperator |
所有版本 |
BigQueryPatchDatasetOperator | 已移除 | BigQueryUpdateDatasetOperator |
所有版本 |
DataflowCreateJavaJobOperator | 已移除 | beam.BeamRunJavaPipelineOperator |
所有版本 |
DataflowCreatePythonJobOperator | 已移除 | beam.BeamRunPythonPipelineOperator |
所有版本 |
DataprocSubmitPigJobOperator | 已移除 | DataprocSubmitJobOperator |
所有版本 |
DataprocSubmitHiveJobOperator | 已移除 | DataprocSubmitJobOperator |
所有版本 |
DataprocSubmitSparkSqlJobOperator | 已移除 | DataprocSubmitJobOperator |
所有版本 |
DataprocSubmitSparkJobOperator | 已移除 | DataprocSubmitJobOperator |
所有版本 |
DataprocSubmitHadoopJobOperator | 已移除 | DataprocSubmitJobOperator |
所有版本 |
DataprocSubmitPySparkJobOperator | 已移除 | DataprocSubmitJobOperator |
所有版本 |
BigQueryTableExistenceAsyncSensor | 已移除 | BigQueryTableExistenceSensor |
所有版本 |
BigQueryTableExistencePartitionAsyncSensor | 已移除 | BigQueryTablePartitionExistenceSensor |
所有版本 |
CloudComposerEnvironmentSensor | 已移除 | CloudComposerCreateEnvironmentOperator、 CloudComposerDeleteEnvironmentOperator、 CloudComposerUpdateEnvironmentOperator |
所有版本 |
GCSObjectExistenceAsyncSensor | 已移除 | GCSObjectExistenceSensor |
所有版本 |
GoogleAnalyticsHook | 已移除 | GoogleAnalyticsAdminHook |
所有版本 |
GoogleAnalyticsListAccountsOperator | 已移除 | GoogleAnalyticsAdminListAccountsOperator |
所有版本 |
GoogleAnalyticsGetAdsLinkOperator | 已移除 | GoogleAnalyticsAdminGetGoogleAdsLinkOperator |
所有版本 |
GoogleAnalyticsRetrieveAdsLinksListOperator | 已移除 | GoogleAnalyticsAdminListGoogleAdsLinksOperator |
所有版本 |
GoogleAnalyticsDataImportUploadOperator | 已移除 | GoogleAnalyticsAdminCreateDataStreamOperator |
所有版本 |
GoogleAnalyticsDeletePreviousDataUploadsOperator | 已移除 | GoogleAnalyticsAdminDeleteDataStreamOperator |
所有版本 |
DataPipelineHook | 已移除 | DataflowHook |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
CreateDataPipelineOperator | 已移除 | DataflowCreatePipelineOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
RunDataPipelineOperator | 已移除 | DataflowRunPipelineOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLDatasetLink | 已淘汰,預計移除 | TranslationLegacyDatasetLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLDatasetListLink | 已淘汰,預計移除 | TranslationDatasetListLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLModelLink | 已淘汰,預計移除 | TranslationLegacyModelLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLModelTrainLink | 已淘汰,預計移除 | TranslationLegacyModelTrainLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLModelPredictLink | 已淘汰,預計移除 | TranslationLegacyModelPredictLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLBatchPredictOperator | 已移除 | vertex_ai.batch_prediction_job |
composer-3-airflow-2.9.3-build.4 |
AutoMLPredictOperator | 已淘汰,預計移除 | vertex_aigenerative_model。TextGenerationModelPredictOperator, translate.TranslateTextOperator |
composer-3-airflow-2.7.3-build.6 |
PromptLanguageModelOperator | 已移除 | TextGenerationModelPredictOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
GenerateTextEmbeddingsOperator | 已移除 | TextEmbeddingModelGetEmbeddingsOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
PromptMultimodalModelOperator | 已移除 | GenerativeModelGenerateContentOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
PromptMultimodalModelWithMediaOperator | 已移除 | GenerativeModelGenerateContentOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
DataflowStartSqlJobOperator | 已移除 | DataflowStartYamlJobOperator |
composer-3-airflow-2.9.3-build.1 composer-3-airflow-2.9.1-build.8 |
LifeSciencesHook | 已淘汰,預計移除 | Google Cloud Batch 運算子的掛鉤 |
尚未公布 |
DataprocScaleClusterOperator | 已淘汰,預計移除 | DataprocUpdateClusterOperator |
尚未公布 |
MLEngineStartBatchPredictionJobOperator | 已淘汰,預計移除 | CreateBatchPredictionJobOperator |
尚未公布 |
MLEngineManageModelOperator | 已淘汰,預計移除 | MLEngineCreateModelOperator、MLEngineGetModelOperator |
尚未公布 |
MLEngineGetModelOperator | 已淘汰,預計移除 | GetModelOperator |
尚未公布 |
MLEngineDeleteModelOperator | 已淘汰,預計移除 | DeleteModelOperator |
尚未公布 |
MLEngineManageVersionOperator | 已淘汰,預計移除 | MLEngineCreateVersion、MLEngineSetDefaultVersion、MLEngineListVersions、 MLEngineDeleteVersion |
尚未公布 |
MLEngineCreateVersionOperator | 已淘汰,預計移除 | VertexAI 運算子的 parent_model 參數 |
尚未公布 |
MLEngineSetDefaultVersionOperator | 已淘汰,預計移除 | SetDefaultVersionOnModelOperator |
尚未公布 |
MLEngineListVersionsOperator | 已淘汰,預計移除 | ListModelVersionsOperator |
尚未公布 |
MLEngineDeleteVersionOperator | 已淘汰,預計移除 | DeleteModelVersionOperator |
尚未公布 |
MLEngineStartTrainingJobOperator | 已淘汰,預計移除 | CreateCustomPythonPackageTrainingJobOperator |
尚未公布 |
MLEngineTrainingCancelJobOperator | 已淘汰,預計移除 | CancelCustomTrainingJobOperator |
尚未公布 |
LifeSciencesRunPipelineOperator | 已淘汰,預計移除 | Google Cloud Batch 運算子 |
尚未公布 |
MLEngineCreateModelOperator | 已淘汰,預計移除 | 對應的 Vertex AI 運算子 |
尚未公布 |