Cloud Composer 1 | Cloud Composer 2
このページでは、Cloud Composer 環境で DAG をトリガーするさまざまな方法について説明します。
Airflow では、以下の方法で DAG をトリガーできます。
スケジュールに基づいてトリガーする。DAG を作成する際ときに、DAG のスケジュールを指定します。Airflow は指定されたスケジューリング パラメータに基づいて DAG を自動的にトリガーします。
手動でトリガーする。DAG は Google Cloud コンソールや Airflow UI から手動でトリガーできます。または、
gcloud
から Airflow CLI コマンドを実行してトリガーすることもできます。イベントに応答してトリガーする。イベントに応答して DAG をトリガーする標準的な方法は、センサーを使用することです。
DAG をトリガーする他の方法:
プログラムでトリガーする。Airflow REST API を使用して DAG をトリガーできます(たとえば Python スクリプトから)。
イベントに応答してプログラムでトリガーする。Cloud Functions と Airflow REST API を使用すると、イベントに応答して DAG をトリガーできます。
スケジュールに従って DAG をトリガーする
スケジュールに従って DAG をトリガーするには:
- このセクションで後述するように、DAG ファイルで
start_date
パラメータとschedule_interval
パラメータを指定します。 - ご利用の環境に DAG ファイルをアップロードします。
スケジューリング パラメータを指定する
DAG を定義するときに、schedule_interval
パラメータで DAG を実行する頻度を指定します。start_date
パラメータで、Airflow が DAG のスケジューリングを開始するタイミングを指定します。DAG 内のタスクには個別の開始日を使用することも、すべてのタスクに単一の開始日を指定することもできます。DAG 内のタスクの最小開始日とスケジュール間隔に基づいて、Airflow が DAG 実行をスケジュール設定します。
スケジューリングは次のように機能します。start_date
が経過すると、Airflow は次の schedule_interval
が発生するのを待ちます。その後、このスケジュール間隔の最後に最初の DAG 実行が行われるようにスケジュール設定されます。たとえば、DAG が 1 時間おき(schedule_interval
が 1 時間)、開始日が今日の 12:00 に指定されている場合、最初の DAG の実行は今日の 13:00 に行われます。
次の例は、2024 年 4 月 5 日の 15:00 から 1 時間ごとに実行される DAG を示しています。この例で使用されているパラメータでは、Airflow は最初の DAG 実行が 2024 年 4 月 5 日の 16:00 に発生するようにスケジュール設定されます。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
with DAG(
dag_id='example_dag_schedule',
# At 15:00 on 5 April, 2024
start_date=datetime(2024, 4, 5, 15, 0),
# At minute 0 of every hour
schedule_interval='0 * * * *') as dag:
# Output the current date and time
t1 = BashOperator(
task_id='date',
bash_command='date',
dag=dag)
t1
スケジューリング パラメータの詳細については、Airflow ドキュメントの DAG の実行 をご覧ください。
スケジュール設定パラメータのその他の例
以下に示すスケジューリング パラメータの例は、パラメータのさまざまな組み合わせでスケジューリングがどのように動作するかを示しています。
start_date
がdatetime(2024, 4, 4, 16, 25)
で、schedule_interval
が30 16 * * *
の場合、最初の DAG 実行は 2024 年 4 月 5 日の 16:30 に発生します。start_date
がdatetime(2024, 4, 4, 16, 35)
で、schedule_interval
が30 16 * * *
の場合、最初の DAG 実行は 2024 年 4 月 6 日の 16:30 に発生します。開始日が 2024 年 4 月 4 日のスケジュール間隔より後であるため、2024 年 4 月 5 日には DAG 実行が発生しません。代わりに、スケジュール間隔が 2024 年 4 月 5 日の 16:35 に終了するため、次回の DAG 実行は翌日の 16:30 にスケジュール設定されます。start_date
がdatetime(2024, 4, 4)
で、schedule_interval
が@daily
の場合、最初の DAG 実行は 2024 年 4 月 5 日の 00:00 にスケジュール設定されます。start_date
がdatetime(2024, 4, 4, 16, 30)
で、schedule_interval
が0 * * * *
の場合、最初の DAG 実行は 2024 年 4 月 4 日の 18:00 にスケジュール設定されます。指定した日時を経過すると、Airflow は毎時 0 分に DAG 実行が発生するようにスケジュール設定します。これが発生する最も近い時点は 17:00 です。この時点で、Airflow はスケジュール間隔の終了時刻、つまり 18:00 に DAG 実行をスケジュール設定します。
DAG を手動でトリガーする
DAG を手動でトリガーすると、Airflow は DAG を実行します。たとえば、すでにスケジュールに従って実行されている DAG があり、この DAG を手動でトリガーした場合、Airflow は DAG に指定された実際のスケジュールとは別に DAG を 1 回実行します。
Console
DAG UI は Cloud Composer 1.17.8 以降のバージョンでサポートされています。
Google Cloud コンソールから DAG をトリガーするには、次の手順を行います。
Google Cloud Console で [環境] ページに移動します。
詳細を表示する環境を選択します。
[環境の詳細] ページで、[DAG] タブに移動します。
DAG の名前をクリックします。
[DAG の詳細] ページで、[DAG をトリガー] をクリックします。新しい DAG 実行が作成されます。
Airflow UI
Airflow ウェブ インターフェースから DAG をトリガーするには:
- Google Cloud Console で [環境] ページに移動します。
[Airflow ウェブサーバー] 列で、ご使用の環境の [Airflow] リンクをクリックします。
適切な権限を持つ Google アカウントでログインします。
Airflow ウェブ インターフェースの [DAG] ページで、DAG の [リンク] 列の [DAG をトリガー] ボタンをクリックします。
(省略可)DAG 実行の構成を指定します。
[トリガー] をクリックします。
gcloud
Airflow 1.10.12 以前では、trigger_dag
Airflow CLI コマンドを実行します。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
trigger_dag -- DAG_ID
Airflow 1.10.14 以降(Airflow 2 を含む)では、dags trigger
Airflow CLI コマンドを実行します。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
次のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。DAG_ID
は、DAG の名前に置き換えます。
Cloud Composer 環境で Airflow CLI コマンドを実行する方法の詳細については、Airflow CLI コマンドの実行をご覧ください。
使用可能な Airflow CLI コマンドの詳細については、gcloud composer environments run
コマンド リファレンスをご覧ください。