Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、Cloud Composer 環境で DAG をトリガーするさまざまな方法について説明します。
Airflow では、以下の方法で DAG をトリガーできます。
スケジュールに基づいてトリガーする。DAG を作成する際ときに、DAG のスケジュールを指定します。Airflow は指定されたスケジューリング パラメータに基づいて DAG を自動的にトリガーします。
手動でトリガーする。DAG は、Google Cloud コンソール、Airflow UI から手動でトリガーできます。または、
gcloud
から Airflow CLI コマンドを実行してトリガーすることもできます。イベントに応答してトリガーする。イベントに応答して DAG をトリガーする標準的な方法は、センサーを使用することです。
DAG をトリガーする他の方法:
プログラムでトリガーする。Airflow REST API を使用して DAG をトリガーできます(たとえば Python スクリプトから)。
イベントに応答してプログラムでトリガーする。Cloud Run functions と Airflow REST API を使用して、イベントに応答して DAG をトリガーできます。
スケジュールに従って DAG をトリガーする
スケジュールに従って DAG をトリガーするには:
- このセクションで後述するように、DAG ファイルで
start_date
パラメータとschedule_interval
パラメータを指定します。 - ご利用の環境に DAG ファイルをアップロードします。
スケジューリング パラメータを指定する
DAG を定義するときに、schedule_interval
パラメータで DAG を実行する頻度を指定します。start_date
パラメータで、Airflow が DAG のスケジューリングを開始するタイミングを指定します。DAG 内のタスクには個別の開始日を使用することも、すべてのタスクに単一の開始日を指定することもできます。DAG 内のタスクの最小開始日とスケジュール間隔に基づいて、Airflow が DAG 実行をスケジュール設定します。
スケジューリングは次のように機能します。start_date
が経過すると、Airflow は次の schedule_interval
が発生するのを待ちます。その後、このスケジュール間隔の最後に最初の DAG 実行が行われるようにスケジュール設定されます。たとえば、DAG が 1 時間おき(schedule_interval
が 1 時間)、開始日が今日の 12:00 に指定されている場合、最初の DAG の実行は今日の 13:00 に行われます。
次の例は、2024 年 4 月 5 日の 15:00 から 1 時間ごとに実行される DAG を示しています。この例で使用されているパラメータでは、Airflow は最初の DAG 実行が 2024 年 4 月 5 日の 16:00 に発生するようにスケジュール設定されます。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
with DAG(
dag_id='example_dag_schedule',
# At 15:00 on 5 April, 2024
start_date=datetime(2024, 4, 5, 15, 0),
# At minute 0 of every hour
schedule_interval='0 * * * *') as dag:
# Output the current date and time
t1 = BashOperator(
task_id='date',
bash_command='date',
dag=dag)
t1
スケジューリング パラメータの詳細については、Airflow ドキュメントの DAG の実行 をご覧ください。
スケジュール設定パラメータのその他の例
以下に示すスケジューリング パラメータの例は、パラメータのさまざまな組み合わせでスケジューリングがどのように動作するかを示しています。
start_date
がdatetime(2024, 4, 4, 16, 25)
で、schedule_interval
が30 16 * * *
の場合、最初の DAG 実行は 2024 年 4 月 5 日の 16:30 に発生します。start_date
がdatetime(2024, 4, 4, 16, 35)
で、schedule_interval
が30 16 * * *
の場合、最初の DAG 実行は 2024 年 4 月 6 日の 16:30 に発生します。開始日が 2024 年 4 月 4 日のスケジュール間隔より後であるため、2024 年 4 月 5 日には DAG 実行が発生しません。代わりに、スケジュール間隔が 2024 年 4 月 5 日の 16:35 に終了するため、次回の DAG 実行は翌日の 16:30 にスケジュール設定されます。start_date
がdatetime(2024, 4, 4)
で、schedule_interval
が@daily
の場合、最初の DAG 実行は 2024 年 4 月 5 日の 00:00 にスケジュール設定されます。start_date
がdatetime(2024, 4, 4, 16, 30)
で、schedule_interval
が0 * * * *
の場合、最初の DAG 実行は 2024 年 4 月 4 日の 18:00 にスケジュール設定されます。指定した日時を経過すると、Airflow は毎時 0 分に DAG 実行が発生するようにスケジュール設定します。これが発生する最も近い時点は 17:00 です。この時点で、Airflow はスケジュール間隔の終了時刻、つまり 18:00 に DAG 実行をスケジュール設定します。
DAG を手動でトリガーする
DAG を手動でトリガーすると、Airflow は DAG を実行します。たとえば、すでにスケジュールに従って実行されている DAG があり、この DAG を手動でトリガーした場合、Airflow は DAG に指定された実際のスケジュールとは別に DAG を 1 回実行します。
コンソール
Google Cloud コンソールから DAG をトリガーするには、次の手順を行います。
Google Cloud Console で [環境] ページに移動します。
環境を選択して詳細を表示します。
[環境の詳細] ページで、[DAG] タブに移動します。
DAG の名前をクリックします。
[DAG の詳細] ページで、[DAG をトリガー] をクリックします。新しい DAG 実行が作成されます。
Airflow UI
Airflow ウェブ インターフェースから DAG をトリガーするには:
- Google Cloud Console で [環境] ページに移動します。
[Airflow ウェブサーバー] 列で、ご使用の環境の [Airflow] リンクをクリックします。
適切な権限を持つ Google アカウントでログインします。
Airflow ウェブ インターフェースの [DAG] ページで、DAG の [リンク] 列の [DAG をトリガー] ボタンをクリックします。
(省略可)DAG 実行の構成を指定します。
[トリガー] をクリックします。
gcloud
dags trigger
Airflow CLI コマンドを実行します。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
以下のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。DAG_ID
は、DAG の名前に置き換えます。
Cloud Composer 環境で Airflow CLI コマンドを実行する方法の詳細については、Airflow CLI コマンドの実行をご覧ください。
使用可能な Airflow CLI コマンドの詳細については、gcloud composer environments run
コマンド リファレンスをご覧ください。