編寫 Airflow DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本指南說明如何編寫在 Cloud Composer 環境中執行的 Apache Airflow 有向無環圖 (DAG)。

Apache Airflow 不提供嚴密的 DAG 和工作隔離機制,因此建議您將實際工作環境和測試環境分開,以免發生 DAG 干擾情形。詳情請參閱測試 DAG 一文。

建構 Airflow DAG

Airflow DAG 是在 Python 檔案中定義,並且是由下列元件組成:

  • DAG 定義
  • Airflow 運算子
  • 運算子關係

下列程式碼片段會顯示去脈絡化的各元件範例。

DAG 定義

以下範例展示 Airflow DAG 定義:

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

運算子和工作

Airflow 運算子會描述待完成的工作。工作是運算子的特定執行個體。

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

工作關係

工作關係:描述工作必須完成的順序。

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Python 中的完整 DAG 工作流程範例

下列工作流程是完整有效的 DAG 範本,其中包含 hello_python 工作和 goodbye_bash 工作:


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

如要進一步瞭解如何定義 Airflow DAG,請參閱 Airflow 教學課程Airflow 概念

Airflow 運算子

以下是幾個常用 Airflow 運算子的範例。如需 Airflow 運算子的權威性參考資料,請參閱運算子和 Hook 參考資料供應商索引

BashOperator

使用 BashOperator 執行指令列程式。

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer 會在 Airflow 工作站上執行以 Bash 指令碼提供的指令。工作站是 Debian 式的 Docker 容器,並包含數個套件。

PythonOperator

您可以使用 PythonOperator 執行任意 Python 程式碼。

Cloud Composer 會在容器中執行 Python 程式碼,該容器包含環境中使用的 Cloud Composer 映像檔版本套件。

如要安裝其他 Python 套件,請參閱安裝 Python 依附元件一文。

Google Cloud 運算子

如要執行使用 Google Cloud 產品的工作,請使用Google Cloud Airflow 運算子。舉例來說,BigQuery 運算子會在 BigQuery 中查詢和處理資料。

Google Cloud 和 Google Cloud提供的個別服務還有許多 Airflow 運算子。如需完整清單,請參閱Google Cloud 運算子

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

EmailOperator

您可以使用 EmailOperator 從 DAG 傳送電子郵件。如要從 Cloud Composer 環境傳送電子郵件,請將環境設定為使用 SendGrid

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

電信業者故障通知

email_on_failure 設為 True,即可在 DAG 中的運算子失敗時傳送電子郵件通知。如要從 Cloud Composer 環境傳送電子郵件,您必須將環境設定為使用 SendGrid

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

DAG 工作流程指南

  • 請將任何自訂 Python 程式庫放到巢狀目錄的 DAG ZIP 封存檔中,而不要放到 DAG 目錄頂層。

    當 Airflow 掃描 dags/ 資料夾時,Airflow 只會檢查 DAG 資料夾頂層和同樣位在頂層 dags/ 資料夾中的 ZIP 封存檔頂層,看看當中的 Python 模組是否包含 DAG。如果 Airflow 碰到的 Python 模組所屬的 ZIP 封存檔不包含 airflowDAG 子字串,Airflow 就會停止處理該 ZIP 封存檔。Airflow 只會傳回到停止處理為止所找到的 DAG。

  • 為了容錯,請勿在相同 Python 模組中定義多個 DAG 物件。

  • 請勿使用 SubDAG。請改為將 DAG 中的工作分組

  • 請將剖析 DAG 時需要的檔案放在 dags/ 資料夾,而非 data/ 資料夾中。

  • 為 DAG 導入單元測試

  • 按照測試 DAG 的操作說明,測試開發或修改的 DAG。

  • 確認開發的 DAG 不會大幅增加 DAG 剖析時間

  • Airflow 工作可能會因多種原因而失敗。為避免整個 DAG 執行作業失敗,建議啟用工作重試功能。將重試次數上限設為 0,表示系統不會重試。

    建議您覆寫 default_task_retries 選項,並為工作重試次數指定 0 以外的值。此外,您可以在工作層級設定 retries 參數

  • 如要在 Airflow 工作中使用 GPU,請根據使用 GPU 的機器節點,建立獨立的 GKE 叢集。使用 GKEStartPodOperator 執行工作。

  • 請避免在叢集的節點集區中執行耗用大量 CPU 和記憶體的作業,因為其他 Airflow 元件 (排程器、工作站、網路伺服器) 也在該處執行。請改用 KubernetesPodOperatorGKEStartPodOperator

  • 將 DAG 部署到環境時,請只上傳解譯和執行 DAG 絕對必要的檔案到 /dags 資料夾。

  • 限制 /dags 資料夾中的 DAG 檔案數量。

    Airflow 會持續剖析 /dags 資料夾中的 DAG。剖析作業會循環處理 DAG 資料夾,而需要載入的檔案數量 (及其依附元件) 會影響 DAG 剖析和工作排程的效能。使用 100 個檔案 (每個檔案有 100 個 DAG) 比使用 10000 個檔案 (每個檔案有 1 個 DAG) 的效率高得多,因此建議進行這類最佳化。這項最佳化作業會在剖析時間與 DAG 撰寫和管理效率之間取得平衡。

    舉例來說,如要部署 10,000 個 DAG 檔案,可以建立 100 個 ZIP 檔案,每個檔案包含 100 個 DAG 檔案。

    除了上述提示外,如果 DAG 檔案超過 10, 000 個,以程式輔助方式產生 DAG 可能是不錯的選擇。舉例來說,您可以實作單一 Python DAG 檔案,產生一定數量的 DAG 物件 (例如 20 個、100 個 DAG 物件)。

  • 請勿使用已淘汰的 Airflow 運算子。請改用最新替代方案

