安排和触发 Airflow DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面介绍了 Airflow 中调度和 DAG 触发的工作原理, 定义 DAG 时间表,以及如何手动触发 DAG 或暂停 DAG。

Cloud Composer 中的 Airflow DAG 简介

Cloud Composer 中的 Airflow DAG 在 一个或多个 Cloud Composer 环境 项目。您可以将 Airflow DAG 的源文件上传到 与环境关联的 Cloud Storage 存储桶。然后,环境的 Airflow 实例会解析这些文件,并根据每个 DAG 的时间表安排 DAG 运行。在 DAG 运行期间,Airflow 会安排和执行 按 DAG 定义的序列组成 DAG 的各项任务。

如需详细了解 Airflow 的核心概念,例如 Airflow DAG、DAG 运行 任务或运算符,请参阅 Airflow 文档。

Airflow 中的 DAG 调度简介

Airflow 为其调度机制提供了以下概念:

逻辑日期

表示执行特定 DAG 运行的日期

这不是 Airflow 运行 DAG 的实际日期,而是一段时间段 特定 DAG 运行必须处理的内容。例如,对于安排每天 12:00 运行的 DAG,逻辑日期也会是特定日期的 12:00。由于该作业每天运行两次,因此它必须处理的时间段是过去 12 小时。同时, DAG 本身可能根本不使用逻辑日期或时间间隔。 例如,DAG 可能每天运行一次同一脚本,而不使用 逻辑日期的值。

在 Airflow 2.2 之前的版本中,此日期称为执行日期。

运行日期

表示执行特定 DAG 运行的时间的日期。

例如,对于计划每天中午 12 点运行的 DAG, DAG 的实际执行时间可能是 12:05 逻辑日期经过。

安排的时间间隔

表示 DAG 必须以逻辑日期的形式执行的时间和频率。

例如,每日时间表意味着 DAG 每天运行一次,并且其 DAG 运行的逻辑日期间隔为 24 小时。

开始日期

指定您希望 Airflow 何时开始安排 DAG。

DAG 中的任务可以有单独的开始日期,也可以指定 所有任务的同一个开始日期。Airflow 会根据 DAG 中任务的最早开始日期和计划的时间间隔安排 DAG 运行。

赶超、回填和重试

用于执行过去日期的 DAG 运行作业的机制。

赶超执行尚未运行的 DAG 运行,例如, 如果 DAG 长时间暂停然后取消暂停,则会发生该错误。您可以使用回填功能来执行特定日期范围内的 DAG 运行作业。“重试次数”用于指定 Airflow 在从 DAG 执行任务时必须进行的尝试次数。

时间安排的工作方式如下:

  1. 开始日期过后,Airflow 会等待 计划间隔

  2. Airflow 将首次 DAG 运行安排在此时间表结束时进行 。

    例如,如果某个 DAG 计划每小时运行一次,并且开始日期为今天的 12:00,则第一次 DAG 运行会在今天 13:00 进行。

本文档中的安排 Airflow DAG 的运行时间部分介绍了如何使用这些概念为 DAG 设置运行时间。如需详细了解 DAG 运行和调度,请参阅 Airflow 文档中的 DAG 运行

触发 DAG 的方法简介

Airflow 提供以下触发 DAG 的方法:

  • 按时间表触发。Airflow 触发 DAG 根据 DAG 文件中为其指定的时间表自动创建。

  • 手动触发。您可以从以下位置手动触发 DAG: Google Cloud 控制台或 Airflow 界面,或者通过运行 Airflow CLI 命令来 Google Cloud CLI

  • 为响应事件而触发。触发 DAG 来响应事件的标准方法是使用传感器

触发 DAG 的其他方法:

准备工作

  • 请确保您的账号具有可管理 环境存储分区以及查看和触发 DAG。 如需了解详情,请参阅访问权限控制

安排 Airflow DAG

