Airflow Scheduler に関する問題のトラブルシューティング

このページでは、Airflow Scheduler の一般的な問題に関する情報とトラブルシューティングの手順について説明します。

問題の原因を特定する

トラブルシューティングを開始するには、問題の発生が DAG 解析時間か、実行時間のタスクの処理中かを特定します。解析時間と実行時間について詳しくは、DAG 解析時間と DAG 実行時間の違いをご覧ください。

DAG の解析時間の検査

問題が DAG 解析時間に発生したかどうかを確認するには、以下の手順に従います。

コンソール

  1. Cloud Console で、[環境] ページに移動します。

    [環境] ページを開く

  2. リストから環境を選択します。
  3. [モニタリング] タブで、[DAG 実行] セクションの [すべての DAG ファイルの合計解析時間] グラフを調べて、問題かあるかどうか確認します。

    [Composer Monitoring] タブの [DAG 実行] セクションには、環境内の DAG の健全性指標が表示されます。

gcloud

すべての DAG の解析時間を表示するには、list_dags コマンドで -r フラグを指定します。

gcloud composer environments run ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  list_dags -- -r

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前
  • ENVIRONMENT_LOCATION: 環境のリージョン

コマンドの出力は次のようになります。

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

DagBag 解析時間の値を探します。値が大きい場合、いずれかの DAG が最適な方法で実装されていない可能性があります。出力テーブルから、解析時間が長い DAG を確認できます。

実行中のタスクとキューに入れられたタスクのモニタリング

キュー内でタスクが滞留しているかどうかを確認するには、次の手順に従います。

  1. Cloud Console で、[環境] ページに移動します。

    [環境] ページを開く

  2. リストからプロジェクトを選択します。
  3. [モニタリング] タブで、[DAG 実行] セクションの [実行中のタスクとキューに入れられたタスク] グラフを調べて、問題があるかどうか確認します。

DAG 解析時間の問題のトラブルシューティング

以下のセクションでは、DAG 解析時間の一般的な問題の症状と想定される修正方法について説明します。

スレッド数の上限

DAG プロセッサ マネージャー(DAG ファイルを処理するスケジューラの一部)に、限られた数のスレッドのみの使用を許可すると、DAG 解析時間に影響する可能性があります。この問題を解決するには、airflow.cfg 構成ファイルに次の変更を適用します。

  • Airflow 1.10.12 以前のバージョンでは、max_threads パラメータを使用します。

    [scheduler]
    max_threads = <NUMBER_OF_CORES_IN_MACHINE - 1>
    
  • Airflow 1.10.14 以降のバージョンでは、parsing_processes パラメータを使用します。

    [scheduler]
    parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>
    

NUMBER_OF_CORES_IN_MACHINE をワーカーノード マシンのコア数に置き換えます。

タスクの数と時間の分布

Airflow では、多数の小さなタスクのスケジュールを設定する際に問題が発生することがわかっています。このような状況では、少数の統合されたタスクを選ぶ必要があります。

多数の DAG やタスクを同時にスケジュール設定すると、問題の原因になる可能性もあります。この問題を回避するには、タスクを長い時間をかけて均等に分散します。

実行中のタスクとキューに入れられたタスクのトラブルシューティング

以降のセクションでは、実行中のタスクとキューに入れられたタスクの一般的な問題の症状と想定される修正方法について説明します。

タスクキューが長すぎる

タスクキューが Scheduler にとって長すぎる場合もあります。ワーカー パラメータと celery パラメータを最適化する方法については、Cloud Composer 環境をビジネスとともにスケーリングするをご覧ください。

制限されたクラスタ リソース

環境内の GKE クラスタがすべての DAG とタスクを処理できないほど小さい場合、パフォーマンスの問題が発生する可能性があります。この場合は、次のいずれかの解決策を試してください。

  • パフォーマンスを向上させるマシンタイプを使用して新しい環境を作成し、DAG を移行する。
  • 追加の Cloud Composer 環境を作成し、環境間で DAG を分割します。
  • GKE ノードのマシンタイプをアップグレードするの説明に従って、GKE ノードのマシンタイプを変更します。この手順はエラーが発生しやすいため、最も推奨されないオプションです。
  • gcloud composer environments update コマンドなどを使用して、ご使用の環境で Airflow データベースを実行する Cloud SQL インスタンスのマシンタイプをアップグレードする。Airflow データベースのパフォーマンスが悪いのは、Scheduler が遅いのが原因の可能性があります。

