安排和触发 Airflow DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页介绍了 Airflow 中调度和 DAG 触发的工作原理、如何为 DAG 定义时间表,以及如何手动触发 DAG 或将其暂停。

Cloud Composer 中的 Airflow DAG 简介

Cloud Composer 中的 Airflow DAG 会在项目中的一个或多个 Cloud Composer 环境中执行。您可以将 Airflow DAG 的源文件上传到与环境关联的 Cloud Storage 存储分区。然后,环境的 Airflow 实例会解析这些文件,并根据每个 DAG 的时间表安排 DAG 运行。在 DAG 运行期间,Airflow 会按照 DAG 定义的顺序调度和执行构成 DAG 的各项任务。

如需详细了解 Airflow 的核心概念(例如 Airflow DAG、DAG 运行作业或运算符),请参阅 Airflow 文档中的核心概念页面。

Airflow 中的 DAG 调度简介

Airflow 为其调度机制提供了以下概念:

逻辑日期

表示执行特定 DAG 运行作业的日期

这不是 Airflow 运行 DAG 的实际日期,而是特定 DAG 运行必须处理的一段时间。例如,对于安排每天 12:00 运行的 DAG,逻辑日期也会是特定日期的 12:00。由于该作业每天运行两次,因此它必须处理的时间段是过去 12 小时。同时,DAG 本身中定义的逻辑可能根本不会使用逻辑日期或时间间隔。例如,DAG 可能会每天运行一次相同的脚本,而不会使用逻辑日期的值。

在低于 2.2 的 Airflow 版本中,此日期称为执行日期

运行日期

表示执行特定 DAG 运行作业的日期。

例如,对于安排每天 12:00 运行的 DAG,该 DAG 的实际执行可能会在逻辑日期过后的一段时间(例如 12:05)进行。

安排的时间间隔

表示 DAG 必须以逻辑日期的形式执行的时间和频率。

例如,每日时间表意味着 DAG 每天运行一次,并且其 DAG 运行的逻辑日期间隔为 24 小时。

开始日期

指定您希望 Airflow 开始安排 DAG 的时间。

DAG 中的任务可以有各自的开始日期,您也可以为所有任务指定同一个开始日期。Airflow 会根据 DAG 中任务的最早开始日期和计划的时间间隔安排 DAG 运行。

赶超、回填和重试

用于执行过去日期的 DAG 运行作业的机制。

追赶作业会执行尚未运行的 DAG 运行作业,例如,如果 DAG 长时间处于暂停状态,然后取消暂停。您可以使用回填功能来执行特定日期范围内的 DAG 运行作业。“重试次数”用于指定 Airflow 在从 DAG 执行任务时必须尝试的次数。

时间安排的工作原理如下:

  1. 开始日期过后,Airflow 会等待下一个时间间隔的出现。

  2. Airflow 会安排第一次 DAG 运行在此计划的时间间隔结束时进行。

    例如,如果某个 DAG 计划每小时运行一次,并且开始日期为今天的 12:00,则第一次 DAG 运行会在今天 13:00 进行。

本文档中的安排 Airflow DAG 的运行时间部分介绍了如何使用这些概念为 DAG 设置运行时间。如需详细了解 DAG 运行和调度,请参阅 Airflow 文档中的 DAG 运行

触发 DAG 的方法简介

Airflow 提供以下触发 DAG 的方法:

  • 按时间表触发。Airflow 会根据 DAG 文件中为其指定的时间表自动触发 DAG。

  • 手动触发。您可以从 Google Cloud 控制台、Airflow 界面手动触发 DAG,也可以从 Google Cloud CLI 运行 Airflow CLI 命令来触发。

  • 触发来响应事件。触发 DAG 来响应事件的标准方法是使用传感器

触发 DAG 的其他方法:

准备工作

  • 请确保您的账号具有可管理环境存储分区中的对象以及查看和触发 DAG 的角色。如需了解详情,请参阅访问权限控制

安排 Airflow DAG

您可以在 DAG 文件中为 DAG 定义时间表。按如下方式修改 DAG 的定义:

  1. 在计算机上找到并修改 DAG 文件。如果您没有 DAG 文件,可以从环境的存储分区下载其副本。对于新 DAG,您可以在创建 DAG 文件时定义所有参数。

  2. schedule_interval 参数中,定义时间表。您可以使用 CRON 表达式(例如 0 0 * * *)或预设(例如 @daily)。如需了解详情,请参阅 Airflow 文档中的 Cron 和时间间隔

    Airflow 会根据您设置的时间表确定 DAG 运行的逻辑日期。

  3. start_date 参数中,定义开始日期。

    Airflow 使用此参数确定第一次 DAG 运行的逻辑日期。

  4. (可选)在 catchup 参数中,定义 Airflow 是否必须执行从开始日期到当前日期之前尚未执行的此 DAG 的所有先前运行作业。

    在赶上时间期间执行的 DAG 运行作业的逻辑日期将设为过去的日期,而其运行日期将反映 DAG 运行作业的实际执行时间。

  5. (可选)在 retries 参数中,定义 Airflow 必须重试失败任务的次数(每个 DAG 由一个或多个单独的任务组成)。默认情况下,Cloud Composer 中的任务会重试两次。

  6. 将新版 DAG 上传到环境的存储分区。

  7. 等待 Airflow 成功解析 DAG。例如,您可以在 Google Cloud 控制台或 Airflow 界面中查看环境中的 DAG 列表

