Airflow スケジューラに関する問題のトラブルシューティング

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このページでは、Airflow スケジューラと DAG プロセッサに関する一般的な問題に関する情報とトラブルシューティングの手順について説明します。

問題の原因を特定する

トラブルシューティングを開始するには、問題の発生が次のいずれであるかを特定します。

  • DAG 解析時(Airflow DAG プロセッサによって DAG が解析されている間)
  • 実行時(Airflow スケジューラで DAG が処理されている間)

解析時間と実行時間について詳しくは、DAG 解析時間と DAG 実行時間の違いをご覧ください。

DAG 処理の問題を調べる

  1. DAG プロセッサのログを検査します
  2. DAG の解析時間をチェックします

実行中のタスクとキューに入れられたタスクのモニタリング

キュー内でタスクが滞留しているかどうかを確認するには、次の手順に従います。

  1. Google Cloud コンソールで、[環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。 [環境の詳細] ページが開きます。

  3. [Monitoring] タブに移動します。

  4. [モニタリング] タブで、[DAG 実行] セクションの [Airflow タスク] グラフを調べて、問題があるかどうか確認します。Airflow タスクは、Airflow でキューに格納された状態のタスクであり、Celery または Kubernetes Executor のブローカー キューに移動できます。Celery キューに入れられたタスクは、Celery ブローカーのキューに入れられたタスク インスタンスです。

DAG 解析時間の問題のトラブルシューティング

以下のセクションでは、DAG 解析時間の一般的な問題の症状と想定される修正方法について説明します。

Cloud Composer 1 と Airflow 1 の DAG 解析とスケジューリング

Airflow 2 では、DAG の解析効率が大幅に向上しました。DAG の解析とスケジューリングに関連するパフォーマンスの問題が発生した場合は、Airflow 2 への移行を検討してください。

Cloud Composer 1 では、スケジューラは他の Cloud Composer コンポーネントとともにクラスタノードで実行されます。このため、個々のクラスタノードの負荷が他のノードと比較して高くなることも低くなることもあります。スケジューラのパフォーマンス(DAG 解析とスケジューリング)は、スケジューラが実行されるノードによって異なる場合があります。また、アップグレードやメンテナンス オペレーションの結果として、スケジューラが実行される個々のノードが変更されることもあります。この制限は Cloud Composer 2 で解決されました。Cloud Composer 2 では、CPU リソースとメモリリソースをスケジューラに割り当てることができ、スケジューラのパフォーマンスはクラスタノードの負荷に依存しません。

タスクの数と時間の分布

Airflow では、多数の DAG やタスクを同時にスケジュール設定すると問題が発生する可能性があります。スケジューリングの問題を回避するには、次の操作を行います。

  • より少数の統合されたタスクを使用するように DAG を調整します。
  • DAG のスケジュール間隔を調整して、DAG 実行を時間経過とともに均等に分散します。

Airflow 構成のスケーリング

Airflow には、Airflow が同時に実行できるタスクと DAG の数を制御する Airflow 構成オプションが用意されています。これらの構成オプションを設定するには、環境に合わせて値をオーバーライドします。これらの値の一部は、DAG レベルまたはタスクレベルで設定することもできます。

  • ワーカーの同時実行

    [celery]worker_concurrency パラメータは、Airflow ワーカーが同時に実行できるタスクの最大数を制御します。このパラメータの値に Cloud Composer 環境の Airflow ワーカー数を掛けると、環境内の特定の時点で実行できるタスクの最大数が取得されます。この数は、[core]parallelism Airflow 構成オプションによって制限されます。詳細は以下で説明します。

  • 最大アクティブ DAG 実行数

    [core]max_active_runs_per_dag Airflow 構成オプションは、DAG あたりのアクティブな DAG 実行の最大数を制御します。この上限に達すると、スケジューラは DAG 実行をそれ以上作成しなくなります。

    このパラメータが正しく設定されていない場合、スケジューラが DAG 実行を抑制するという問題が発生する可能性があります。これはスケジューラが特定の時点で DAG 実行インスタンスを作成できなくなるためです。

    この値は、max_active_runs パラメータを使用して DAG レベルで設定することもできます。

  • DAG あたりの最大アクティブ タスク数

    [core]max_active_tasks_per_dag Airflow 構成オプションは、各 DAG で同時に実行できるタスク インスタンスの最大数を制御します。

    このパラメータが正しく設定されていない場合、1 つの DAG インスタンスの実行速度が遅い問題が発生する可能性があります。これは、ある瞬間に実行できる DAG タスクの数には限りがあるからです。この場合は、この構成オプションの値を増やすことができます。

    この値は、max_active_tasks パラメータを使用して DAG レベルで設定することもできます。

    タスクレベルで max_active_tis_per_dag パラメータと max_active_tis_per_dagrun パラメータを使用すると、特定のタスク ID を持つインスタンスが DAG ごと、DAG 実行ごとに実行できる数を制御できます。

  • 並列処理とプールサイズ

    [core]parallelism Airflow 構成オプションは、これらのタスクのすべての依存関係が満たされた後に、Airflow スケジューラが Executor のキュー内にキューに入れるタスクの数を制御します。

    これは、Airflow の設定全体に適用されるグローバル パラメータです。

    タスクはプール内でキューに保存され、実行されます。Cloud Composer 環境では、1 つのプールのみが使用されます。このプールのサイズによって、特定の時間に実行するためにスケジューラによってキューに入れられるタスクの数が制御されます。プールサイズが小さすぎると、スケジューラは、[core]parallelism 構成オプションと Airflow ワーカーの数を掛けた値がまだ満たされていない [celery]worker_concurrency 構成オプションによって定義されたしきい値であっても、実行するタスクをキューに追加できません。

    プールサイズは Airflow UI([メニュー] > [管理] > [プール])で構成できます。プールサイズを、環境で想定する並列処理のレベルに調整します。

    通常、[core]parallelism は、ワーカーの最大数と [celery]worker_concurrency の積が設定されます。

実行中のタスクとキューに入れられたタスクに関する問題のトラブルシューティング

以降のセクションでは、実行中のタスクとキューに入れられたタスクの一般的な問題の症状と想定される修正方法について説明します。

DAG 実行が実行されない

症状:

DAG のスケジュール日付が動的に設定されると、さまざまな予期しない副作用が発生する可能性があります。次に例を示します。

  • DAG 実行が常に未来で、いつまでも実行されません。

  • 過去の DAG 実行が、実行済みで成功としてマークされますが、実行はされていません。

詳細については、Apache Airflow のドキュメントをご覧ください。

解決策の提示

  • Apache Airflow のドキュメントの推奨事項に従ってください。

  • DAG に静的 start_date を設定します。必要に応じて、catchup=False を使用して過去の日付の DAG の実行を無効にできます。

  • このアプローチの副作用を認識していない限り、datetime.now() または days_ago(<number of days>) の使用は避けてください。

Airflow スケジューラの TimeTable 機能の使用

時刻表は Airflow 2.2 以降で利用できます。

DAG の時刻表は、次のいずれかの方法で定義できます。

組み込みの時刻表を使用することもできます。

制限されたクラスタ リソース

環境内の GKE クラスタがすべての DAG とタスクを処理できないほど小さい場合、パフォーマンスの問題が発生する可能性があります。この場合は、次のいずれかの解決策を試してください。

  • パフォーマンスを向上させるマシンタイプを使用して新しい環境を作成し、DAG を移行する。
  • 追加の Cloud Composer 環境を作成し、環境間で DAG を分割します。
  • GKE ノードのマシンタイプをアップグレードするの説明に従って、GKE ノードのマシンタイプを変更します。この手順はエラーが発生しやすいため、最も推奨されないオプションです。
  • gcloud composer environments update コマンドなどを使用して、ご使用の環境で Airflow データベースを実行する Cloud SQL インスタンスのマシンタイプをアップグレードする。Airflow データベースのパフォーマンスが悪いのは、スケジューラが遅いのが原因の可能性があります。

メンテナンスの時間枠でタスクのスケジューリングを回避する

環境のメンテナンスの時間枠を定義して、DAG の実行時間外に環境のメンテナンスが行われるようにすることができます。一部のタスクの中断と再試行が許可されていれば、メンテナンスの時間枠で DAG を引き続き実行できます。メンテナンス時間枠が環境に与える影響について詳しくは、メンテナンス時間枠を指定するをご覧ください。

DAG での「wait_for_downstream」の使用

DAG で wait_for_downstream パラメータを True に設定した場合、タスクが成功するには、このタスクのすぐにダウンストリームにあるすべてのタスクも成功する必要があります。つまり、特定の DAG 実行に属するタスクの実行が、前の DAG 実行のタスクの実行によって遅延させられる可能性があります。詳しくは、Airflow のドキュメントをご覧ください。

キューに長く入りすぎていたタスクがキャンセルされ、再スケジュールされます

Airflow タスクがキューに長時間保持されると、スケジューラは [scheduler]task_queued_timeout Airflow 構成オプションで設定された時間が経過した後に、実行のためにもう一度再スケジュールします。デフォルト値は 2400 です。2.3.1 より前の Airflow バージョンでは、タスクは失敗としてマークされ、適格な場合は再試行されます。

この状況の症状を監視する 1 つの方法は、キューに入れられたタスクの数をグラフで確認することです(Cloud Composer UI の [モニタリング] タブ)。このグラフのスパイクが約 2 時間以内に低下しない場合は、タスクが再スケジュールされ(ログなし)、スケジューラ ログに「Adopted tasks were still pending ...」というログエントリが記録される可能性が高くなります。このような場合、タスクが実行されなかったことから、Airflow タスクログに「ログファイルが見つかりません...」というメッセージが表示されることがあります。

通常、この動作は想定されており、スケジュールされたタスクの次のインスタンスはスケジュールに従って実行されます。Cloud Composer 環境でこのようなケースが多数発生している場合は、スケジュールされたすべてのタスクを処理するのに十分な Airflow ワーカーが環境内にないことを意味している可能性があります。

解決策: この問題を解決するには、キューに入っているタスクを実行するための容量が Airflow ワーカーに常に確保されるようにします。たとえば、ワーカー数または worker_concurrency を増やすことができます。並列処理またはプールを調整して、容量を超えるタスクがキューに追加されないようにすることもできます。

min_file_process_interval パラメータに対する Cloud Composer のアプローチ

Cloud Composer では、Airflow スケジューラによる [scheduler]min_file_process_interval の使用方法が変更されます。

Airflow 1

Airflow 1 を使用する Cloud Composer の場合、ユーザーは [scheduler]min_file_process_interval の値を 0 ~ 600 秒に設定できます。600 秒を超える値を指定すると、[scheduler]min_file_process_interval が 600 秒に設定されている場合と同じ結果になります。

Airflow 2

1.19.9 より前のバージョンの Cloud Composer では、[scheduler]min_file_process_interval は無視されます。

Cloud Composer バージョン 1.19.9 以降:

すべての DAG が特定の回数スケジュールされた後、Airflow スケジューラが再起動されます。[scheduler]num_runs パラメータは、スケジューラによって実行される回数を制御します。スケジューラが [scheduler]num_runs スケジューリング ループに達すると、再起動します。スケジューラはステートレス コンポーネントであり、このような再起動は、スケジューラで発生する可能性がある問題に対する自動修復メカニズムです。[scheduler]num_runs のデフォルト値は 5000 です。

[scheduler]min_file_process_interval を使用して、DAG 解析の実行頻度を構成できますが、このパラメータは、DAG のスケジューリング時にスケジューラが [scheduler]num_runs ループを実行するのに必要な時間より長く設定することはできません。

dagrun_timeout に達した後にタスクを失敗としてマークする

DAG の実行が dagrun_timeout(DAG パラメータ)内で完了しない場合、スケジューラは完了していない(実行中、スケジュールされた、キュー内の)タスクを「失敗」としてマークします。

解決方法:

  • タイムアウトを満たすように dagrun_timeout を延長します。

Airflow データベースの負荷が大きい場合の兆候

Airflow スケジューラのログに次のような警告ログエントリが表示されることがあります。

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Airflow ワーカーのログでも同様の症状が確認される場合があります。

MySQL の場合:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

PostgreSQL の場合:

psycopg2.OperationalError: connection to server at ... failed

このようなエラーや警告は、Airflow データベースが、オープン接続の数や、スケジューラやワーカー、トリガー、ウェブサーバーなどの他の Airflow コンポーネントによって同時に実行されるクエリの数に圧倒されている症状である場合があります。

解決策の提示

ウェブサーバーに「スケジューラが実行されていないようです」という警告が表示される

スケジューラは、ハートビートを定期的に Airflow データベースに報告します。この情報に基づいて、Airflow ウェブサーバーはスケジューラがアクティブかどうかを判断します。

スケジューラに負荷がかかっている場合、[scheduler]scheduler_heartbeat_sec ごとにハートビートを報告できなくなることがあります。

このような場合、Airflow ウェブサーバーに次のような警告が表示されることがあります。

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

解決策の提示

  • スケジューラのリソースの CPU とメモリを増やします。

  • DAG の解析とスケジュール設定が高速になってスケジューラのリソースを消費しすぎないように DAG を最適化します。

  • Airflow DAG でグローバル変数を使用しないようにします。代わりに、環境変数Airflow 変数を使用します。

  • [scheduler]scheduler_health_check_threshold Airflow 構成オプションの値を大きくして、ウェブサーバーがスケジューラの使用不能を報告するまでの待機時間を長く取るようにします。

DAG のバックフィル中に発生した問題の回避策

すでに実行されている DAG の再実行が必要になることもあります。これは、Airflow CLI コマンドを使用して次の方法で行うことができます。

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

特定の DAG で失敗したタスクのみを再実行するには、--rerun-failed-tasks 引数も使用します。

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

特定の DAG で失敗したタスクのみを再実行するには、--rerun_failed_tasks 引数も使用します。

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前に置き換えます。
  • LOCATION は、環境が配置されているリージョン。
  • START_DATE は、start_date DAG パラメータの値(YYYY-MM-DD 形式)に置き換えます。
  • END_DATE は、end_date DAG パラメータの値(YYYY-MM-DD 形式)に置き換えます。
  • DAG_NAME は、DAG の名前に置き換えます。

バックフィルを実行すると、タスクがロックされているためにバックフィルができないデッドロックが発生することがあります。次に例を示します。

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

場合によっては、次の回避策を使用するとデッドロックを克服できます。

  • [core]schedule_after_task_executionFalseオーバーライドして、ミニスケジューラを無効にします。

  • 期間を絞ってバックフィルを実行します。たとえば、START_DATEEND_DATE を設定して 1 日だけの期間を指定します。

次のステップ