編寫 DAG 的常見問題

如要在多個 DAG 中執行相同或類似的工作,我該如何降低程式碼重複的情形?

建議您定義程式庫和包裝函式,盡量減少程式碼重複的情形。

如何在不同 DAG 檔案之間重複使用程式碼?

請將您的公用程式函式放到本機 Python 程式庫中並匯入函式。您可以在環境的 bucket 中,參照 dags/ 資料夾內任何 DAG 的函式。

如何降低產生不同定義的風險?

舉例來說,假設有兩個團隊要將原始資料匯總成收益指標。這兩個團隊編寫了兩個稍有差異但用途相同的工作。您可以定義程式庫來處理收益資料,這樣 DAG 實作者就必須提供明確的匯總收益定義。

如何設定 DAG 之間的相依性?

這取決於您要如何定義相依性。

如果您有兩個 DAG (DAG A 和 DAG B),並想讓 DAG B 在 DAG A 之後觸發,您可以在 DAG A 的結尾加上 TriggerDagRunOperator

如果 DAG B 只依賴 DAG A 產生的成果 (例如 Pub/Sub 訊息),則或許較適合使用感應器。

如果 DAG B 與 DAG A 緊密整合,您或許能夠將這兩個 DAG 合併為單一 DAG。

如何將專屬執行 ID 傳送至 DAG 及其工作?

舉例來說,假設您要傳送 Dataproc 叢集名稱和檔案路徑。

您可以在 PythonOperator 中傳回 str(uuid.uuid4()),藉此產生隨機專屬 ID。這樣做會將 ID 加到 XComs 中,讓您可在其他運算子中透過範本欄位參照這個 ID。

產生 uuid 之前,請想想看 DagRun 專屬 ID 是否更有幫助。您也可以使用巨集,在 Jinja 替換作業中參照這些 ID。

如何分隔 DAG 中的工作?

每項工作都應為整體作業的一部分,並具有冪等性質,因此請避免將多步驟工作流程封裝在單一工作中,例如在 PythonOperator 中執行的複雜程式。

我該在單一 DAG 中定義多項工作,以匯總來自多個來源的資料嗎?

舉例來說,假設您有多個包含原始資料的資料表,並想為每個資料表建立每日匯總數據。這些工作彼此不相關。您應該為每個資料表分別建立一項工作和 DAG 嗎?還是要建立一個通用的 DAG?

如果您允許每個工作共用相同的 DAG 層級屬性 (例如 schedule_interval),則在單一 DAG 中定義多個工作是合理的做法。否則,為盡量減少程式碼重複,您可以將多個 DAG 放入模組的 globals(),從單一 Python 模組產生這些 DAG。

如何限制 DAG 中並行執行的工作數量?

舉例來說,您想避免超出 API 用量限制/配額,或避免同時執行過多程序。

您可以在 Airflow 網頁介面中定義 Airflow 集區,並將 DAG 中的工作與現有集區建立關聯。

使用運算子的常見問題

我該使用 DockerOperator 嗎?

