Airflow スケジューラに関する問題のトラブルシューティング

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Airflow スケジューラの一般的な問題に関する情報とトラブルシューティングの手順について説明します。

問題の原因を特定する

トラブルシューティングを開始するには、問題の発生が DAG 解析時間か、実行時間のタスクの処理中かを特定します。解析時間と実行時間について詳しくは、DAG 解析時間と DAG 実行時間の違いをご覧ください。

DAG プロセッサのログの検査

DAG が複雑な場合、スケジューラによって実行される DAG プロセッサがすべての DAG を解析しないことがあります。これにより、次のような症状が見られるさまざまな問題が発生する可能性があります。

症状:

  • DAG の解析中に DAG プロセッサで問題が発生した場合、下記の問題が組み合わさって発生することがあります。DAG が動的に生成される場合、これらの問題は静的 DAG よりも影響が大きい可能性があります。

  • Airflow UI と DAG UI で DAG が表示されない。

  • DAG の実行がスケジュール設定されない。

  • DAG プロセッサ ログにエラーがある。例:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    または

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Airflow スケジューラに問題が発生し、スケジューラが再起動する。

  • 実行がスケジュールされている Airflow タスクがキャンセルされ、解析に失敗した DAG の DAG 実行が failed とマークされる可能性があります。例:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

解決策:

  • DAG の解析に関連付けられたパラメータを増やします。

    • dagbag-import-timeout を 120 秒以上(必要に応じてそれ以上)に増やします。

    • dag-file-processor-timeout を 180 秒以上(必要に応じてそれ以上)に増やします。この値は dagbag-import-timeout より大きい値にする必要があります。

  • DAG プロセッサに問題を引き起こす DAG を修正するか、削除します。

DAG の解析時間の検査

問題が DAG 解析時間に発生したかどうかを確認するには、以下の手順に従います。

コンソール

Google Cloud コンソールでは、[モニタリング] ページと [ログ] タブを使用して DAG の解析時間を調べることができます。

Cloud Composer の [モニタリング] ページで DAG の解析時間を調べる

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[モニタリング] ページが開きます。

  3. [モニタリング] タブで、[DAG 実行] セクションの [すべての DAG ファイルの合計解析時間] グラフを調べて、問題かあるかどうか確認します。

    [Composer Monitoring] タブの [DAG 実行] セクションには、環境内の DAG の健全性指標が表示されます。

Cloud Composer の [ログ] タブで DAG の解析時間を調べる

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[モニタリング] ページが開きます。

  3. [ログ] タブに移動し、[すべてのログ] ナビゲーション ツリーから [DAG プロセッサ マネージャー] セクションを選択します。

  4. dag-processor-manager ログを確認し、問題かあるかどうか確認します。

    DAG プロセッサのログに DAG の解析時間が表示されます

gcloud

すべての DAG の解析時間を表示するには、dags report コマンドを使用します。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。

コマンドの出力は次のようになります。

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

表に記載されている各 DAG の duration 値を探します。 値が大きい場合、いずれかの DAG が最適な方法で実装されていない可能性があります。出力テーブルから、解析時間が長い DAG を確認できます。

実行中のタスクとキューに入れられたタスクのモニタリング

キュー内でタスクが滞留しているかどうかを確認するには、次の手順に従います。

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。 [環境の詳細] ページが開きます。

  3. [Monitoring] タブに移動します。

  4. [モニタリング] タブで、[DAG 実行] セクションの [Airflow タスク] グラフを調べて、問題があるかどうか確認します。Airflow タスクは、Airflow でキューに格納された状態のタスクであり、Celery または Kubernetes Executor のブローカー キューに移動できます。Celery キューに入れられたタスクは、Celery ブローカーのキューに入れられたタスク インスタンスです。

DAG 解析時間の問題のトラブルシューティング

以下のセクションでは、DAG 解析時間の一般的な問題の症状と想定される修正方法について説明します。

スレッド数の上限

DAG プロセッサ マネージャー(DAG ファイルを処理するスケジューラの一部)に、限られた数のスレッドのみの使用を許可すると、DAG 解析時間に影響する可能性があります。

この問題を解決するには、次の Airflow 構成オプションをオーバーライドします。

  • Airflow 1.10.12 以前のバージョンでは、max_threads パラメータをオーバーライドします。

    セクション キー メモ
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE をワーカーノード マシンのコア数
    に置き換えます。
  • Airflow 1.10.14 以降のバージョンでは、parsing_processes パラメータをオーバーライドします。

    セクション キー メモ
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE をワーカーノード マシンのコア数
    に置き換えます。