メンテナンスの時間枠でタスクのスケジューリングを回避する

使用する環境に固有のメンテナンスの時間枠を定義できます。この期間中に、Cloud SQL と GKE のメンテナンス イベントが行われます。

スケジュールや実行の問題が生じる可能性があるため、メンテナンスの時間枠内に DAG 実行をスケジュールしないでください。

Airflow スケジューラに不要なファイルを無視させる

DAG フォルダ内の不要なファイルをスキップすることで、Airflow スケジューラのパフォーマンスを向上させることができます。Airflow スケジューラは、.airflowignore ファイルで指定されたファイルとフォルダを無視します。

Airflow スケジューラに不要なファイルを無視させるには:

  1. .airflowignore ファイルを作成します。
  2. このファイルで、無視するファイルとフォルダを一覧表示します。
  3. 環境のバケット内の /dags フォルダにこのファイルをアップロードします。

.airflowignore ファイル形式の詳細については、Airflow のドキュメントをご覧ください。

Airflow スケジューラが停止した DAG を処理する

DAG を一時停止してその実行を回避します。これにより、Airflow ワーカーの処理サイクルが節約されます。

Airflow スケジューラは一時停止した DAG の解析を続けます。Airflow スケジューラのパフォーマンスを向上させるには、.airflowignore を使用するか、一時停止した DAG を DAG フォルダから削除します。

DAG での「wait_for_downstream」の使用

DAG で wait_for_downstream パラメータを True に設定した場合、タスクが成功すると、このタスクのすぐにダウンストリームにあるすべてのタスクも成功する必要があります。つまり、特定の DAG 実行に属するタスクの実行は、以前の DAG 実行からのタスクの実行によって遅くなる可能性があります。詳細については、Airflow のドキュメントをご覧ください。

Airflow 構成のスケーリング

Airflow には、Airflow が同時に実行できるタスクと DAG の数を制御する Airflow 構成オプションが用意されています。これらの構成オプションを設定するには、環境の値をオーバーライドします。

  • ワーカーの同時実行

    [core]worker_concurrency パラメータは、Airflow ワーカーが同時に実行できるタスクの最大数を制御します。このパラメータの値に Cloud Composer 環境の Airflow ワーカーの数を掛けると、環境内の特定の時間に実行できるタスクの最大数が得られます。この数は、以下に説明する [core]parallelism Airflow 構成オプションによって制限されます。

  • 最大の有効な DAG 実行数

    [core]max_active_runs_per_dag Airflow 構成オプションは、DAG ごとのアクティブな DAG 実行の最大数を制御します。DAG がこの上限に達すると、スケジューラはそれ以上 DAG 実行を作成しません。

    このパラメータが正しく設定されていない場合、スケジューラが特定の時点で DAG 実行インスタンスを作成できなくなるため、DAG 実行を抑制できる可能性があります。

  • DAG の同時実行

    [core]dag_concurrency Airflow 構成オプションは、各 DAG で同時に実行できるタスク インスタンスの最大数を制御します。これは DAG レベルのパラメータです。

    このパラメータが正しく設定されていない場合、1 つの DAG インスタンスの実行速度が遅いという問題が発生する可能性があります。これは、一度に実行できる DAG タスクの数が限られているためです。

  • 並列処理とプールサイズ

    [core]parallelism Airflow 構成オプションは、これらのタスクのすべての依存関係が満たされた後に、Airflow スケジューラがエグゼキュータのキュー内にキューに入れるタスクの数を制御します。

    これは Airflow の設定全体のグローバル パラメータです。

    タスクはキューに入れられ、プール内で実行されます。Cloud Composer 環境では、プールが 1 つだけ使用されます。このプールのサイズは、スケジューラが特定の時間に実行するためにキューに入れられるタスクの数を制御します。プールサイズが小さすぎると、スケジューラは、[core]parallelism 構成オプションと、[core]worker_concurrency 構成オプションに Airflow ワーカー数を掛けた値によって定義されたしきい値がまだ満たされない場合であっても、実行するタスクをキューに追加できません。

    Airflow UI (メニュー > [Admin] > [Pools])でプールサイズを構成できます。プールサイズを、環境で期待する並列処理のレベルに調整します。