除非用於在遠端 Docker 安裝 (而非環境的叢集內) 中啟動容器,否則不建議使用 DockerOperator。在 Cloud Composer 環境中,運算子無法存取 Docker 精靈。

請改用 KubernetesPodOperatorGKEStartPodOperator。這些運算子會分別在 Kubernetes 或 GKE 叢集中啟動 Kubernetes Pod。請注意,我們不建議將 Pod 啟動至環境的叢集,因為這可能會導致資源競爭。

我該使用 SubDagOperator 嗎?

我們不建議使用 SubDagOperator

請改用「分組工作」一文建議的替代方案。

我應該只在 PythonOperators 中執行 Python 程式碼,以完全區隔 Python 運算子嗎?

視您的目標而定,您有幾種選項。

如果您只是要維持獨立的 Python 相依性,則可使用 PythonVirtualenvOperator

建議使用KubernetesPodOperator。這個運算子可讓您定義 Kubernetes Pod,並在其他叢集中執行這些 Pod。

如何新增自訂二進位檔或非 PyPI 套件?

您可以安裝私人套件存放區中託管的套件

如何將引數統一傳送至 DAG 及其工作?

您可以使用 Airflow 內建的 Jinja 範本支援功能,傳送可用於範本欄位的引數。

範本替換作業的發生時機為何?

呼叫運算子的 pre_execute 函式之前,系統會在 Airflow 工作站上替換範本。從實際執行的層面來看,這表示系統要到執行工作的前一刻才會替換範本。

如何確認哪些運算子引數支援範本替換作業?

運算子引數如果支援 Jinja2 範本替換作業,就會明確標示。

請查看運算子定義中的 template_fields 欄位,其中包含會經過範本替換作業的引數名稱清單。

舉例來說,請參閱 BashOperator,該運算子支援 bash_commandenv 引數的範本。

已淘汰及移除的 Airflow 運算子

下表列出的 Airflow 運算子已淘汰:

  • 避免在 DAG 中使用這些運算子。請改用提供的最新替代運算子。

  • 如果運算子列為「已移除」,表示該運算子已在 Cloud Composer 3 的其中一個 Airflow 版本中停用。

  • 如果運算子列為「預計移除」,則表示該運算子已淘汰,並將在日後的 Airflow 建構版本中移除。

  • 如果運營商列為「已在最新 Google 供應商中移除」,則表示該運營商已在最新版 apache-airflow-providers-google 套件中移除。同時,Cloud Composer 仍會使用這個套件的版本,其中尚未移除運算子。

