Airflow DAG のスケジュール設定とトリガー

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Airflow でのスケジュール設定と DAG トリガーの仕組み、DAG のスケジュールの定義方法、DAG を手動でトリガーまたは一時停止する方法について説明します。

Cloud Composer の Airflow DAG について

Cloud Composer の Airflow DAG は、プロジェクト内の 1 つ以上の Cloud Composer 環境で実行されます。Airflow DAG のソースファイルをアップロードして、環境に関連付けられた Cloud Storage バケットに保存します。環境の Airflow インスタンスは、これらのファイルを解析し、各 DAG のスケジュールで定義されているように DAG の実行をスケジュールします。DAG の実行中、Airflow は DAG で定義された順序で DAG を構成する個々のタスクをスケジュールして実行します。

Airflow DAG、DAG 実行、タスク、オペレーターなどの Airflow のコアコンセプトの詳細については、Airflow ドキュメントのコアコンセプト ページをご覧ください。

Airflow での DAG スケジューリングについて

Airflow には、スケジューリング メカニズムに関する次のコンセプトがあります。

論理日付

特定の DAG 実行が実行される日付を表します。

これは、Airflow が DAG を実行する実際の日付ではなく、特定の DAG 実行で処理する必要がある期間です。たとえば、毎日 12:00 に実行されるようにスケジュールされている DAG の場合、論理日も特定の日の 12:00 になります。1 日に 2 回実行されるため、処理対象の期間は過去 12 時間です。同時に、DAG 自体で定義されたロジックでは、論理日付や期間がまったく使用されない場合があります。たとえば、DAG は論理日付の値を使用せずに、同じスクリプトを 1 日 1 回実行する場合があります。

Airflow 2.2 より前のバージョンでは、この日付は「実行日」と呼ばれます。

実行日

特定の DAG 実行が実行された日付を表します。

たとえば、毎日 12:00 に実行されるようにスケジュールされている DAG の場合、DAG の実際の実行は、論理日が過ぎてからしばらく経った 12:05 に行われる場合もあります。

スケジュールの間隔

論理日付で、DAG を実行するタイミングと頻度を表します。

たとえば、1 日スケジュールの場合、DAG は 1 日に 1 回実行され、DAG 実行の論理日付は 24 時間間隔になります。

開始日

Airflow が DAG のスケジューリングを開始するタイミングを指定します。

DAG 内のタスクには個別の開始日を使用することも、すべてのタスクに単一の開始日を指定することもできます。DAG 内のタスクの最小開始日とスケジュール間隔に基づいて、Airflow が DAG 実行をスケジュール設定します。

キャッチアップ、バックフィル、再試行

過去の日付の DAG 実行を実行するメカニズム。

キャッチアップは、DAG が長時間一時停止された後に一時停止を解除した場合など、まだ実行されていない DAG 実行を実行します。バックフィルを使用して、特定の日付範囲の DAG 実行を実行できます。再試行では、DAG からタスクを実行する際に Airflow が試行する回数を指定します。

スケジューリングは次のように機能します。

  1. 開始日が経過すると、Airflow はスケジュール間隔の次回の発生を待ちます。

  2. Airflow は、このスケジュール間隔の最後に最初の DAG 実行が行われるようにスケジュール設定します。

    たとえば、DAG が 1 時間おきに実行され、開始日が今日の 12:00 に指定されている場合、最初の DAG の実行は今日の 13:00 に行われます。

このドキュメントのAirflow DAG のスケジュールを設定するセクションでは、これらのコンセプトを使用して DAG のスケジュールを設定する方法について説明します。DAG の実行とスケジューリングの詳細については、Airflow ドキュメントの DAG の実行をご覧ください。

DAG をトリガーする方法について

Airflow では、以下の方法で DAG をトリガーできます。

  • スケジュールに基づいてトリガーする。Airflow は、DAG ファイルで指定されたスケジュールに基づいて DAG を自動的にトリガーします。

  • 手動でトリガーする。DAG は、Google Cloud コンソール、Airflow UI、または Google Cloud CLI から Airflow CLI コマンドを実行して手動でトリガーできます。

  • イベントに応答してトリガーします。イベントに応答して DAG をトリガーする標準的な方法は、センサーを使用することです。

DAG をトリガーする他の方法:

始める前に

  • 環境バケット内のオブジェクトを管理し、DAG を表示してトリガーできるロールがアカウントに付与されていることを確認します。詳細については、アクセス制御をご覧ください。

Airflow DAG をスケジュールする

DAG のスケジュールは DAG ファイルで定義します。DAG の定義を次のように編集します。

  1. パソコンで DAG ファイルを見つけて編集します。DAG ファイルがない場合は、環境のバケットからコピーをダウンロードできます。新しい DAG の場合は、DAG ファイルを作成するときにすべてのパラメータを定義できます。

  2. schedule_interval パラメータで、スケジュールを定義します。CRON 式(0 0 * * * など)またはプリセット(@daily など)を使用できます。詳細については、Airflow ドキュメントの Cron と時間間隔をご覧ください。

    Airflow は、設定したスケジュールに基づいて DAG 実行の論理日を決定します。

  3. start_date パラメータで、開始日を定義します。

    Airflow は、このパラメータを使用して最初の DAG 実行の論理日を決定します。

  4. (省略可)catchup パラメータで、Airflow が開始日から現在の日付までのこの DAG の以前の実行をすべて実行する必要があるかどうかを定義します。

    キャッチアップ中に実行された DAG 実行の論理日付は過去の日付になり、実行日は DAG 実行が実際に実行された時刻を反映します。

  5. (省略可)retries パラメータで、Airflow が失敗したタスクを再試行する回数を定義します(各 DAG は 1 つ以上の個々のタスクで構成されます)。デフォルトでは、Cloud Composer のタスクは 2 回再試行されます。

  6. 環境のバケットに新しいバージョンの DAG をアップロードします。

  7. Airflow が DAG を正常に解析するまで待ちます。たとえば、Google Cloud コンソールまたは Airflow UI環境内の DAG のリストを確認できます。