タスクの数と時間の分布

Airflow では、多数の小さなタスクのスケジュールを設定する際に問題が発生することがわかっています。このような状況では、少数の統合されたタスクを選ぶ必要があります。

多数の DAG やタスクを同時にスケジュール設定すると、問題の原因になる可能性もあります。この問題を回避するには、タスクを長い時間をかけて均等に分散します。

実行中のタスクとキューに入れられたタスクのトラブルシューティング

以降のセクションでは、実行中のタスクとキューに入れられたタスクの一般的な問題の症状と想定される修正方法について説明します。

タスクキューが長すぎる

タスクキューがスケジューラにとって長すぎる場合もあります。ワーカー パラメータと celery パラメータを最適化する方法については、Cloud Composer 環境をビジネスとともにスケーリングするをご覧ください。

Airflow スケジューラの TimeTable 機能の使用

Airflow 2.2 以降では、TimeTable という新機能を使用して DAG の時刻表を定義できます。

時刻表は、次のいずれかの方法で定義できます。

制限されたクラスタ リソース

このセクションは、Cloud Composer 1 にのみ適用されます。

環境内の GKE クラスタがすべての DAG とタスクを処理できないほど小さい場合、パフォーマンスの問題が発生する可能性があります。この場合は、次のいずれかの解決策を試してください。

  • パフォーマンスを向上させるマシンタイプを使用して新しい環境を作成し、DAG を移行する。
  • 追加の Cloud Composer 環境を作成し、環境間で DAG を分割します。
  • GKE ノードのマシンタイプをアップグレードするの説明に従って、GKE ノードのマシンタイプを変更します。この手順はエラーが発生しやすいため、最も推奨されないオプションです。
  • gcloud composer environments update コマンドなどを使用して、ご使用の環境で Airflow データベースを実行する Cloud SQL インスタンスのマシンタイプをアップグレードする。Airflow データベースのパフォーマンスが悪いのは、スケジューラが遅いのが原因の可能性があります。

メンテナンスの時間枠でタスクのスケジューリングを回避する

使用する環境に固有のメンテナンスの時間枠を定義できます。この期間中に、Cloud SQL と GKE のメンテナンス イベントが行われます。

Airflow スケジューラに不要なファイルを無視させる

DAG フォルダ内の不要なファイルをスキップすることで、Airflow スケジューラのパフォーマンスを向上させることができます。Airflow スケジューラは、.airflowignore ファイルで指定されたファイルとフォルダを無視します。

Airflow スケジューラに不要なファイルを無視させるには:

  1. .airflowignore ファイルを作成します。
  2. このファイルで、無視するファイルとフォルダを一覧表示します。
  3. 環境のバケット内の /dags フォルダにこのファイルをアップロードします。

.airflowignore ファイル形式の詳細については、Airflow のドキュメントをご覧ください。

Airflow スケジューラが停止した DAG を処理する

DAG を一時停止してその実行を回避します。これにより、Airflow ワーカーの処理サイクルが節約されます。

Airflow スケジューラは一時停止した DAG の解析を続けます。Airflow スケジューラのパフォーマンスを向上させるには、.airflowignore を使用するか、一時停止した DAG を DAG フォルダから削除します。

DAG での「wait_for_downstream」の使用

DAG で wait_for_downstream パラメータを True に設定した場合、タスクが成功するには、このタスクのすぐにダウンストリームにあるすべてのタスクも成功する必要があります。つまり、特定の DAG 実行に属するタスクの実行が、前の DAG 実行のタスクの実行によって遅くなる可能性があります。詳しくは、Airflow のドキュメントをご覧ください。

キューに長く入りすぎていたタスクがキャンセルされ、再スケジュールされます

Airflow タスクがキューに長時間保持されると、スケジューラは実行するためにもう一度再スケジュールします(2.3.1 より前の Airflow バージョンでも、タスクは失敗としてマークされ、適格な場合は再試行されます)。

この状況の症状を確認する 1 つの方法は、キューに入れられたタスクの数を含むグラフ(Cloud Composer UI の [Monitoring] タブ)を確認し、このグラフのスパイクが約 2 時間以内に低下しない場合は、タスクが再スケジュールされる可能性が高く(ログなし)、スケジューラのログに「採用されたタスクは依然として保留中です」というログエントリが続きます。このような場合、タスクが実行されなかったことから、Airflow タスクログに「ログファイルが見つかりません...」というメッセージが表示されることがあります。