已淘汰的運算子 狀態 替換運算子 換貨期限
CreateAutoMLTextTrainingJobOperator 已移除 SupervisedFineTuningTrainOperator composer-3-airflow-2.9.3-build.1
composer-3-airflow-2.9.1-build.8
GKEDeploymentHook 已移除 GKEKubernetesHook 所有版本
GKECustomResourceHook 已移除 GKEKubernetesHook 所有版本
GKEPodHook 已移除 GKEKubernetesHook 所有版本
GKEJobHook 已移除 GKEKubernetesHook 所有版本
GKEPodAsyncHook 已移除 GKEKubernetesAsyncHook 所有版本
SecretsManagerHook 已移除 GoogleCloudSecretManagerHook composer-3-airflow-2.7.3-build.6
BigQueryExecuteQueryOperator 已移除 BigQueryInsertJobOperator 所有版本
BigQueryPatchDatasetOperator 已移除 BigQueryUpdateDatasetOperator 所有版本
DataflowCreateJavaJobOperator 已移除 beam.BeamRunJavaPipelineOperator 所有版本
DataflowCreatePythonJobOperator 已移除 beam.BeamRunPythonPipelineOperator 所有版本
DataprocSubmitPigJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitHiveJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitSparkSqlJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitSparkJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitHadoopJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitPySparkJobOperator 已移除 DataprocSubmitJobOperator 所有版本
BigQueryTableExistenceAsyncSensor 已移除 BigQueryTableExistenceSensor 所有版本
BigQueryTableExistencePartitionAsyncSensor 已移除 BigQueryTablePartitionExistenceSensor 所有版本
CloudComposerEnvironmentSensor 已移除 CloudComposerCreateEnvironmentOperator、 CloudComposerDeleteEnvironmentOperator、 CloudComposerUpdateEnvironmentOperator 所有版本
GCSObjectExistenceAsyncSensor 已移除 GCSObjectExistenceSensor 所有版本
GoogleAnalyticsHook 已移除 GoogleAnalyticsAdminHook 所有版本
GoogleAnalyticsListAccountsOperator 已移除 GoogleAnalyticsAdminListAccountsOperator 所有版本
GoogleAnalyticsGetAdsLinkOperator 已移除 GoogleAnalyticsAdminGetGoogleAdsLinkOperator 所有版本
GoogleAnalyticsRetrieveAdsLinksListOperator 已移除 GoogleAnalyticsAdminListGoogleAdsLinksOperator 所有版本
GoogleAnalyticsDataImportUploadOperator 已移除 GoogleAnalyticsAdminCreateDataStreamOperator 所有版本
GoogleAnalyticsDeletePreviousDataUploadsOperator 已移除 GoogleAnalyticsAdminDeleteDataStreamOperator 所有版本
DataPipelineHook 已移除 DataflowHook composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
CreateDataPipelineOperator 已移除 DataflowCreatePipelineOperator composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
RunDataPipelineOperator 已移除 DataflowRunPipelineOperator composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
AutoMLDatasetLink 已淘汰,預計移除 TranslationLegacyDatasetLink composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
AutoMLDatasetListLink 已淘汰,預計移除 TranslationDatasetListLink composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
AutoMLModelLink 已淘汰,預計移除 TranslationLegacyModelLink composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
AutoMLModelTrainLink 已淘汰,預計移除 TranslationLegacyModelTrainLink composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
AutoMLModelPredictLink 已淘汰,預計移除 TranslationLegacyModelPredictLink composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
AutoMLBatchPredictOperator 已移除 vertex_ai.batch_prediction_job composer-3-airflow-2.9.3-build.4
AutoMLPredictOperator 已淘汰,預計移除 vertex_aigenerative_model。TextGenerationModelPredictOperator, translate.TranslateTextOperator composer-3-airflow-2.7.3-build.6
PromptLanguageModelOperator 已移除 TextGenerationModelPredictOperator composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
GenerateTextEmbeddingsOperator 已移除 TextEmbeddingModelGetEmbeddingsOperator composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
PromptMultimodalModelOperator 已移除 GenerativeModelGenerateContentOperator composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
PromptMultimodalModelWithMediaOperator 已移除 GenerativeModelGenerateContentOperator composer-3-airflow-2.9.1-build.0
composer-3-airflow-2.7.3-build.9
DataflowStartSqlJobOperator 已移除 DataflowStartYamlJobOperator composer-3-airflow-2.9.3-build.1
composer-3-airflow-2.9.1-build.8
LifeSciencesHook 已淘汰,預計移除 Google Cloud Batch 運算子的掛鉤 尚未公布
DataprocScaleClusterOperator 已淘汰,預計移除 DataprocUpdateClusterOperator 尚未公布
MLEngineStartBatchPredictionJobOperator 已淘汰,預計移除 CreateBatchPredictionJobOperator 尚未公布
MLEngineManageModelOperator 已淘汰,預計移除 MLEngineCreateModelOperator、MLEngineGetModelOperator 尚未公布
MLEngineGetModelOperator 已淘汰,預計移除 GetModelOperator 尚未公布
MLEngineDeleteModelOperator 已淘汰,預計移除 DeleteModelOperator 尚未公布
MLEngineManageVersionOperator 已淘汰,預計移除 MLEngineCreateVersion、MLEngineSetDefaultVersion、MLEngineListVersions、 MLEngineDeleteVersion 尚未公布
MLEngineCreateVersionOperator 已淘汰,預計移除 VertexAI 運算子的 parent_model 參數 尚未公布
MLEngineSetDefaultVersionOperator 已淘汰,預計移除 SetDefaultVersionOnModelOperator 尚未公布
MLEngineListVersionsOperator 已淘汰,預計移除 ListModelVersionsOperator 尚未公布
MLEngineDeleteVersionOperator 已淘汰,預計移除 DeleteModelVersionOperator 尚未公布
MLEngineStartTrainingJobOperator 已淘汰,預計移除 CreateCustomPythonPackageTrainingJobOperator 尚未公布
MLEngineTrainingCancelJobOperator 已淘汰,預計移除 CancelCustomTrainingJobOperator 尚未公布
LifeSciencesRunPipelineOperator 已淘汰,預計移除 Google Cloud Batch 運算子 尚未公布
MLEngineCreateModelOperator 已淘汰,預計移除 對應的 Vertex AI 運算子 尚未公布

後續步驟