次の DAG 定義の例は、00:00 と 12:00 の 1 日に 2 回実行されます。開始日は 2024 年 1 月 1 日に設定されていますが、キャッチアップが無効になっているため、アップロードまたは一時停止した後、過去の日付に対して Airflow は実行されません。

DAG には、BigQueryInsertJobOperator 演算子を使用してテーブルに行を挿入する insert_query_job という名前のタスクが 1 つ含まれています。この演算子は Google Cloud BigQuery 演算子の 1 つで、データセットとテーブルの管理、クエリの実行、データの検証に使用できます。このタスクの特定の実行が失敗した場合、Airflow はデフォルトの再試行間隔でさらに 4 回再試行します。これらの再試行の論理日付は同じままです。

この行の SQL クエリは、Airflow テンプレートを使用して、DAG の論理日付と名前を行に書き込みます。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

この DAG をテストするには、手動でトリガーしてから、タスク実行ログを表示します。

スケジュール設定パラメータのその他の例

次のスケジューリング パラメータの例は、パラメータのさまざまな組み合わせでスケジューリングがどのように動作するかを示しています。

  • start_datedatetime(2024, 4, 4, 16, 25) で、schedule_interval30 16 * * * の場合、最初の DAG 実行は 2024 年 4 月 5 日の 16:30 に発生します。

  • start_datedatetime(2024, 4, 4, 16, 35) で、schedule_interval30 16 * * * の場合、最初の DAG 実行は 2024 年 4 月 6 日の 16:30 に発生します。開始日が 2024 年 4 月 4 日のスケジュール間隔より後であるため、2024 年 4 月 5 日には DAG 実行が発生しません。代わりに、スケジュール間隔が 2024 年 4 月 5 日の 16:35 に終了するため、次回の DAG 実行は翌日の 16:30 にスケジュール設定されます。

  • start_datedatetime(2024, 4, 4) で、schedule_interval@daily の場合、最初の DAG 実行は 2024 年 4 月 5 日の 00:00 にスケジュール設定されます。

  • start_datedatetime(2024, 4, 4, 16, 30) で、schedule_interval0 * * * * の場合、最初の DAG 実行は 2024 年 4 月 4 日の 18:00 にスケジュール設定されます。指定した日時を経過すると、Airflow は毎時 0 分に DAG 実行が発生するようにスケジュール設定します。これが発生する最も近い時点は 17:00 です。この時点で、Airflow はスケジュール間隔の終了時刻、つまり 18:00 に DAG 実行をスケジュール設定します。

DAG を手動でトリガーする

Airflow DAG を手動でトリガーすると、Airflow は DAG ファイルを指定したスケジュールとは別に DAG を 1 回実行します。

コンソール

Google Cloud コンソールから DAG をトリガーするには、次の手順を行います。

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境を選択して詳細を表示します。

  3. [環境の詳細] ページで、[DAG] タブに移動します。

  4. DAG の名前をクリックします。

  5. [DAG の詳細] ページで、[DAG をトリガー] をクリックします。新しい DAG 実行が作成されます。

Airflow UI

Airflow UI から DAG をトリガーするには:

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. [Airflow ウェブサーバー] 列で、ご使用の環境の [Airflow] リンクをクリックします。

  3. 適切な権限を持つ Google アカウントでログインします。

  4. Airflow ウェブ インターフェースの [DAG] ページで、DAG の [リンク] 列の [DAG をトリガー] ボタンをクリックします。

  5. (省略可)DAG 実行の構成を指定します。

  6. [トリガー] をクリックします。

gcloud

dags trigger Airflow CLI コマンドを実行します。

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

次のように置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • DAG_ID: DAG の名前。

Cloud Composer 環境で Airflow CLI コマンドを実行する方法の詳細については、Airflow CLI コマンドの実行をご覧ください。

使用可能な Airflow CLI コマンドの詳細については、gcloud composer environments run コマンド リファレンスをご覧ください。

DAG 実行のログと詳細を表示する

Google Cloud コンソールで、次のことができます。

また、Cloud Composer は、Airflow 独自のウェブ インターフェースである Airflow UI へのアクセスも提供します。

DAG を一時停止する

コンソール

Google Cloud コンソールから DAG を一時停止するには:

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境を選択して詳細を表示します。

  3. [環境の詳細] ページで、[DAG] タブに移動します。

  4. DAG の名前をクリックします。

  5. [DAG の詳細] ページで、[DAG の一時停止] をクリックします。

Airflow UI

Airflow UI から DAG を一時停止するには:

  1. Google Cloud コンソールで [環境] ページに移動します。

[環境] に移動

  1. [Airflow ウェブサーバー] 列で、ご使用の環境の [Airflow] リンクをクリックします。

  2. 適切な権限を持つ Google アカウントでログインします。

  3. Airflow ウェブ インターフェースの [DAG] ページで、DAG 名の横にある切り替えボタンをクリックします。

gcloud

dags pause Airflow CLI コマンドを実行します。

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

次のように置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • DAG_ID: DAG の名前。

Cloud Composer 環境で Airflow CLI コマンドを実行する方法の詳細については、Airflow CLI コマンドの実行をご覧ください。

使用可能な Airflow CLI コマンドの詳細については、gcloud composer environments run コマンド リファレンスをご覧ください。

次のステップ