Airflow スケジューラに関する問題のトラブルシューティング

Cloud Composer 1 | Cloud Composer 2

このページでは、Airflow スケジューラに関する一般的な問題のトラブルシューティング手順と情報について説明します。

問題の原因を特定する

トラブルシューティングを開始するには、問題の発生が DAG 解析時間か、実行時間のタスクの処理中かを特定します。解析時間と実行時間について詳しくは、DAG 解析時間と DAG 実行時間の違いをご覧ください。

DAG の解析時間の検査

問題が DAG 解析時間に発生したかどうかを確認するには、以下の手順に従います。

Console

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [モニタリング] タブに移動します。

  4. [モニタリング] タブで、[DAG 実行] セクションの [すべての DAG ファイルの合計解析時間] グラフを調べて、問題かあるかどうか確認します。

    [Composer Monitoring] タブの [DAG 実行] セクションには、環境内の DAG の健全性指標が表示されます。

gcloud

すべての DAG の解析時間を表示するには、list_dags コマンドで -r フラグを指定します。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    list_dags -- -r

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。

コマンドの出力は次のようになります。

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

DagBag 解析時間の値を探します。値が大きい場合、いずれかの DAG が最適な方法で実装されていない可能性があります。出力テーブルから、解析時間が長い DAG を確認できます。

実行中のタスクとキューに入れられたタスクのモニタリング

キュー内でタスクが滞留しているかどうかを確認するには、次の手順に従います。

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。 [環境の詳細] ページが開きます。

  3. [モニタリング] タブに移動します。

  4. [モニタリング] タブで、[DAG 実行] セクションの [実行中のタスクとキューに入れられたタスク] グラフを調べて、問題があるかどうか確認します。

DAG 解析時間の問題のトラブルシューティング

以下のセクションでは、DAG 解析時間の一般的な問題の症状と想定される修正方法について説明します。

スレッド数の上限

DAG プロセッサ マネージャー(DAG ファイルを処理するスケジューラの一部)に、限られた数のスレッドのみの使用を許可すると、DAG 解析時間に影響する可能性があります。

この問題を解決するには、airflow.cfg 構成ファイルに次の変更を適用します。

  • Airflow 1.10.12 以前のバージョンでは、max_threads パラメータを使用します。

    [scheduler]
    max_threads = <NUMBER_OF_CORES_IN_MACHINE - 1>
    
  • Airflow 1.10.14 以降のバージョンでは、parsing_processes パラメータを使用します。

    [scheduler]
    parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>
    

NUMBER_OF_CORES_IN_MACHINE をワーカーノード マシンのコア数に置き換えます。

タスクの数と時間の分布

Airflow では、多数の小さなタスクのスケジュールを設定する際に問題が発生することがわかっています。このような状況では、少数の統合されたタスクを選ぶ必要があります。

多数の DAG やタスクを同時にスケジュール設定すると、問題の原因になる可能性もあります。この問題を回避するには、タスクを長い時間をかけて均等に分散します。

実行中のタスクとキューに入れられたタスクのトラブルシューティング

以降のセクションでは、実行中のタスクとキューに入れられたタスクの一般的な問題の症状と想定される修正方法について説明します。

タスクキューが長すぎる

タスクキューがスケジューラにとって長すぎる場合もあります。ワーカー パラメータと celery パラメータを最適化する方法については、Cloud Composer 環境をビジネスとともにスケーリングするをご覧ください。

制限されたクラスタ リソース

このセクションは Cloud Composer 1 にのみ適用されます。

環境内の GKE クラスタがすべての DAG とタスクを処理できないほど小さい場合、パフォーマンスの問題が発生する可能性があります。この場合は、次のいずれかの解決策を試してください。

  • パフォーマンスを向上させるマシンタイプを使用して新しい環境を作成し、DAG を移行する。
  • 追加の Cloud Composer 環境を作成し、環境間で DAG を分割します。
  • GKE ノードのマシンタイプをアップグレードするの説明に従って、GKE ノードのマシンタイプを変更します。この手順はエラーが発生しやすいため、最も推奨されないオプションです。
  • gcloud composer environments update コマンドなどを使用して、ご使用の環境で Airflow データベースを実行する Cloud SQL インスタンスのマシンタイプをアップグレードする。Airflow データベースのパフォーマンスが悪いのは、スケジューラが遅いのが原因の可能性があります。

メンテナンスの時間枠でタスクのスケジューリングを回避する

使用する環境に固有のメンテナンスの時間枠を定義できます。この期間中に、Cloud SQL と GKE のメンテナンス イベントが行われます。

Airflow スケジューラに不要なファイルを無視させる

DAG フォルダ内の不要なファイルをスキップすることで、Airflow スケジューラのパフォーマンスを向上させることができます。Airflow スケジューラは、.airflowignore ファイルで指定されたファイルとフォルダを無視します。

Airflow スケジューラに不要なファイルを無視させるには:

  1. .airflowignore ファイルを作成します。
  2. このファイルで、無視するファイルとフォルダを一覧表示します。
  3. 環境のバケット内の /dags フォルダにこのファイルをアップロードします。

.airflowignore ファイル形式の詳細については、Airflow のドキュメントをご覧ください。

Airflow スケジューラが停止した DAG を処理する

DAG を一時停止してその実行を回避します。これにより、Airflow ワーカーの処理サイクルが節約されます。

Airflow スケジューラは一時停止した DAG の解析を続けます。Airflow スケジューラのパフォーマンスを向上させるには、.airflowignore を使用するか、一時停止した DAG を DAG フォルダから削除します。

DAG での「wait_for_downstream」の使用