以下示例 DAG 定义每天运行两次,分别在 00:00 和 12:00。其开始日期设为 2024 年 1 月 1 日,但在您上传或暂停该作业后,Airflow 不会针对过去的日期运行该作业,因为“赶上”功能已停用。

DAG 包含一个名为 insert_query_job 的任务,该任务使用 BigQueryInsertJobOperator 运算符将行插入表中。此运算符是 Google Cloud BigQuery 运算符之一,可用于管理数据集和表、运行查询以及验证数据。如果此任务的特定执行失败,Airflow 会再重试四次,并使用默认的重试间隔。这些重试的逻辑日期保持不变。

此行的 SQL 查询使用 Airflow 模板将 DAG 的逻辑日期和名称写入该行。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

如需测试此 DAG,您可以手动触发该 DAG,然后查看任务执行日志

更多时间安排参数示例

以下时间安排参数示例展示了不同参数组合的时间安排的工作方式:

  • 如果 start_datedatetime(2024, 4, 4, 16, 25)schedule_interval30 16 * * *,则第一次 DAG 运行于 2024 年 4 月 5 日 16:30 进行。

  • 如果 start_datedatetime(2024, 4, 4, 16, 35)schedule_interval30 16 * * *,则第一次 DAG 运行于 2024 年 4 月 6 日 16:30 进行。由于开始日期晚于 2024 年 4 月 4 日的计划时间间隔,因此 2024 年 4 月 5 日不进行 DAG 运行。计划的时间间隔于 2024 年 4 月 5 日 16:35 结束,因此下一次 DAG 运行安排在次日 16:30 进行。

  • 如果 start_datedatetime(2024, 4, 4)schedule_interval@daily,则第一次 DAG 运行安排在 2024 年 4 月 5 日 00:00 进行。

  • 如果 start_datedatetime(2024, 4, 4, 16, 30)schedule_interval0 * * * *,则第一次 DAG 运行安排在 2024 年 4 月 4 日 18:00 进行。在指定日期和时间过后,Airflow 会安排 DAG 运行在每小时的零分进行一次。最接近的时间点为 17:00。此时,Airflow 安排 DAG 运行在计划的时间间隔结束时进行,即 18:00。

手动触发 DAG

当您手动触发 Airflow DAG 时,Airflow 会运行一次 DAG,这与 DAG 文件中指定的时间表无关。

控制台

Cloud Composer 1.17.8 及更高版本支持 DAG 界面。

如需从 Google Cloud 控制台触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择环境以查看其详细信息。

  3. 环境详情页面上,前往 DAG 标签页。

  4. 点击 DAG 的名称。

  5. DAG 详情页面上,点击 Trigger DAG(触发 DAG)。系统会创建新的 DAG 运行作业。

Airflow 界面

如需从 Airflow 界面触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 使用具有相应权限的 Google 账号登录。

  4. 在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。

  5. (可选)指定 DAG 运行配置。

  6. 点击触发器

gcloud

在 Airflow 1.10.12 或更低版本中,运行 trigger_dag Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

在 Airflow 1.10.14 或更高版本(包括 Airflow 2)中,运行 dags trigger Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • DAG_ID:DAG 的名称。

如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令

如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run 命令参考文档

查看 DAG 运行日志和详情

在 Google Cloud 控制台中,您可以执行以下操作:

此外,Cloud Composer 还提供对 Airflow 界面的访问权限,该界面是 Airflow 自己的 Web 界面。

暂停 DAG

控制台

Cloud Composer 1.17.8 及更高版本支持 DAG 界面。

如需从 Google Cloud 控制台中暂停 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择环境以查看其详细信息。

  3. 环境详情页面上,前往 DAG 标签页。

  4. 点击 DAG 的名称。

  5. DAG 详情页面上,点击 Pause DAG(暂停 DAG)。

Airflow 界面

如需从 Airflow 界面暂停 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

转到“环境”

  1. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  2. 使用具有相应权限的 Google 账号登录。

  3. 在 Airflow 网页界面的 DAG 页面上,点击 DAG 名称旁边的切换开关。

gcloud

在 Airflow 1.10.12 或更低版本中,运行 pause Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    pause -- DAG_ID

在 Airflow 1.10.14 或更高版本(包括 Airflow 2)中,运行 dags pause Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • DAG_ID:DAG 的名称。

如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令

如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run 命令参考文档

后续步骤