安排和触发 Airflow DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页介绍了 Airflow 中调度和 DAG 触发的工作原理、如何为 DAG 定义时间表,以及如何手动触发 DAG 或将其暂停。

Cloud Composer 中的 Airflow DAG 简介

Cloud Composer 中的 Airflow DAG 在 一个或多个 Cloud Composer 环境 项目。您可以将 Airflow DAG 的源文件上传到 与环境关联的 Cloud Storage 存储桶。然后,环境的 Airflow 实例会解析这些文件,并根据每个 DAG 的时间表安排 DAG 运行。在 DAG 运行期间,Airflow 会安排和执行 按 DAG 定义的序列组成 DAG 的各项任务。

如需详细了解 Airflow 的核心概念(例如 Airflow DAG、DAG 运行作业或运算符),请参阅 Airflow 文档中的核心概念页面。

Airflow 中的 DAG 调度简介

Airflow 为其调度机制提供了以下概念:

逻辑日期

表示执行特定 DAG 运行作业的日期

这不是 Airflow 运行 DAG 的实际日期,而是一段时间段 特定 DAG 运行必须处理的内容。例如,对于安排每天 12:00 运行的 DAG,逻辑日期也会是特定日期的 12:00。由于该作业每天运行两次,因此它必须处理的时间段是过去 12 小时。同时,DAG 本身中定义的逻辑可能根本不会使用逻辑日期或时间间隔。例如,DAG 可能每天运行一次同一脚本,而不使用 逻辑日期的值。

在 Airflow 2.2 之前的版本中,此日期称为执行日期。

运行日期

表示执行特定 DAG 运行的时间的日期。

例如,对于安排每天 12:00 运行的 DAG,该 DAG 的实际执行可能会在逻辑日期过后的一段时间(例如 12:05)进行。

安排的时间间隔

表示 DAG 必须以逻辑日期的形式执行的时间和频率。

例如,每日时间表意味着 DAG 每天执行一次, 其 DAG 运行的逻辑日期间隔为 24 小时。

开始日期

指定您希望 Airflow 开始安排 DAG 的时间。

DAG 中的任务可以有各自的开始日期,您也可以为所有任务指定同一个开始日期。基于任务的最短开始日期 在 DAG 中,Airflow 按计划间隔安排 DAG 运行。

补余、回填和重试

用于执行过去日期的 DAG 运行的机制。

赶超执行尚未运行的 DAG 运行,例如, 如果 DAG 长时间暂停然后取消暂停,则会发生该错误。您可以使用回填功能来执行特定日期范围内的 DAG 运行作业。重试次数 指定 Airflow 在执行 DAG 中的任务时必须进行多少次尝试。

时间安排的工作原理如下:

  1. 开始日期过后,Airflow 会等待下一个时间间隔的出现。

  2. Airflow 会安排第一次 DAG 运行在此计划的时间间隔结束时进行。

    例如,如果某个 DAG 计划每小时运行一次,并且开始日期为今天的 12:00,则第一次 DAG 运行会在今天 13:00 进行。

本文档中的安排 Airflow DAG 的运行时间部分介绍了如何使用这些概念为 DAG 设置运行时间。有关 有关 DAG 运行和调度的信息,请参阅 Airflow 文档中的 DAG 运行

触发 DAG 的方法简介

Airflow 提供以下触发 DAG 的方法:

  • 按时间表触发。Airflow 会根据 DAG 文件中为其指定的时间表自动触发 DAG。

  • 手动触发。您可以从 Google Cloud 控制台、Airflow 界面手动触发 DAG,也可以从 Google Cloud CLI 运行 Airflow CLI 命令来触发。

  • 为响应事件而触发。在 对事件的响应是使用传感器

触发 DAG 的其他方法:

准备工作

  • 请确保您的账号具有可管理 环境存储分区以及查看和触发 DAG。 如需了解详情,请参阅访问权限控制

安排 Airflow DAG