通常、この動作は想定されており、スケジュールされたタスクの次のインスタンスはスケジュールに従って実行されます。Cloud Composer 環境でこのようなケースが多数発生している場合は、スケジュールされたすべてのタスクを処理するのに十分な Airflow ワーカーが環境内にないことを意味している可能性があります。

解決策: この問題を解決するには、キューに入っているタスクを実行するための容量が Airflow ワーカーに常に確保されていることを確認する必要があります。たとえば、ワーカー数または worker_concurrency を増やすことができます。並列処理またはプールを調整して、容量を超えるタスクがキューに追加されないようにすることもできます。

分散したタスクが特定の DAG の実行をブロックすることがたまにあります

多くの場合、Airflow スケジューラは、キューに古いタスクがあり、なんらかの理由でこれらのタスクを正常に実行できない状況(古いタスクが属する DAG が削除されるなど)に対応できます。

これらの古いタスクがスケジューラによって消去されない場合は、手動で削除する必要があります。これを行うには、Airflow UI で [メニュー] > [ブラウザ] > [タスク インスタンス] の順に移動し、古い DAG に属するキュー内のタスクを削除します。

この問題を解決するには、環境を Cloud Composer バージョン 2.1.12 以降にアップグレードします。

[scheduler]min_file_process_interval パラメータに対する Cloud Composer のアプローチ

Cloud Composer では、Airflow スケジューラによる [scheduler]min_file_process_interval の使用方法が変更されます。

Airflow 1

Airflow 1 を使用する Cloud Composer の場合、ユーザーは [scheduler]min_file_process_interval の値を 0~600 秒に設定できます。600 秒を超える値を指定すると、[scheduler]min_file_process_interval が 600 秒に設定されている場合と同じ結果になります。

Airflow 2

Airflow 2 では、[scheduler]min_file_process_interval はバージョン 1.19.9 と 2.0.26 以降でのみ使用できます。

  • バージョン 1.19.9 および 2.0.26 より前の Cloud Composer

    これらのバージョンでは、[scheduler]min_file_process_interval は無視されます。

  • Cloud Composer バージョン 1.19.9、2.0.26、またはより新しいバージョン

    すべての DAG が特定の回数スケジュールされた後、Airflow スケジューラが再起動さます。[scheduler]num_runs パラメータは、スケジューラによって実行される回数を制御します。スケジューラが [scheduler]num_runs スケジューリング ループに達すると、再起動します。スケジューラはステートレス コンポーネントであり、このような再起動は、スケジューラで発生する可能性がある問題に対する自動修復メカニズムです。 指定しない場合は、デフォルト値の [scheduler]num_runs が適用され、5,000 になります。

    [scheduler]min_file_process_interval を使用して、DAG 解析の実行頻度を構成できますが、このパラメータは、DAG のスケジューリング時にスケジューラが [scheduler]num_runs ループを実行するのに必要な時間より長く設定することはできません。

Airflow 構成のスケーリング

Airflow には、Airflow で同時に実行できるタスクと DAG の数を制御する Airflow 構成オプションが用意されています。これらの構成オプションを設定するには、環境に合わせて値をオーバーライドします。

  • ワーカーの同時実行

    [celery]worker_concurrency パラメータは、Airflow ワーカーが同時に実行できるタスクの最大数を制御します。このパラメータの値に Cloud Composer 環境の Airflow ワーカー数を掛けると、環境内の特定の時点で実行できるタスクの最大数が取得されます。この数は、[core]parallelism Airflow 構成オプションによって制限されます。詳細は以下で説明します。

    Cloud Composer 2 環境では、デフォルト値の [celery]worker_concurrency が自動的に計算されます

    • Airflow バージョン 2.3.3 以降の場合、[celery]worker_concurrency は 32、12 × worker_CPU、8 × worker_memory のうちの最小値に設定されます。

    • Airflow バージョン 2.2.5 以前では、[celery]worker_concurrency は 12 × ワーカーの CPU 数に設定されます。

  • 最大の有効な DAG 実行数

    [core]max_active_runs_per_dag Airflow 構成オプションは、DAG あたりのアクティブな DAG 実行の最大数を制御します。この上限に達すると、スケジューラは DAG 実行を作成しません。

    このパラメータが正しく設定されていない場合、スケジューラが DAG 実行を抑制するという問題が発生する可能性があります。これはスケジューラが特定の時点で DAG 実行インスタンスを作成できなくなるためです。

  • DAG あたりの最大アクティブ タスク数

    [core]max_active_tasks_per_dag Airflow 構成オプションは、各 DAG で同時に実行できるタスク インスタンスの最大数を制御します。これは、DAG レベルのパラメータです。

    このパラメータが正しく設定されていない場合、1 つの DAG インスタンスの実行速度が遅い問題が発生する可能性があります。これは、ある瞬間に実行できる DAG タスクの数には限りがあるからです。

    解決策: [core]max_active_tasks_per_dag を増やします。

  • 並列処理とプールサイズ

    [core]parallelism Airflow 構成オプションは、これらのタスクのすべての依存関係が満たされた後に、Airflow スケジューラが Executor のキュー内にキューに入れるタスクの数を制御します。

    これは、Airflow の設定全体に適用されるグローバル パラメータです。

    タスクはプール内でキューに保存され、実行されます。Cloud Composer 環境では、1 つのプールのみが使用されます。このプールのサイズによって、特定の時間に実行するためにスケジューラによってキューに入れられるタスクの数が制御されます。プールサイズが小さすぎると、スケジューラは、[core]parallelism 構成オプションと Airflow ワーカーの数を掛けた値がまだ満たされていない [celery]worker_concurrency 構成オプションによって定義されたしきい値であっても、実行するタスクをキューに追加できません。

    プールサイズは、Airflow UI([メニュー] > [管理] > [プール])で構成できます。プールサイズを、環境で想定する並列処理のレベルに調整します。

    通常、[core]parallelism は、ワーカーの最大数と [celery]worker_concurrency の積が設定されます。

