安排和触发 Airflow DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面介绍了 Airflow 中调度和 DAG 触发的工作原理, 定义 DAG 时间表,以及如何手动触发 DAG 或暂停 DAG。

Cloud Composer 中的 Airflow DAG 简介

Cloud Composer 中的 Airflow DAG 在 一个或多个 Cloud Composer 环境 项目。您可以将 Airflow DAG 的源文件上传到 与环境关联的 Cloud Storage 存储桶。然后,环境的 Airflow 实例会解析这些文件,并根据每个 DAG 的时间表安排 DAG 运行。在 DAG 运行期间,Airflow 会安排和执行 按 DAG 定义的序列组成 DAG 的各项任务。

如需详细了解 Airflow 的核心概念,例如 Airflow DAG、DAG 运行 任务或运算符,请参阅 Airflow 文档。

Airflow 中的 DAG 调度简介

Airflow 为其调度机制提供了以下概念:

逻辑日期

表示执行特定 DAG 运行的日期

这不是 Airflow 运行 DAG 的实际日期,而是一段时间段 特定 DAG 运行必须处理的内容。例如,对于一个 计划在每天中午 12:00 运行,逻辑日期也是中午 12:00 特定日期展示由于它每天运行两次,因此必须要达到的 过去 12 小时的数据同时, DAG 本身可能根本不使用逻辑日期或时间间隔。 例如,DAG 可能每天运行一次同一脚本,而不使用 逻辑日期的值。

在 Airflow 2.2 之前的版本中,此日期称为执行日期。

运行日期

表示执行特定 DAG 运行作业的日期。

例如,对于安排每天 12:00 运行的 DAG,该 DAG 的实际执行可能会在逻辑日期过后的一段时间(例如 12:05)进行。

安排的时间间隔

表示 DAG 必须以逻辑日期的形式执行的时间和频率。

例如,每日时间表意味着 DAG 每天执行一次, 其 DAG 运行的逻辑日期间隔为 24 小时。

开始日期

指定您希望 Airflow 开始安排 DAG 的时间。

DAG 中的任务可以有单独的开始日期, 所有任务的同一个开始日期。基于任务的最短开始日期 在 DAG 中,Airflow 按计划间隔安排 DAG 运行。

赶超、回填和重试

用于执行过去日期的 DAG 运行作业的机制。

追赶作业会执行尚未运行的 DAG 运行作业,例如,如果 DAG 长时间处于暂停状态,然后取消暂停。您可以 使用回填在特定日期范围内执行 DAG 运行。“重试次数”用于指定 Airflow 在从 DAG 执行任务时必须尝试的次数。

时间安排的工作方式如下:

  1. 开始日期过后,Airflow 会等待下一个时间间隔的出现。

  2. Airflow 将首次 DAG 运行安排在此时间表结束时进行 。

    例如,如果某个 DAG 计划每小时运行一次,并且开始日期为今天的 12:00,则第一次 DAG 运行会在今天 13:00 进行。

本文档中的安排 Airflow DAG 的运行时间部分介绍了如何使用这些概念为 DAG 设置运行时间。有关 有关 DAG 运行和调度的信息,请参阅 Airflow 文档中的 DAG 运行

触发 DAG 的方法简介

Airflow 提供以下触发 DAG 的方法:

  • 按时间表触发。Airflow 触发 DAG 根据 DAG 文件中为其指定的时间表自动创建。

  • 手动触发。您可以从以下位置手动触发 DAG: Google Cloud 控制台或 Airflow 界面,或者通过运行 Airflow CLI 命令来 Google Cloud CLI

  • 为响应事件而触发。触发 DAG 来响应事件的标准方法是使用传感器

触发 DAG 的其他方法:

准备工作

  • 请确保您的账号具有可管理环境存储桶中的对象以及查看和触发 DAG 的角色。如需了解详情,请参阅访问权限控制

安排 Airflow DAG

