Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、Airflow でのスケジュール設定と DAG トリガーの仕組み、DAG のスケジュールの定義方法、DAG を手動でトリガーまたは一時停止する方法について説明します。
Cloud Composer の Airflow DAG について
Cloud Composer の Airflow DAG は、プロジェクト内の 1 つ以上の Cloud Composer 環境で実行されます。Airflow DAG のソースファイルを、環境に関連付けられた Cloud Storage バケットにアップロードします。環境の Airflow インスタンスは、これらのファイルを解析し、各 DAG のスケジュールで定義されているように DAG の実行をスケジュールします。DAG の実行中、Airflow はこの DAG で定義された順序で DAG を構成する個々のタスクをスケジュールして実行します。
Airflow DAG、DAG 実行、タスク、オペレーターなどの Airflow の基本コンセプトの詳細については、Airflow ドキュメントの基本コンセプト ページをご覧ください。
Airflow での DAG スケジューリングについて
Airflow には、スケジューリング メカニズムに関する次のコンセプトがあります。
- 論理日付
特定の DAG 実行が実行される日付を表します。
これは、Airflow が DAG を実行する実際の日付ではなく、特定の DAG 実行で処理する必要がある期間です。たとえば、毎日 12:00 に実行されるようにスケジュールされている DAG の場合、論理日付も特定の日の 12:00 になります。1 日に 2 回実行されるため、処理される期間は過去 12 時間です。同時に、DAG 自体で定義されたロジックでは、論理日付や期間がまったく使用されない場合があります。たとえば、論理日付の値を使用せずに、DAG で同じスクリプトを 1 日 1 回実行する場合があります。
Airflow 2.2 より前のバージョンでは、この日付は実行日と呼ばれます。
- 実行日
特定の DAG 実行が実行された日付を表します。
たとえば、毎日 12:00 に実行されるようにスケジュールされている DAG の場合、DAG の実際の実行は、論理日付が過ぎてからしばらく経った 12:05 に行われる場合もあります。
- スケジュールの間隔
DAG を実行する必要がある日付と頻度を論理日付で表します。
たとえば、毎日のスケジュールの場合、DAG は 1 日に 1 回実行され、DAG 実行の論理日付は 24 時間間隔になります。
- 開始日
Airflow が DAG のスケジューリングを開始するタイミングを指定します。
DAG 内のタスクには個別の開始日を使用することも、すべてのタスクに単一の開始日を指定することもできます。DAG 内のタスクにおける開始日の最小値とスケジュール間隔に基づいて、Airflow が DAG 実行をスケジュール設定します。
- catchup、バックフィル、再試行
過去の日付の DAG 実行を実行するメカニズム。
catchup は、DAG が長時間一時停止された後に一時停止を解除した場合など、まだ実行されていない DAG 実行を実行します。バックフィルを使用して、特定の日付範囲の DAG 実行を実行できます。再試行では、DAG からタスクを実行する際に Airflow が試行する必要がある回数を指定します。
スケジューリングは次のように機能します。
開始日を経過すると、Airflow はスケジュール間隔の次回の発生を待ちます。
Airflow は、このスケジュール間隔の最後に最初の DAG 実行が行われるようにスケジュール設定します。
たとえば、DAG が 1 時間おきに実行されるようにスケジュールされており、開始日が今日の 12:00 に指定されている場合、最初の DAG の実行は今日の 13:00 に行われます。
このドキュメントのAirflow DAG のスケジュールを設定するセクションでは、これらのコンセプトを使用して DAG のスケジュールを設定する方法について説明します。DAG の実行とスケジューリングの詳細については、Airflow ドキュメントの DAG の実行をご覧ください。
DAG をトリガーする方法について
Airflow では、以下の方法で DAG をトリガーできます。
スケジュールに基づいてトリガーする。Airflow は、DAG ファイルで指定されたスケジュールに基づいて DAG を自動的にトリガーします。
手動でトリガーする。DAG は、Google Cloud コンソールまたは Airflow UI から手動でトリガーできます。また、Google Cloud CLI から Airflow CLI コマンドを実行して手動でトリガーすることもできます。
イベントに応答してトリガーする。イベントに応答して DAG をトリガーする標準的な方法は、センサーを使用することです。
DAG をトリガーする他の方法:
プログラムでトリガーする。Airflow REST API を使用して DAG をトリガーできます。(たとえば Python スクリプトから)。
イベントに応答してプログラムでトリガーする。Cloud Run functions と Airflow REST API を使用して、イベントに応答して DAG をトリガーできます。
始める前に
- 環境バケット内のオブジェクトを管理し、DAG を表示してトリガーできるロールがアカウントに付与されていることを確認します。詳細については、アクセス制御をご覧ください。
Airflow DAG をスケジュールする
DAG のスケジュールは DAG ファイルで定義します。DAG の定義を次のように編集します。
パソコンで DAG ファイルを見つけて編集します。DAG ファイルがない場合は、環境のバケットからコピーをダウンロードできます。新しい DAG の場合は、DAG ファイルを作成するときにすべてのパラメータを定義できます。
schedule_interval
パラメータで、スケジュールを定義します。cron 式(0 0 * * *
など)またはプリセット(@daily
など)を使用できます。詳細については、Airflow ドキュメントの cron と時間間隔をご覧ください。Airflow は、設定したスケジュールに基づいて DAG 実行の論理日付を決定します。
start_date
パラメータで、開始日を定義します。Airflow は、このパラメータを使用して最初の DAG 実行の論理日付を決定します。
(省略可)
catchup
パラメータで、Airflow が開始日から現在の日付までの、この DAG のまだ実行されていない以前の実行をすべて実行する必要があるかどうかを定義します。catchup 中に実行された DAG 実行の論理日付は過去の日付になり、実行日は DAG 実行が実際に実行された時刻を反映します。
(省略可)
retries
パラメータで、Airflow が失敗したタスクを何回再試行する必要があるかを定義します(各 DAG は 1 つ以上の個々のタスクで構成されます)。デフォルトでは、Cloud Composer のタスクは 2 回再試行されます。環境のバケットに新しいバージョンの DAG をアップロードします。
Airflow が DAG を正常に解析するまで待ちます。たとえば、Google Cloud コンソールまたは Airflow UI で環境内の DAG のリストを確認できます。
次の DAG 定義の例は、1 日のうちに 00:00 と 12:00 の 2 回実行されます。開始日は 2024 年 1 月 1 日に設定されていますが、catchup が無効になっているため、アップロードまたは一時停止した後に過去の日付に対して Airflow は実行されません。
DAG には、BigQueryInsertJobOperator
演算子を使用してテーブルに行を挿入する insert_query_job
という名前のタスクが 1 つ含まれています。この演算子は Google Cloud BigQuery 演算子の 1 つで、データセットとテーブルの管理、クエリの実行、データの検証に使用できます。このタスクの特定の実行が失敗した場合、Airflow はデフォルトの再試行間隔でさらに 4 回再試行します。これらの再試行の論理日付は同じままです。
この行の SQL クエリは、Airflow テンプレートを使用して、DAG の論理日付と名前を行に書き込みます。
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
この DAG をテストするには、手動でトリガーして、タスク実行ログを表示します。
スケジュール設定パラメータのその他の例
以下に示すスケジューリング パラメータの例は、パラメータのさまざまな組み合わせでスケジューリングがどのように動作するかを示しています。
start_date
がdatetime(2024, 4, 4, 16, 25)
で、schedule_interval
が30 16 * * *
の場合、最初の DAG 実行は 2024 年 4 月 5 日の 16:30 に発生します。start_date
がdatetime(2024, 4, 4, 16, 35)
で、schedule_interval
が30 16 * * *
の場合、最初の DAG 実行は 2024 年 4 月 6 日の 16:30 に発生します。開始日が 2024 年 4 月 4 日のスケジュール間隔より後であるため、2024 年 4 月 5 日には DAG 実行が発生しません。代わりに、スケジュール間隔が 2024 年 4 月 5 日の 16:35 に終了するため、次回の DAG 実行は翌日の 16:30 にスケジュール設定されます。start_date
がdatetime(2024, 4, 4)
で、schedule_interval
が@daily
の場合、最初の DAG 実行は 2024 年 4 月 5 日の 00:00 にスケジュール設定されます。start_date
がdatetime(2024, 4, 4, 16, 30)
で、schedule_interval
が0 * * * *
の場合、最初の DAG 実行は 2024 年 4 月 4 日の 18:00 にスケジュール設定されます。指定した日時を経過すると、Airflow は毎時 0 分に DAG 実行が発生するようにスケジュール設定します。これが発生する最も近い時点は 17:00 です。この時点で、Airflow はスケジュール間隔の終了時刻、つまり 18:00 に DAG 実行をスケジュール設定します。
DAG を手動でトリガーする
Airflow DAG を手動でトリガーすると、Airflow は DAG ファイルに指定されたスケジュールとは別に DAG を 1 回実行します。
コンソール
Google Cloud コンソールから DAG をトリガーするには:
Google Cloud コンソールで、[環境] ページに移動します。
環境を選択して詳細を表示します。
[環境の詳細] ページで、[DAG] タブに移動します。
DAG の名前をクリックします。
[DAG の詳細] ページで、[DAG をトリガー] をクリックします。新しい DAG 実行が作成されます。
Airflow UI
Airflow UI から DAG をトリガーするには:
Google Cloud コンソールで、[環境] ページに移動します。
[Airflow ウェブサーバー] 列で、ご使用の環境の [Airflow] リンクをクリックします。
適切な権限を持つ Google アカウントでログインします。
Airflow ウェブ インターフェースの [DAG] ページで、DAG の [リンク] 列の [DAG をトリガー] ボタンをクリックします。
(省略可)DAG 実行の構成を指定します。
[トリガー] をクリックします。
gcloud
dags trigger
Airflow CLI コマンドを実行します。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
以下を置き換えます。
ENVIRONMENT_NAME
: 環境の名前。LOCATION
: 環境が配置されているリージョン。DAG_ID
: DAG の名前。
Cloud Composer 環境で Airflow CLI コマンドを実行する方法の詳細については、Airflow CLI コマンドの実行をご覧ください。
使用可能な Airflow CLI コマンドの詳細については、gcloud composer environments run
コマンド リファレンスをご覧ください。
DAG 実行のログと詳細を表示する
Google Cloud コンソールで、次のことができます。
- 過去の DAG 実行のステータスと DAG の詳細を表示する。
- すべての DAG 実行とこれらの DAG のすべてのタスクの詳細ログを調べる。
- DAG の統計情報を表示する。
また、Cloud Composer は、Airflow 独自のウェブ インターフェースである Airflow UI へのアクセスも提供します。
DAG を一時停止する
コンソール
Google Cloud コンソールから DAG を一時停止するには:
Google Cloud コンソールで、[環境] ページに移動します。
環境を選択して詳細を表示します。
[環境の詳細] ページで、[DAG] タブに移動します。
DAG の名前をクリックします。
[DAG の詳細] ページで、[DAG の一時停止] をクリックします。
Airflow UI
Airflow UI から DAG を一時停止するには:
- Google Cloud コンソールで、[環境] ページに移動します。
[Airflow ウェブサーバー] 列で、ご使用の環境の [Airflow] リンクをクリックします。
適切な権限を持つ Google アカウントでログインします。
Airflow ウェブ インターフェースの [DAG] ページで、DAG 名の横にある切り替えボタンをクリックします。
gcloud
dags pause
Airflow CLI コマンドを実行します。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
以下を置き換えます。
ENVIRONMENT_NAME
: 環境の名前。LOCATION
: 環境が配置されているリージョン。DAG_ID
: DAG の名前。
Cloud Composer 環境で Airflow CLI コマンドを実行する方法の詳細については、Airflow CLI コマンドの実行をご覧ください。
使用可能な Airflow CLI コマンドの詳細については、gcloud composer environments run
コマンド リファレンスをご覧ください。