您可以在 DAG 文件中定义 DAG 时间表。在以下位置修改 DAG 的定义: 方式:

  1. 找到并修改计算机上的 DAG 文件。如果您没有 DAG 您可以从 从环境的存储桶中对于新 DAG,您可以在创建 DAG 文件时定义所有参数。

  2. schedule_interval 参数中,定义时间表。您可以使用 Cron 表达式(例如 0 0 * * *)或预设(例如 @daily)。如需了解详情,请参阅 Airflow 文档中的 Cron 和时间间隔

    Airflow 根据时间表来确定 DAG 运行的逻辑日期, 。

  3. start_date 参数中,定义开始日期。

    Airflow 使用此 参数。

  4. (可选)在 catchup 参数中,定义 Airflow 是否必须执行从开始日期到当前日期尚未执行的此 DAG 的所有之前运行作业。

    在追赶期间执行的 DAG 运行作业的逻辑日期将包含在 其运行日期将反映该 DAG 运行 。

  5. (可选)在 retries 参数中,定义 Airflow 运行多少次 必须重试失败的任务(每个 DAG 包含一个或多个单独的 任务)。默认情况下,Cloud Composer 中的任务会重试两次 次。

  6. 将新版 DAG 上传到环境的存储桶。

  7. 等待 Airflow 成功解析该 DAG。例如,您可以查看 您的环境中的 DAG 列表(位于 Google Cloud 控制台或 Airflow 界面中。

以下示例 DAG 定义在每天 00:00 和 12:00 各运行两次。其开始日期设为 2024 年 1 月 1 日,但在您上传或暂停该作业后,Airflow 不会针对过去的日期运行该作业,因为“赶上”功能已停用。

此 DAG 包含一项名为 insert_query_job 的任务,该任务会将一行插入 BigQueryInsertJobOperator 运算符。此运算符是 Google Cloud BigQuery 运算符之一,可用于管理数据集和表、运行查询以及验证数据。如果此任务的特定执行失败,Airflow 会再重试四次 次。这些重试的逻辑日期保持不变。

此行的 SQL 查询使用 Airflow 模板:用于写入 DAG 的逻辑日期 为该行添加名称

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

如需测试此 DAG,您可以手动触发它并 然后查看任务执行日志

更多时间安排参数示例

以下调度参数示例说明了调度如何与 不同的参数组合:

  • 如果 start_datedatetime(2024, 4, 4, 16, 25)schedule_interval30 16 * * *,则首次 DAG 运行在 2024 年 4 月 5 日 16:30 进行。

  • 如果 start_datedatetime(2024, 4, 4, 16, 35)schedule_interval30 16 * * *,则首次 DAG 运行在 2024 年 4 月 6 日 16:30 进行。 由于开始日期晚于 2024 年 4 月 4 日的计划时间间隔,因此 2024 年 4 月 5 日不进行 DAG 运行。计划的时间间隔于 2024 年 4 月 5 日 16:35 结束,因此下一次 DAG 运行安排在次日 16:30 进行。

  • 如果 start_datedatetime(2024, 4, 4)schedule_interval@daily,则第一次 DAG 运行安排在 2024 年 4 月 5 日 00:00 进行。

  • 如果 start_datedatetime(2024, 4, 4, 16, 30),并且 schedule_interval0 * * * *,则第一次 DAG 运行为 安排在 2024 年 4 月 4 日 18:00。在指定日期和时间过后,Airflow 会安排 DAG 运行在每小时的零分进行一次。最接近的时间点为 17:00。此时,Airflow 安排 DAG 运行在计划的时间间隔结束时进行,即 18:00。

手动触发 DAG

当您手动触发 Airflow DAG 时,Airflow 会运行一次 DAG,这与 DAG 文件中指定的时间表无关。

控制台

Cloud Composer 1.17.8 及更高版本支持 DAG 界面。

如需从 Google Cloud 控制台触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择环境以查看其详细信息。

  3. 环境详情页面上,前往 DAG 标签页。

  4. 点击 DAG 的名称。

  5. DAG 详情页面上,点击触发 DAG。新 DAG 运行是 创建。

Airflow 界面

如需从 Airflow 界面触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 使用具有相应权限的 Google 账号登录。

  4. 在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。

  5. (可选)指定 DAG 运行配置。

  6. 点击触发器

gcloud

在 Airflow 1.10.12 或更低版本中,运行 trigger_dag Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

在 Airflow 1.10.14 或更高版本(包括 Airflow 2)中,运行 dags trigger Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • DAG_ID:DAG 的名称。

如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令

如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run 命令参考文档

查看 DAG 运行日志和详细信息

在 Google Cloud 控制台中,您可以执行以下操作:

此外,Cloud Composer 还提供 Airflow 界面,这是 Airflow 自己的网页界面。

暂停 DAG

控制台

Cloud Composer 1.17.8 及更高版本支持 DAG 界面。

如需从 Google Cloud 控制台中暂停 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择环境以查看其详细信息。

  3. 环境详情页面上,前往 DAG 标签页。

  4. 点击 DAG 的名称。

  5. DAG 详情页面上,点击 Pause DAG(暂停 DAG)。

Airflow 界面

如需在 Airflow 界面中暂停 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

转到“环境”

  1. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  2. 使用具有相应权限的 Google 账号登录。

  3. 在 Airflow 网页界面中,点击 DAG 页面上的 DAG 名称旁边的切换开关。

gcloud

在 Airflow 1.10.12 或更低版本中,运行 pause Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    pause -- DAG_ID

在 Airflow 1.10.14 或更高版本(包括 Airflow 2)中,运行 dags pause Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • DAG_ID:DAG 的名称。

如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令

如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run 命令参考文档

后续步骤