Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本页介绍了 Airflow 中调度和 DAG 触发的工作原理、如何为 DAG 定义时间表,以及如何手动触发 DAG 或将其暂停。
Cloud Composer 中的 Airflow DAG 简介
Cloud Composer 中的 Airflow DAG 在 一个或多个 Cloud Composer 环境 项目。您可以将 Airflow DAG 的源文件上传到 与环境关联的 Cloud Storage 存储桶。然后,环境的 Airflow 实例会解析这些文件,并根据每个 DAG 的时间表安排 DAG 运行。在 DAG 运行期间,Airflow 会安排和执行 按 DAG 定义的序列组成 DAG 的各项任务。
如需详细了解 Airflow 的核心概念(例如 Airflow DAG、DAG 运行作业或运算符),请参阅 Airflow 文档中的核心概念页面。
Airflow 中的 DAG 调度简介
Airflow 为其调度机制提供了以下概念:
- 逻辑日期
表示执行特定 DAG 运行作业的日期。
这不是 Airflow 运行 DAG 的实际日期,而是一段时间段 特定 DAG 运行必须处理的内容。例如,对于安排每天 12:00 运行的 DAG,逻辑日期也会是特定日期的 12:00。由于该作业每天运行两次,因此它必须处理的时间段是过去 12 小时。同时,DAG 本身中定义的逻辑可能根本不会使用逻辑日期或时间间隔。例如,DAG 可能每天运行一次同一脚本,而不使用 逻辑日期的值。
在 Airflow 2.2 之前的版本中,此日期称为执行日期。
- 运行日期
表示执行特定 DAG 运行的时间的日期。
例如,对于安排每天 12:00 运行的 DAG,该 DAG 的实际执行可能会在逻辑日期过后的一段时间(例如 12:05)进行。
- 安排的时间间隔
表示 DAG 必须以逻辑日期的形式执行的时间和频率。
例如,每日时间表意味着 DAG 每天执行一次, 其 DAG 运行的逻辑日期间隔为 24 小时。
- 开始日期
指定您希望 Airflow 开始安排 DAG 的时间。
DAG 中的任务可以有各自的开始日期,您也可以为所有任务指定同一个开始日期。基于任务的最短开始日期 在 DAG 中,Airflow 按计划间隔安排 DAG 运行。
- 补余、回填和重试
用于执行过去日期的 DAG 运行的机制。
赶超执行尚未运行的 DAG 运行,例如, 如果 DAG 长时间暂停然后取消暂停,则会发生该错误。您可以使用回填功能来执行特定日期范围内的 DAG 运行作业。重试次数 指定 Airflow 在执行 DAG 中的任务时必须进行多少次尝试。
时间安排的工作原理如下:
开始日期过后,Airflow 会等待下一个时间间隔的出现。
Airflow 会安排第一次 DAG 运行在此计划的时间间隔结束时进行。
例如,如果某个 DAG 计划每小时运行一次,并且开始日期为今天的 12:00,则第一次 DAG 运行会在今天 13:00 进行。
本文档中的安排 Airflow DAG 的运行时间部分介绍了如何使用这些概念为 DAG 设置运行时间。有关 有关 DAG 运行和调度的信息,请参阅 Airflow 文档中的 DAG 运行。
触发 DAG 的方法简介
Airflow 提供以下触发 DAG 的方法:
按时间表触发。Airflow 会根据 DAG 文件中为其指定的时间表自动触发 DAG。
手动触发。您可以从 Google Cloud 控制台、Airflow 界面手动触发 DAG,也可以从 Google Cloud CLI 运行 Airflow CLI 命令来触发。
为响应事件而触发。在 对事件的响应是使用传感器。
触发 DAG 的其他方法:
以编程方式触发。您可以 使用 Airflow REST API 触发 DAG。 例如,从 Python 脚本。
以编程方式触发以响应事件。您可以在以下位置触发 DAG: 对事件进行响应 Cloud Run 函数和 Airflow REST API。
准备工作
- 请确保您的账号具有可管理 环境存储分区以及查看和触发 DAG。 如需了解详情,请参阅访问权限控制。
安排 Airflow DAG
您可以在 DAG 文件中定义 DAG 时间表。在以下位置修改 DAG 的定义: 方式:
找到并修改计算机上的 DAG 文件。如果您没有 DAG 您可以从 从环境的存储桶中对于新 DAG,您可以在创建 DAG 文件时定义所有参数。
在
schedule_interval
参数中,定义时间表。您可以使用 Cron 表达式(例如0 0 * * *
)或预设(例如@daily
)。如需了解详情,请参阅 Airflow 文档中的 Cron 和时间间隔。Airflow 根据时间表来确定 DAG 运行的逻辑日期, 。
在
start_date
参数中,定义开始日期。Airflow 使用此 参数。
(可选)在
catchup
参数中,定义 Airflow 是否必须执行从开始日期到当前日期尚未执行的此 DAG 的所有之前运行作业。在追赶期间执行的 DAG 运行作业的逻辑日期将包含在 其运行日期将反映该 DAG 运行 。
(可选)在
retries
参数中,定义 Airflow 运行多少次 必须重试失败的任务(每个 DAG 包含一个或多个单独的 任务)。默认情况下,Cloud Composer 中的任务会重试两次 次。将新版 DAG 上传到环境的存储桶。
等待 Airflow 成功解析该 DAG。例如,您可以查看 您的环境中的 DAG 列表(位于 Google Cloud 控制台或 Airflow 界面中。
以下示例 DAG 定义在每天 00:00 和 12:00 各运行两次。其开始日期设为 2024 年 1 月 1 日,但在您上传或暂停该作业后,Airflow 不会针对过去的日期运行该作业,因为“赶上”功能已停用。
此 DAG 包含一项名为 insert_query_job
的任务,该任务会将一行插入
BigQueryInsertJobOperator
运算符。此运算符是 Google Cloud BigQuery 运算符之一,可用于管理数据集和表、运行查询以及验证数据。如果此任务的特定执行失败,Airflow 会再重试四次
次。这些重试的逻辑日期保持不变。
此行的 SQL 查询使用 Airflow 模板:用于写入 DAG 的逻辑日期 为该行添加名称
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
如需测试此 DAG,您可以手动触发它并 然后查看任务执行日志。
更多时间安排参数示例
以下调度参数示例说明了调度如何与 不同的参数组合:
如果
start_date
为datetime(2024, 4, 4, 16, 25)
且schedule_interval
为30 16 * * *
,则首次 DAG 运行在 2024 年 4 月 5 日 16:30 进行。如果
start_date
为datetime(2024, 4, 4, 16, 35)
且schedule_interval
为30 16 * * *
,则首次 DAG 运行在 2024 年 4 月 6 日 16:30 进行。 由于开始日期晚于 2024 年 4 月 4 日的计划时间间隔,因此 2024 年 4 月 5 日不进行 DAG 运行。计划的时间间隔于 2024 年 4 月 5 日 16:35 结束,因此下一次 DAG 运行安排在次日 16:30 进行。如果
start_date
为datetime(2024, 4, 4)
且schedule_interval
为@daily
,则第一次 DAG 运行安排在 2024 年 4 月 5 日 00:00 进行。如果
start_date
为datetime(2024, 4, 4, 16, 30)
,并且schedule_interval
为0 * * * *
,则第一次 DAG 运行为 安排在 2024 年 4 月 4 日 18:00。在指定日期和时间过后,Airflow 会安排 DAG 运行在每小时的零分进行一次。最接近的时间点为 17:00。此时,Airflow 安排 DAG 运行在计划的时间间隔结束时进行,即 18:00。
手动触发 DAG
当您手动触发 Airflow DAG 时,Airflow 会运行一次 DAG,这与 DAG 文件中指定的时间表无关。
控制台
Cloud Composer 1.17.8 及更高版本支持 DAG 界面。
如需从 Google Cloud 控制台触发 DAG,请执行以下操作:
在 Google Cloud 控制台中,前往环境页面。
选择环境以查看其详细信息。
在环境详情页面上,前往 DAG 标签页。
点击 DAG 的名称。
在 DAG 详情页面上,点击触发 DAG。新 DAG 运行是 创建。
Airflow 界面
如需从 Airflow 界面触发 DAG,请执行以下操作:
在 Google Cloud 控制台中,前往环境页面。
在 Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。
使用具有相应权限的 Google 账号登录。
在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。
(可选)指定 DAG 运行配置。
点击触发器。
gcloud
在 Airflow 1.10.12 或更低版本中,运行 trigger_dag
Airflow CLI 命令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
trigger_dag -- DAG_ID
在 Airflow 1.10.14 或更高版本(包括 Airflow 2)中,运行 dags trigger
Airflow CLI 命令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
替换以下内容:
ENVIRONMENT_NAME
:您的环境的名称。LOCATION
:环境所在的区域。DAG_ID
:DAG 的名称。
如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令。
如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run
命令参考文档。
查看 DAG 运行日志和详细信息
在 Google Cloud 控制台中,您可以执行以下操作:
此外,Cloud Composer 还提供 Airflow 界面,这是 Airflow 自己的网页界面。
暂停 DAG
控制台
Cloud Composer 1.17.8 及更高版本支持 DAG 界面。
如需从 Google Cloud 控制台中暂停 DAG,请执行以下操作:
在 Google Cloud 控制台中,前往环境页面。
选择环境以查看其详细信息。
在环境详情页面上,前往 DAG 标签页。
点击 DAG 的名称。
在 DAG 详情页面上,点击 Pause DAG(暂停 DAG)。
Airflow 界面
如需在 Airflow 界面中暂停 DAG,请执行以下操作:
- 在 Google Cloud 控制台中,前往环境页面。
在 Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。
使用具有相应权限的 Google 账号登录。
在 Airflow 网页界面中,点击 DAG 页面上的 DAG 名称旁边的切换开关。
gcloud
在 Airflow 1.10.12 或更低版本中,运行 pause
Airflow CLI 命令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
pause -- DAG_ID
在 Airflow 1.10.14 或更高版本(包括 Airflow 2)中,运行 dags pause
Airflow CLI 命令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
替换以下内容:
ENVIRONMENT_NAME
:您的环境的名称。LOCATION
:环境所在的区域。DAG_ID
:DAG 的名称。
如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令。
如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run
命令参考文档。