Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本页面介绍了 Airflow 中调度和 DAG 触发的工作原理, 定义 DAG 时间表,以及如何手动触发 DAG 或暂停 DAG。
Cloud Composer 中的 Airflow DAG 简介
Cloud Composer 中的 Airflow DAG 在 一个或多个 Cloud Composer 环境 项目。您可以将 Airflow DAG 的源文件上传到 与环境关联的 Cloud Storage 存储桶。然后,环境的 Airflow 实例会解析这些文件,并根据每个 DAG 的时间表安排 DAG 运行。在 DAG 运行期间,Airflow 会安排和执行 按 DAG 定义的序列组成 DAG 的各项任务。
如需详细了解 Airflow 的核心概念,例如 Airflow DAG、DAG 运行 任务或运算符,请参阅 Airflow 文档。
Airflow 中的 DAG 调度简介
Airflow 为其调度机制提供了以下概念:
- 逻辑日期
表示执行特定 DAG 运行的日期。
这不是 Airflow 运行 DAG 的实际日期,而是一段时间段 特定 DAG 运行必须处理的内容。例如,对于一个 计划在每天中午 12:00 运行,逻辑日期也是中午 12:00 特定日期展示由于它每天运行两次,因此必须要达到的 过去 12 小时的数据同时, DAG 本身可能根本不使用逻辑日期或时间间隔。 例如,DAG 可能每天运行一次同一脚本,而不使用 逻辑日期的值。
在 Airflow 2.2 之前的版本中,此日期称为执行日期。
- 运行日期
表示执行特定 DAG 运行作业的日期。
例如,对于安排每天 12:00 运行的 DAG,该 DAG 的实际执行可能会在逻辑日期过后的一段时间(例如 12:05)进行。
- 安排的时间间隔
表示 DAG 必须以逻辑日期的形式执行的时间和频率。
例如,每日时间表意味着 DAG 每天执行一次, 其 DAG 运行的逻辑日期间隔为 24 小时。
- 开始日期
指定您希望 Airflow 开始安排 DAG 的时间。
DAG 中的任务可以有单独的开始日期, 所有任务的同一个开始日期。基于任务的最短开始日期 在 DAG 中,Airflow 按计划间隔安排 DAG 运行。
- 赶超、回填和重试
用于执行过去日期的 DAG 运行作业的机制。
追赶作业会执行尚未运行的 DAG 运行作业,例如,如果 DAG 长时间处于暂停状态,然后取消暂停。您可以 使用回填在特定日期范围内执行 DAG 运行。“重试次数”用于指定 Airflow 在从 DAG 执行任务时必须尝试的次数。
时间安排的工作方式如下:
开始日期过后,Airflow 会等待下一个时间间隔的出现。
Airflow 将首次 DAG 运行安排在此时间表结束时进行 。
例如,如果某个 DAG 计划每小时运行一次,并且开始日期为今天的 12:00,则第一次 DAG 运行会在今天 13:00 进行。
本文档中的安排 Airflow DAG 的运行时间部分介绍了如何使用这些概念为 DAG 设置运行时间。有关 有关 DAG 运行和调度的信息,请参阅 Airflow 文档中的 DAG 运行。
触发 DAG 的方法简介
Airflow 提供以下触发 DAG 的方法:
按时间表触发。Airflow 触发 DAG 根据 DAG 文件中为其指定的时间表自动创建。
手动触发。您可以从以下位置手动触发 DAG: Google Cloud 控制台或 Airflow 界面,或者通过运行 Airflow CLI 命令来 Google Cloud CLI
为响应事件而触发。触发 DAG 来响应事件的标准方法是使用传感器。
触发 DAG 的其他方法:
以编程方式触发。您可以使用 Airflow REST API 触发 DAG。例如,从 Python 脚本。
以编程方式触发以响应事件。您可以使用 Cloud Run 函数和 Airflow REST API 触发 DAG 来响应事件。
准备工作
- 请确保您的账号具有可管理环境存储桶中的对象以及查看和触发 DAG 的角色。如需了解详情,请参阅访问权限控制。
安排 Airflow DAG
您可以在 DAG 文件中为 DAG 定义时间表。在以下位置修改 DAG 的定义: 方式:
在计算机上找到并修改 DAG 文件。如果您没有 DAG 文件,可以从环境的存储桶下载其副本。对于新 DAG,您需要 可以在创建 DAG 文件时定义所有参数。
在
schedule_interval
参数中,定义时间表。您可以使用 Cron 表达式(例如0 0 * * *
)或预设(例如@daily
)。对于 请参阅 Cron 和时间间隔 Airflow 文档中所述。Airflow 会根据您设置的时间表确定 DAG 运行的逻辑日期。
在
start_date
参数中,定义开始日期。Airflow 使用此参数确定第一次 DAG 运行的逻辑日期。
(可选)在
catchup
参数中,定义是否必须执行 Airflow 从开始日期到当前日期,该 DAG 之前的所有运行 尚未执行。在追赶期间执行的 DAG 运行作业的逻辑日期将包含在 其运行日期将反映该 DAG 运行 。
(可选)在
retries
参数中,定义 Airflow 运行多少次 必须重试失败的任务(每个 DAG 包含一个或多个单独的 任务)。默认情况下,Cloud Composer 中的任务会重试两次 次。将新版 DAG 上传到环境的 存储分区。
等待 Airflow 成功解析 DAG。例如,您可以在 Google Cloud 控制台或 Airflow 界面中查看环境中的 DAG 列表。
以下示例 DAG 定义每天运行两次,分别在 00:00 和 12:00。其 开始日期设为 2024 年 1 月 1 日,但 Airflow 不会针对过去的日期运行它 (因为追赶功能已停用),然后再上传或暂停。
此 DAG 包含一项名为 insert_query_job
的任务,该任务会将一行插入
BigQueryInsertJobOperator
运算符。此运算符是以下之一:
Google Cloud BigQuery 运算符,
可用于管理数据集和表、运行查询以及验证数据。
如果此任务的特定执行失败,Airflow 会再重试四次,并采用默认的重试间隔。这些重试的逻辑日期保持不变。
此行的 SQL 查询使用 Airflow 模板将 DAG 的逻辑日期和名称写入该行。
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
如需测试此 DAG,您可以手动触发它并 然后查看任务执行日志。
更多时间安排参数示例
以下调度参数示例说明了调度如何与 不同的参数组合:
如果
start_date
为datetime(2024, 4, 4, 16, 25)
且schedule_interval
为30 16 * * *
,则首次 DAG 运行在 2024 年 4 月 5 日 16:30 进行。如果
start_date
为datetime(2024, 4, 4, 16, 35)
且schedule_interval
为30 16 * * *
,则第一次 DAG 运行于 2024 年 4 月 6 日 16:30 进行。由于开始日期晚于 2024 年 4 月 4 日的计划时间间隔,因此 2024 年 4 月 5 日不进行 DAG 运行。而是 间隔结束时间为 2024 年 4 月 5 日 16:35,因此已安排下一次 DAG 运行 次日 16:30如果
start_date
为datetime(2024, 4, 4)
且schedule_interval
为@daily
,则第一次 DAG 运行安排在 2024 年 4 月 5 日 00:00 进行。如果
start_date
为datetime(2024, 4, 4, 16, 30)
,并且schedule_interval
为0 * * * *
,则第一次 DAG 运行为 安排在 2024 年 4 月 4 日 18:00。在指定日期和时间过后,Airflow 会安排 DAG 运行在每小时的零分进行一次。最接近的时间点为 17:00。此时,Airflow 安排 DAG 运行在计划的时间间隔结束时进行,即 18:00。
手动触发 DAG
当您手动触发 Airflow DAG 时,Airflow 会运行 执行一次该 DAG 操作,并且独立于 DAG 文件中指定的时间表。
控制台
如需从 Google Cloud 控制台触发 DAG,请执行以下操作:
在 Google Cloud 控制台中,前往环境页面。
选择环境以查看其详细信息。
在环境详情页面上,前往 DAG 标签页。
点击 DAG 的名称。
在 DAG 详情页面上,点击触发 DAG。新 DAG 运行是 创建。
Airflow 界面
如需从 Airflow 界面触发 DAG,请执行以下操作:
在 Google Cloud 控制台中,前往环境页面。
在 Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。
使用具有相应权限的 Google 账号登录。
在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。
(可选)指定 DAG 运行配置。
点击触发器。
gcloud
运行 dags trigger
Airflow CLI 命令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
替换以下内容:
ENVIRONMENT_NAME
:您的环境的名称。LOCATION
:环境所在的区域。DAG_ID
:DAG 的名称。
如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令。
如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run
命令参考文档。
查看 DAG 运行日志和详情
在 Google Cloud 控制台中,您可以执行以下操作:
- 查看过往 DAG 运行作业的状态和 DAG 详情。
- 浏览所有 DAG 运行作业和所有任务的详细日志 从这些 DAG 中获取。
- 查看 DAG 统计信息。
此外,Cloud Composer 还提供对 Airflow 界面的访问权限,该界面是 Airflow 自己的 Web 界面。
暂停 DAG
控制台
如需从 Google Cloud 控制台暂停 DAG,请执行以下操作:
在 Google Cloud 控制台中,前往环境页面。
选择环境以查看其详细信息。
在环境详情页面上,前往 DAG 标签页。
点击 DAG 的名称。
在 DAG 详情页面上,点击暂停 DAG。
Airflow 界面
如需在 Airflow 界面中暂停 DAG,请执行以下操作:
- 在 Google Cloud 控制台中,前往环境页面。
在 Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。
使用拥有相应权限的 Google 账号登录。
在 Airflow 网页界面中,点击 DAG 页面上的 DAG 名称旁边的切换开关。
gcloud
运行 dags pause
Airflow CLI 命令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
替换以下内容:
ENVIRONMENT_NAME
:您的环境的名称。LOCATION
:环境所在的区域。DAG_ID
:DAG 的名称。
如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令。
如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run
命令参考文档。