在 DAG 中使用可延期运算符

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页介绍如何在 Google Cloud 控制台中 环境,并在 DAG 中使用可延期的 Google Cloud 运算符。

Cloud Composer 中的可延期运算符简介

如果您至少有一个触发器实例(或高度 弹性环境),您可以使用 可延期运算符和触发器 创建 Deployment

对于可延期运算符,Airflow 将任务执行拆分为以下阶段:

  1. 开始操作。在此阶段,任务会占用一个 Airflow 工作器 。该任务会执行一项操作,将作业委托给 不同的服务

    例如,运行一个 BigQuery 作业可能需要 从几秒到几小时创建作业后,操作 将工作标识符(BigQuery 作业 ID)传递给 Airflow 触发器。

  2. 触发器会监控作业,直到完成为止。在此阶段, 未占用工作器槽位。Airflow 触发器具有异步 而且能够处理数百个这样的作业当 触发器检测到作业已完成,就会发送一个事件, 最后一个阶段

  3. 在最后一个阶段,Airflow 工作器执行回调。此回调用于 例如,可以将任务标记为成功, 再次设置触发器监控的作业。

触发器是无状态的,因此可以应对中断或 重启、.因此,长时间运行的作业可以灵活应对 Pod 重启, 除非重启发生在最后一个阶段(预计较短)。

准备工作

  • Cloud Composer 2 中提供了可延期运算符和传感器 并要求满足以下要求: <ph type="x-smartling-placeholder">
      </ph>
    • Cloud Composer 2.0.31 及更高版本
    • Airflow 2.2.5、2.3.3 及更高版本

启用对可延期运算符的支持

名为 Airflow 触发器的环境组件会异步监控 延迟任务。此类任务执行延迟操作后 时,触发器会将任务传递给 Airflow 工作器。

您的环境中需要至少一个触发器实例(或至少两个) (例如在高弹性环境中)使用可延期模式。 你可以配置触发器 创建环境时 调整现有环境的触发器数量和性能参数

支持可延期模式的 Google Cloud 运算符

只有部分 Airflow 运算符进行了扩展,以支持可延期模型。 以下列表是 airflow.providers.google.operators.cloud 软件包 支持可延期模式的应用 包含所需最低 airflow.providers.google.operators.cloud 软件包版本的列表示相应运营商支持可延期模式的最早软件包版本。

Cloud Composer 运算符

运营商名称所需的 apache-airflow-providers-google 版本
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0

BigQuery 运算符

运营商名称所需的 apache-airflow-providers-google 版本
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

BigQuery Data Transfer Service 运算符

运营商名称所需的 apache-airflow-providers-google 版本
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Cloud Build 运算符

运营商名称所需的 apache-airflow-providers-google 版本
CloudBuildCreateBuildOperator 8.7.0

Cloud SQL 运算符

运营商名称所需的 apache-airflow-providers-google 版本
CloudSQLExportInstanceOperator 10.3.0

Dataflow 运算符

运营商名称所需的 apache-airflow-providers-google 版本
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0

Cloud Data Fusion 运算符

运营商名称所需的 apache-airflow-providers-google 版本
CloudDataFusionStartPipelineOperator 8.9.0

Google Kubernetes Engine 操作器

运营商名称所需的 apache-airflow-providers-google 版本
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0

AI Platform 运算符

运营商名称所需的 apache-airflow-providers-google 版本
MLEngineStartTrainingJobOperator 8.9.0

在 DAG 中使用可延期运算符

所有 Google Cloud 运维人员的共同惯例是启用 使用 deferrable 布尔值参数指定可延期模式。如果某个 Google Cloud 运算符没有此参数,则它无法在可延期 模式。其他运算符可能有不同的惯例。例如,一些 社区经营者使用单独的类,以 Async 后缀 名称。

以下示例 DAG 在DataprocSubmitJobOperator 可延期模式:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

查看触发器日志

触发器生成的日志会与其他 环境组件。详细了解如何查看您的环境 请参阅查看日志

监控触发器

如需详细了解如何监控触发器组件,请参阅 Airflow 指标

除了监控触发器之外,您还可以查看 您的 Monitoring 信息中心的未完成任务指标中显示的任务 环境

后续步骤