触发 DAG

Cloud Composer 1 | Cloud Composer 2

本页面介绍在 Cloud Composer 环境中触发 DAG 的不同方法。

Airflow 提供以下触发 DAG 的方法:

  • 按时间表触发。创建 DAG 时,您需要为它指定时间表。Airflow 会根据指定的时间安排参数自动触发 DAG。

  • 手动触发。您可以从 Airflow 界面手动触发 DAG,也可以从 gcloud 运行 Airflow CLI 命令来触发。

  • 触发来响应事件。触发 DAG 来响应事件的标准方法是使用传感器

触发 DAG 的其他方法:

按计划触发 DAG

如需计划触发 DAG,请执行以下操作:

  1. 在 DAG 文件中指定 start_dateschedule_interval 参数,如本部分稍后所述。
  2. 将 DAG 文件上传到您的环境。

指定时间安排参数

当您定义 DAG 时,在 schedule_interval 参数中指定您希望运行 DAG 的频率。在 start_date 参数中,指定您希望 Airflow 开始安排 DAG 的时间。DAG 中的任务可以有各自的开始日期,或者您可以为所有任务指定一个开始日期。Airflow 会根据 DAG 中任务的最早开始日期和计划的时间间隔安排 DAG 运行。

时间安排的工作方式如下。start_date 过了以后,Airflow 等待下一个 schedule_interval 的出现。然后,它会安排第一次 DAG 运行在此计划的时间间隔结束时进行。例如,如果某个 DAG 计划每小时运行一次(schedule_interval 为 1 小时),并且开始日期为今天的 12:00,则第一次 DAG 运行会在今天 13:00 进行。

以下示例展示了从 2021 年 4 月 5 日 15:00 开始每小时运行一次的 DAG。根据示例中使用的参数,Airflow 会安排第一次 DAG 运行在 2021 年 4 月 5 日 16:00 进行。

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id='example_dag_schedule',
    # At 15:00 on 5 April, 2021
    start_date=datetime(2021, 4, 5, 15, 0),
    # At minute 0 of every hour
    schedule_interval='0 * * * *') as dag:

    # Output the current date and time
    t1 = BashOperator(
        task_id='date',
        bash_command='date',
        dag=dag)

    t1

如需详细了解时间安排参数,请参阅 Airflow 文档中的 DAG 运行

更多时间安排参数示例

以下时间安排参数示例展示了不同参数组合的时间安排的工作方式:

  • 如果 start_datedatetime(2021, 4, 4, 16, 25)schedule_interval30 16 * * *,则第一次 DAG 运行于 2021 年 4 月 5 日 16:30 进行。
  • 如果 start_datedatetime(2021, 4, 4, 16, 35)schedule_interval30 16 * * *,则第一次 DAG 运行于 2021 年 4 月 6 日 16:30 进行。由于开始日期晚于 2021 年 4 月 4 日的计划时间间隔,因此 2021 年 4 月 5 日不进行 DAG 运行。计划的时间间隔于 2021 年 4 月 5 日 16:35 结束,因此下一次 DAG 运行安排在次日 16:30 进行。
  • 如果 start_datedatetime(2021, 4, 4)schedule_interval@daily,则第一次 DAG 运行安排在 2021 年 4 月 5 日 00:00 进行。
  • 如果 start_datedatetime(2021, 4, 4, 16, 30)schedule_interval0 * * * *,则第一次 DAG 运行安排在 2021 年 4 月 4 日 18:00 进行。在指定日期和时间过后,Airflow 会安排 DAG 运行在每小时的零分进行一次。最接近的时间点为 17:00。此时,Airflow 安排 DAG 运行在计划的时间间隔结束时进行,即 18:00。

手动触发 DAG

当您手动触发 DAG 时,Airflow 会执行一次 DAG 运行。例如,如果您有一个已按时间表运行的 DAG,并且手动触发此 DAG,则 Airflow 会执行一次 DAG,这与为 DAG 指定的实际时间表无关。

控制台

Cloud Composer 1.17.8 及更高版本支持 DAG 界面。

如需从 Google Cloud 控制台触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 选择环境以查看其详细信息。

  3. 环境详情页面上,转到 DAG 标签页。

  4. 点击 DAG 的名称。

  5. DAG 详细信息页面上,点击触发 DAG。系统会创建新的 DAG 运行。

Airflow 界面

要从 Airflow 网页界面触发 DAG,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

转到“环境”

  1. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  2. 使用具有相应权限的 Google 账号登录。

  3. 在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。

  4. (可选)指定 DAG 运行配置。

  5. 点击触发器

gcloud

在 Airflow 1.10.12 或更低版本中,运行 trigger_dag Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

在 Airflow 1.10.14 或更高版本(包括 Airflow 2)中,运行 dags trigger Airflow CLI 命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_ID 替换为 DAG 名称。

如需详细了解如何在 Cloud Composer 环境中运行 Airflow CLI 命令,请参阅运行 Airflow CLI 命令

如需详细了解可用的 Airflow CLI 命令,请参阅 gcloud composer environments run 命令参考文档

后续步骤