DAG プロセッサのタイムアウトにより、DAG がスケジューラによってスケジュールされない

この問題の詳細については、DAG のトラブルシューティングをご覧ください。

dagrun_timeout に達した後にタスクを失敗としてマークする

DAG の実行が dagrun_timeout(DAG パラメータ)内で完了しない場合、スケジューラは完了していない(実行中、スケジュールされた、キュー内の)タスクを「失敗」としてマークします。

解決方法:

Airflow データベースに負荷がかかっている場合の兆候

Airflow スケジューラのログに次のような警告ログエントリが表示される場合があります。

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Airflow ワーカーのログにも同様の症状が確認される場合があります。

MySQL の場合:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

PostgreSQL の場合:

psycopg2.OperationalError: connection to server at ... failed

このようなエラーや警告は、Airflow データベースが、オープン接続の数や、スケジューラやワーカー、トリガー、ウェブサーバーなどの他の Airflow コンポーネントによって同時に実行されるクエリの数に圧倒されている症状である場合があります。

解決策の提示

ウェブサーバーに「スケジューラが実行されていないようです」という警告が表示される

スケジューラは、ハートビートを定期的に Airflow データベースに報告します。この情報に基づいて、Airflow ウェブサーバーはスケジューラがアクティブかどうかを判断します。

スケジューラに負荷がかかっている場合、[scheduler]scheduler-heartbeat-sec ごとにハートビートを報告できないことがあります。

このような場合、Airflow ウェブサーバーに次のような警告が表示されることがあります。

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

解決策の提示

  • スケジューラのリソースの CPU とメモリを増やします。

  • DAG の解析とスケジュール設定が高速になってスケジューラのリソースを消費しすぎないように DAG を最適化します。

  • Airflow DAG でグローバル変数(Cloud Composer 環境変数Airflow 変数)を使用しないようにします。

  • [scheduler]scheduler-health-check-threshold の値を大きくして、ウェブサーバーがスケジューラの使用不能を報告するまでの待機時間を長く取るようにします。

DAG のバックフィル中に発生した問題の回避策

すでに実行されている DAG の再実行が必要になることもあります。これは、Airflow コマンドライン ツールを使用して次の方法で行うことができます。

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

特定の DAG で失敗したタスクのみを再実行するには、--rerun_failed_tasks 引数も使用します。

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

特定の DAG で失敗したタスクのみを再実行するには、--rerun-failed-tasks 引数も使用します。

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。
  • START_DATE は、start_date DAG パラメータの値(YYYY-MM-DD 形式)に置き換えます。
  • END_DATE は、end_date DAG パラメータの値(YYYY-MM-DD 形式)に置き換えます。
  • DAG_NAME は、DAG の名前に置き換えます。

バックフィルを実行すると、タスクがロックされているためにバックフィルができないデッドロックが発生することがあります。例:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

場合によっては、次の回避策を使用するとデッドロックを克服できます。

  • [core]schedule-after-task-executionFalseオーバーライドして、ミニスケジューラを無効にします。

  • 期間を絞ってバックフィルを実行します。たとえば、START_DATEEND_DATE を設定して 1 日だけの期間を指定します。

次のステップ