DAG で wait_for_downstream パラメータを True に設定した場合、タスクが成功すると、このタスクのすぐダウンストリームにあるすべてのタスクも成功する必要があります。つまり、特定の DAG 実行に属するタスクの実行は、以前の DAG 実行からのタスクの実行によって遅くなる可能性があります。詳細については、Airflow のドキュメントをご覧ください。

キューに長く入りすぎていたタスクがキャンセルされ、再スケジュールされます

Airflow タスクがキューに長時間保持されると、スケジューラは failed/up_for_retry とマークし、再スケジュールして実行します。この状況の症状を確認する 1 つの方法は、キューに入れられたタスクの数を含むグラフを確認します(Cloud Composer UI の [モニタリング] タブ)。このグラフの急増が 10 分以内に低下しない場合、タスクは失敗した可能性があり(ログなし)、スケジューラ ログに「取り込まれたタスクがまだ保留中です...」というログエントリが続きます。 このような場合、タスクが実行されなかったことから、Airflow タスクログに「ログファイルが見つかりません...」というメッセージが表示されることがあります。

一般に、このタスクの失敗は想定されており、スケジュールされたタスクの次のインスタンスは、スケジュールに従って実行されます。Cloud Composer 環境でこのようなケースが多数発生する場合は、環境内に、スケジュールされたすべてのタスクを処理するのに十分な Airflow ワーカーがないことを意味します。

解決策: この問題を解決するには、キューに入ったタスクを実行するための Airflow ワーカーの容量が常に確保されている必要があります。たとえば、ワーカーの数や worker_concurrency を増やすことができます。並列処理またはプールを調整して、キューに入れる容量を超えるタスクがキューに入れられるのを防ぐこともできます。

分散したタスクが特定の DAG の実行をブロックすることがたまにあります

多くの場合、Airflow スケジューラは、キューに古いタスクがあり、なんらかの理由でこれらのタスクを正常に実行できない状況(古いタスクが属する DAG が削除されるなど)に対応できます。

古いタスクがスケジューラによって削除されていない場合は、手動で削除する必要があります。これを行うには、Airflow UI で [メニュー] > [ブラウザ] > [タスク インスタンス] の順に移動し、古い DAG に属するキュー内のタスクを見つけて削除します。

Cloud Composer による min_file_process_interval パラメータへのアプローチ

Cloud Composer では、Airflow スケジューラによる min_file_process_interval の使用方法が変更されます。

Airflow 1 を使用する Cloud Composer の場合、ユーザーは min_file_process_interval の値を 0 〜 600 秒に設定できます。600 秒を超える値は、min_file_process_interval が 600 秒に設定されている場合と同じ結果になります。

Airflow 2 を使用する Cloud Composer の場合、ユーザーは min_file_process_interval の値を 0 〜 1,200 秒に設定できます。1,200 秒を超える値は、min_file_process_interval が 1,200 秒に設定されている場合と同じ結果になります。

Airflow 構成のスケーリング

Airflow には、Airflow が同時に実行できるタスクと DAG の数を制御する Airflow 構成オプションが用意されています。これらの構成オプションを設定するには、環境の値をオーバーライドします。

  • ワーカーの同時実行

    [celery]worker_concurrency パラメータは、Airflow ワーカーが同時に実行できるタスクの最大数を制御します。このパラメータの値に Cloud Composer 環境の Airflow ワーカーの数を掛けると、環境内の特定の時間に実行できるタスクの最大数が得られます。この数は、[core]parallelism Airflow 構成オプションによって制限されます。詳細については、こちらをご覧ください。

  • 最大の有効な DAG 実行数

    [core]max_active_runs_per_dag Airflow 構成オプションは、DAG ごとの有効な DAG 実行の最大数を制御します。DAG がこの上限に達すると、スケジューラはそれ以上 DAG 実行を作成しません。

    このパラメータが正しく設定されていない場合、スケジューラが DAG 実行を抑制するという問題が発生する可能性があります。これはスケジューラが特定の時点で DAG 実行インスタンスを作成できなくなるためです。

  • DAG の同時実行

    [core]dag_concurrency Airflow 構成オプションは、各 DAG で同時に実行できるタスク インスタンスの最大数を制御します。これは DAG レベルのパラメータです。

    このパラメータが正しく設定されていない場合、1 つの DAG インスタンスの実行速度が遅いという問題が発生する可能性があります。これは、一度に実行できる DAG タスクの数が限られているためです。

  • 並列処理とプールサイズ

    [core]parallelism Airflow 構成オプションは、これらのタスクのすべての依存関係が満たされた後に、Airflow スケジューラが Executor のキュー内にキューに入れるタスクの数を制御します。

    これは Airflow の設定全体のグローバル パラメータです。

    タスクはキューに入れられ、プール内で実行されます。Cloud Composer 環境では、プールが 1 つだけ使用されます。このプールのサイズは、スケジューラが特定の時間に実行するためにキューに入れられるタスクの数を制御します。プールサイズが小さすぎると、スケジューラは、[core]parallelism 構成オプションと [celery]worker_concurrency オプションに Airflow ワーカーの数を掛けた値で定義されたしきい値にまだ達しない場合でも、実行するタスクをキューに追加できません。

    Airflow UI ([メニュー] > [管理者] > [プール])でプールサイズを構成できます。プールサイズを、環境で期待する並列処理のレベルに調整します。

Airflow データベースの負荷が高い症状

Airflow スケジューラログに、次の警告ログエントリが表示されることがあります。

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

このようなエラーや警告は、Airflow Metadata データベースのオペレーションが過剰である症状の可能性があります。

解決策の提示

次のステップ