您可以在 DAG 文件中定义 DAG 时间表。按如下方式修改 DAG 的定义:

  1. 在计算机上找到并修改 DAG 文件。如果您没有 DAG 文件,可以从环境的存储桶下载其副本。对于新 DAG,您需要 可以在创建 DAG 文件时定义所有参数。

  2. schedule_interval 参数中,定义时间表。您可以使用 Cron 表达式(例如 0 0 * * *)或预设(例如 @daily)。如需了解详情,请参阅 Airflow 文档中的 Cron 和时间间隔

    Airflow 会根据您设置的时间表确定 DAG 运行的逻辑日期。

  3. start_date 参数中,定义开始日期。

    Airflow 使用此 参数。

  4. (可选)在 catchup 参数中,定义是否必须执行 Airflow 从开始日期到当前日期,该 DAG 之前的所有运行 尚未执行。

    在赶上时间期间执行的 DAG 运行作业的逻辑日期将设为过去的日期,其运行日期将反映 DAG 运行作业的实际执行时间。

  5. (可选)在 retries 参数中,定义 Airflow 运行多少次 必须重试失败的任务(每个 DAG 包含一个或多个单独的 任务)。默认情况下,Cloud Composer 中的任务会重试两次。

  6. 将新版 DAG 上传到环境的存储桶。

  7. 等待 Airflow 成功解析该 DAG。例如,您可以查看 您的环境中的 DAG 列表(位于 Google Cloud 控制台或 Airflow 界面中。

以下示例 DAG 定义在每天 00:00 和 12:00 各运行两次。其 开始日期设为 2024 年 1 月 1 日,但 Airflow 不会针对过去的日期运行它 (因为追赶功能已停用),然后再上传或暂停。

DAG 包含一个名为 insert_query_job 的任务,该任务使用 BigQueryInsertJobOperator 运算符将行插入表中。此运算符是 Google Cloud BigQuery 运算符之一,可用于管理数据集和表、运行查询以及验证数据。如果此任务的特定执行失败,Airflow 会再重试四次,并采用默认的重试间隔。这些重试的逻辑日期 保持不变

此行的 SQL 查询使用 Airflow 模板:用于写入 DAG 的逻辑日期 为该行添加名称

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

如需测试此 DAG,您可以手动触发它并 然后查看任务执行日志

更多时间安排参数示例

以下调度参数示例说明了调度如何与 不同的参数组合:

  • 如果 start_datedatetime(2024, 4, 4, 16, 25)schedule_interval30 16 * * *,则第一次 DAG 运行于 2024 年 4 月 5 日 16:30 进行。

  • 如果 start_datedatetime(2024, 4, 4, 16, 35)schedule_interval30 16 * * *,则第一次 DAG 运行于 2024 年 4 月 6 日 16:30 进行。由于开始日期晚于计划间隔时间(2024 年 4 月 4 日), DAG 运行不会在 2024 年 4 月 5 日进行。计划的时间间隔于 2024 年 4 月 5 日 16:35 结束,因此下一次 DAG 运行安排在次日 16:30 进行。

  • 如果 start_datedatetime(2024, 4, 4),且 schedule_interval@daily,则第一次 DAG 运行安排在 2024 年 4 月 5 日 00:00。

  • 如果 start_datedatetime(2024, 4, 4, 16, 30)schedule_interval0 * * * *,则第一次 DAG 运行安排在 2024 年 4 月 4 日 18:00 进行。在指定日期和时间过后,Airflow 会安排 DAG 运行在每小时的零分进行一次。最接近的时间点为 17:00。此时,Airflow 安排 DAG 运行在计划的时间间隔结束时进行,即 18:00。

手动触发 DAG

当您手动触发 Airflow DAG 时,Airflow 会运行一次 DAG,这与 DAG 文件中指定的时间表无关。

控制台

如需从 Google Cloud 控制台触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择环境以查看其详细信息。

  3. 环境详情页面上,前往 DAG 标签页。

  4. 点击 DAG 的名称。

  5. DAG 详情页面上,点击 Trigger DAG(触发 DAG)。新 DAG 运行是 创建。

Airflow 界面

如需从 Airflow 界面触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 使用具有相应权限的 Google 账号登录。

  4. 在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。

  5. (可选)指定 DAG 运行配置。

  6. 点击触发器

gcloud

运行 dags trigger Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • DAG_ID:DAG 的名称。

如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令

如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run 命令参考文档

查看 DAG 运行日志和详细信息

在 Google Cloud 控制台中,您可以执行以下操作:

此外,Cloud Composer 还提供 Airflow 界面,这是 Airflow 自己的网页界面。

暂停 DAG

控制台

如需从 Google Cloud 控制台暂停 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择环境以查看其详细信息。

  3. 环境详情页面上,前往 DAG 标签页。

  4. 点击 DAG 的名称。

  5. DAG 详情页面上,点击 Pause DAG(暂停 DAG)。

Airflow 界面

如需在 Airflow 界面中暂停 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

转到“环境”

  1. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  2. 使用拥有相应权限的 Google 账号登录。

  3. 在 Airflow 网页界面中,点击 DAG 页面上的 DAG 名称旁边的切换开关。

gcloud

运行 dags pause Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • DAG_ID:DAG 的名称。

如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令

如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run 命令参考文档

后续步骤