DAG をトリガーする

Cloud Composer 1 | Cloud Composer 2

このページでは、Cloud Composer 環境で DAG をトリガーするさまざまな方法について説明します。

Airflow では、次の方法で DAG をトリガーできます。

  • スケジュールに基づいてトリガーする。DAG を作成する際ときに、DAG のスケジュールを指定します。Airflow は指定されたスケジューリング パラメータに基づいて DAG を自動的にトリガーします。

  • 手動でトリガーする。DAG は Airflow UI から手動でトリガーできます。また、gcloud から Airflow CLI コマンドを実行することもできます。

  • イベントに応答してトリガーする。イベントに応答して DAG をトリガーする標準的な方法は、センサーを使用することです。

DAG をトリガーするその他の方法:

スケジュールに従って DAG をトリガーする

スケジュールに従って DAG をトリガーするには:

  1. このセクションで後述するように、DAG ファイルで start_date パラメータと schedule_interval パラメータを指定します。
  2. DAG ファイルをご利用の環境にアップロードします。

スケジューリング パラメータを指定する

DAG を定義するときに、schedule_interval パラメータで DAG を実行する頻度を指定します。start_date パラメータで、Airflow が DAG のスケジューリングを開始するタイミングを指定します。DAG 内のタスクには個別の開始日を使用することも、すべてのタスクに単一の開始日を指定することもできます。DAG 内のタスクの最小開始日とスケジュール間隔に基づいて、Airflow が DAG 実行をスケジュール設定します。

スケジューリングは次のように機能します。start_date が経過すると、Airflow は次の schedule_interval が発生するのを待ちます。その後、このスケジュール間隔の最後に最初の DAG 実行が行われるようにスケジュール設定されます。たとえば、DAG が 1 時間おき(schedule_interval が 1 時間)、開始日が今日の 12:00 に指定されている場合、最初の DAG の実行は今日の 13:00 に行われます。

次の例は、2021 年 4 月 5 日の 15:00 から 1 時間ごとに実行される DAG を示しています。この例で使用されているパラメータでは、Airflow は最初の DAG 実行が 2021 年 4 月 5 日の 16:00 に発生するようにスケジュールされます。

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id='example_dag_schedule',
    # At 15:00 on 5 April, 2021
    start_date=datetime(2021, 4, 5, 15, 0),
    # At minute 0 of every hour
    schedule_interval='0 * * * *') as dag:

    # Output the current date and time
    t1 = BashOperator(
        task_id='date',
        bash_command='date',
        dag=dag)

    t1

スケジューリング パラメータの詳細については、Airflow ドキュメントの DAG の実行 をご覧ください。

スケジュール設定パラメータのその他の例

以下に示すスケジューリング パラメータの例は、パラメータのさまざまな組み合わせでスケジューリングがどのように動作するかを示しています。

  • start_datedatetime(2021, 4, 4, 16, 25) で、schedule_interval30 16 * * * の場合、最初の DAG 実行は 2021 年 4 月 5 日の 16:30 に発生します。
  • start_datedatetime(2021, 4, 4, 16, 35) で、schedule_interval30 16 * * * の場合、最初の DAG 実行は 2021 年 4 月 6 日の 16:30 に発生します。開始日が 2021 年 4 月 4 日のスケジュール間隔より後であるため、2021 年 4 月 5 日には DAG 実行が発生しません。代わりに、スケジュール間隔が 2021 年 4 月 5 日の 16:35 に終了するため、次回の DAG 実行は翌日の 16:30 にスケジュール設定されます。
  • start_datedatetime(2021, 4, 4) で、schedule_interval@daily の場合、最初の DAG 実行は 2021 年 4 月 5 日の 00:00 にスケジュール設定されます。
  • start_datedatetime(2021, 4, 4, 16, 30) で、schedule_interval0 * * * * の場合、最初の DAG 実行は 2021 年 4 月 4 日の 18:00 にスケジュール設定されます。指定した日時を経過すると、Airflow は毎時 0 分に DAG 実行が発生するようにスケジュール設定します。これが発生する最も近い時点は 17:00 です。この時点で、Airflow はスケジュール間隔の終了時刻、つまり 18:00 に DAG 実行をスケジュール設定します。

DAG を手動でトリガーする

DAG を手動でトリガーすると、Airflow は DAG を実行します。たとえば、すでにスケジュールに従って実行されている DAG があり、この DAG を手動でトリガーした場合、Airflow は DAG に指定された実際のスケジュールとは別に DAG を 1 回実行します。

Airflow UI

Airflow ウェブ インターフェースから DAG をトリガーするには:

  1. Google Cloud Console で [環境] ページに移動します。

[環境] に移動

  1. [Airflow ウェブサーバー] 列で、ご使用の環境の [Airflow] リンクをクリックします。

  2. 適切な権限を持つ Google アカウントでログインします。

  3. Airflow ウェブ インターフェースの [DAG] ページで、DAG の [リンク] 列の [DAG をトリガー] ボタンをクリックします。

  4. (省略可)DAG 実行の構成を指定します。

  5. [トリガー] をクリックします。

gcloud

Airflow 1.10.* で、trigger_dag Airflow CLI コマンドを実行します。

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

Airflow 2 で、dags trigger Airflow CLI コマンドを実行します。

  gcloud beta composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置される Compute Engine のリージョンに置き換えます。
  • DAG_ID は、DAG の名前に置き換えます。

Cloud Composer 環境で Airflow CLI コマンドを実行する方法の詳細については、Airflow CLI コマンドを実行するをご覧ください。

使用可能な Airflow CLI コマンドの詳細については、gcloud composer environments run コマンド リファレンスをご覧ください。

次のステップ