您可以在 DAG 文件中为 DAG 定义时间表。在以下位置修改 DAG 的定义: 方式:

  1. 在计算机上找到并修改 DAG 文件。如果您没有 DAG 文件,可以从环境的存储桶下载其副本。对于新 DAG,您需要 可以在创建 DAG 文件时定义所有参数。

  2. schedule_interval 参数中,定义时间表。您可以使用 Cron 表达式(例如 0 0 * * *)或预设(例如 @daily)。对于 请参阅 Cron 和时间间隔 Airflow 文档中所述。

    Airflow 会根据您设置的时间表确定 DAG 运行的逻辑日期。

  3. start_date 参数中,定义开始日期。

    Airflow 使用此参数确定第一次 DAG 运行的逻辑日期。

  4. (可选)在 catchup 参数中,定义是否必须执行 Airflow 从开始日期到当前日期,该 DAG 之前的所有运行 尚未执行。

    在追赶期间执行的 DAG 运行作业的逻辑日期将包含在 其运行日期将反映该 DAG 运行 。

  5. (可选)在 retries 参数中,定义 Airflow 运行多少次 必须重试失败的任务(每个 DAG 包含一个或多个单独的 任务)。默认情况下,Cloud Composer 中的任务会重试两次 次。

  6. 将新版 DAG 上传到环境的 存储分区。

  7. 等待 Airflow 成功解析 DAG。例如,您可以在 Google Cloud 控制台或 Airflow 界面中查看环境中的 DAG 列表

以下示例 DAG 定义每天运行两次,分别在 00:00 和 12:00。其 开始日期设为 2024 年 1 月 1 日,但 Airflow 不会针对过去的日期运行它 (因为追赶功能已停用),然后再上传或暂停。

此 DAG 包含一项名为 insert_query_job 的任务,该任务会将一行插入 BigQueryInsertJobOperator 运算符。此运算符是以下之一: Google Cloud BigQuery 运算符, 可用于管理数据集和表、运行查询以及验证数据。 如果此任务的特定执行失败,Airflow 会再重试四次,并采用默认的重试间隔。这些重试的逻辑日期保持不变。

此行的 SQL 查询使用 Airflow 模板将 DAG 的逻辑日期和名称写入该行。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

如需测试此 DAG,您可以手动触发它并 然后查看任务执行日志

更多时间安排参数示例

以下调度参数示例说明了调度如何与 不同的参数组合:

  • 如果 start_datedatetime(2024, 4, 4, 16, 25)schedule_interval30 16 * * *,则首次 DAG 运行在 2024 年 4 月 5 日 16:30 进行。

  • 如果 start_datedatetime(2024, 4, 4, 16, 35)schedule_interval30 16 * * *,则第一次 DAG 运行于 2024 年 4 月 6 日 16:30 进行。由于开始日期晚于 2024 年 4 月 4 日的计划时间间隔,因此 2024 年 4 月 5 日不进行 DAG 运行。而是 间隔结束时间为 2024 年 4 月 5 日 16:35,因此已安排下一次 DAG 运行 次日 16:30

  • 如果 start_datedatetime(2024, 4, 4)schedule_interval@daily,则第一次 DAG 运行安排在 2024 年 4 月 5 日 00:00 进行。

  • 如果 start_datedatetime(2024, 4, 4, 16, 30),并且 schedule_interval0 * * * *,则第一次 DAG 运行为 安排在 2024 年 4 月 4 日 18:00。在指定日期和时间过后,Airflow 会安排 DAG 运行在每小时的零分进行一次。最接近的时间点为 17:00。此时,Airflow 安排 DAG 运行在计划的时间间隔结束时进行,即 18:00。

手动触发 DAG

当您手动触发 Airflow DAG 时,Airflow 会运行 执行一次该 DAG 操作,并且独立于 DAG 文件中指定的时间表。

控制台

如需从 Google Cloud 控制台触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择环境以查看其详细信息。

  3. 环境详情页面上,前往 DAG 标签页。

  4. 点击 DAG 的名称。

  5. DAG 详情页面上,点击触发 DAG。新 DAG 运行是 创建。

Airflow 界面

如需从 Airflow 界面触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 使用具有相应权限的 Google 账号登录。

  4. 在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。

  5. (可选)指定 DAG 运行配置。

  6. 点击触发器

gcloud

运行 dags trigger Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • DAG_ID:DAG 的名称。

如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令

如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run 命令参考文档

查看 DAG 运行日志和详情

在 Google Cloud 控制台中,您可以执行以下操作:

此外,Cloud Composer 还提供对 Airflow 界面的访问权限,该界面是 Airflow 自己的 Web 界面。

暂停 DAG

控制台

如需从 Google Cloud 控制台暂停 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择环境以查看其详细信息。

  3. 环境详情页面上,前往 DAG 标签页。

  4. 点击 DAG 的名称。

  5. DAG 详情页面上,点击暂停 DAG

Airflow 界面

如需在 Airflow 界面中暂停 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

转到“环境”

  1. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  2. 使用拥有相应权限的 Google 账号登录。

  3. 在 Airflow 网页界面中,点击 DAG 页面上的 DAG 名称旁边的切换开关。

gcloud

运行 dags pause Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • DAG_ID:DAG 的名称。

如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令

如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run 命令参考文档

后续步骤