Cloud Composer 1 | Cloud Composer 2
本页面介绍在 Cloud Composer 环境中触发 DAG 的不同方法。
Airflow 提供以下触发 DAG 的方法:
按时间表触发。创建 DAG 时,您需要为它指定时间表。Airflow 会根据指定的时间安排参数自动触发 DAG。
手动触发。您可以从 Airflow 界面手动触发 DAG,也可以从
gcloud
运行 Airflow CLI 命令来触发。触发来响应事件。触发 DAG 来响应事件的标准方法是使用传感器。
触发 DAG 的其他方法:
以编程方式触发。您可以使用 Airflow REST API 触发 DAG。例如,从 Python 脚本。
以编程方式触发以响应事件。您可以使用 Cloud Functions 和 Airflow REST API 触发 DAG 来响应事件。
按计划触发 DAG
如需计划触发 DAG,请执行以下操作:
- 在 DAG 文件中指定
start_date
和schedule_interval
参数,如本部分稍后所述。 - 将 DAG 文件上传到您的环境。
指定时间安排参数
当您定义 DAG 时,在 schedule_interval
参数中指定您希望运行 DAG 的频率。在 start_date
参数中,指定您希望 Airflow 开始安排 DAG 的时间。DAG 中的任务可以有各自的开始日期,或者您可以为所有任务指定一个开始日期。Airflow 会根据 DAG 中任务的最早开始日期和计划的时间间隔安排 DAG 运行。
时间安排的工作方式如下。start_date
过了以后,Airflow 等待下一个 schedule_interval
的出现。然后,它会安排第一次 DAG 运行在此计划的时间间隔结束时进行。例如,如果某个 DAG 计划每小时运行一次(schedule_interval
为 1 小时),并且开始日期为今天的 12:00,则第一次 DAG 运行会在今天 13:00 进行。
以下示例展示了从 2021 年 4 月 5 日 15:00 开始每小时运行一次的 DAG。根据示例中使用的参数,Airflow 会安排第一次 DAG 运行在 2021 年 4 月 5 日 16:00 进行。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
with DAG(
dag_id='example_dag_schedule',
# At 15:00 on 5 April, 2021
start_date=datetime(2021, 4, 5, 15, 0),
# At minute 0 of every hour
schedule_interval='0 * * * *') as dag:
# Output the current date and time
t1 = BashOperator(
task_id='date',
bash_command='date',
dag=dag)
t1
如需详细了解时间安排参数,请参阅 Airflow 文档中的 DAG 运行。
更多时间安排参数示例
以下时间安排参数示例展示了不同参数组合的时间安排的工作方式:
- 如果
start_date
为datetime(2021, 4, 4, 16, 25)
且schedule_interval
为30 16 * * *
,则第一次 DAG 运行于 2021 年 4 月 5 日 16:30 进行。 - 如果
start_date
为datetime(2021, 4, 4, 16, 35)
且schedule_interval
为30 16 * * *
,则第一次 DAG 运行于 2021 年 4 月 6 日 16:30 进行。由于开始日期晚于 2021 年 4 月 4 日的计划时间间隔,因此 2021 年 4 月 5 日不进行 DAG 运行。计划的时间间隔于 2021 年 4 月 5 日 16:35 结束,因此下一次 DAG 运行安排在次日 16:30 进行。 - 如果
start_date
为datetime(2021, 4, 4)
且schedule_interval
为@daily
,则第一次 DAG 运行安排在 2021 年 4 月 5 日 00:00 进行。 - 如果
start_date
为datetime(2021, 4, 4, 16, 30)
且schedule_interval
为0 * * * *
,则第一次 DAG 运行安排在 2021 年 4 月 4 日 18:00 进行。在指定日期和时间过后,Airflow 会安排 DAG 运行在每小时的零分进行一次。最接近的时间点为 17:00。此时,Airflow 安排 DAG 运行在计划的时间间隔结束时进行,即 18:00。
手动触发 DAG
当您手动触发 DAG 时,Airflow 会执行一次 DAG 运行。例如,如果您有一个已按时间表运行的 DAG,并且手动触发此 DAG,则 Airflow 会执行一次 DAG,这与为 DAG 指定的实际时间表无关。
控制台
Cloud Composer 1.17.8 及更高版本支持 DAG 界面。
如需从 Google Cloud 控制台触发 DAG,请执行以下操作:
在 Google Cloud 控制台中,前往环境页面。
选择环境以查看其详细信息。
在环境详情页面上,转到 DAG 标签页。
点击 DAG 的名称。
在 DAG 详细信息页面上,点击触发 DAG。系统会创建新的 DAG 运行。
Airflow 界面
要从 Airflow 网页界面触发 DAG,请执行以下操作:
- 在 Google Cloud 控制台中,前往环境页面。
在 Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。
使用具有相应权限的 Google 账号登录。
在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。
(可选)指定 DAG 运行配置。
点击触发器。
gcloud
在 Airflow 1.10.12 或更低版本中,运行 trigger_dag
Airflow CLI 命令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
trigger_dag -- DAG_ID
在 Airflow 1.10.14 或更高版本(包括 Airflow 2)中,运行 dags trigger
Airflow CLI 命令:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
您需要将其中的:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。DAG_ID
替换为 DAG 名称。
如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令。
如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run
命令参考文档。