DAG プロセッサの問題のトラブルシューティング

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このページでは、DAG ファイル処理に関連する問題のみを扱います。タスクのスケジューリングに関する問題については、Airflow スケジューラの問題のトラブルシューティングをご覧ください。

トラブルシューティングのワークフロー

DAG プロセッサのログを検査する

DAG が複雑な場合、Airflow DAG プロセッサがすべての DAG を解析しないことがあります。これにより、次のような症状が見られるさまざまな問題が発生する可能性があります。

症状:

  • DAG の解析中に DAG プロセッサで問題が発生した場合、下記の問題が組み合わさって発生することがあります。DAG が動的に生成される場合、これらの問題は静的 DAG よりも影響が大きい可能性があります。

    • Airflow UI と DAG UI で DAG が表示されない。
    • DAG の実行がスケジュール設定されない。
    • DAG プロセッサ ログにエラーがある。例:
    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    または

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • 実行がスケジュールされている Airflow タスクがキャンセルされ、解析に失敗した DAG の DAG 実行が failed とマークされる可能性があります。次に例を示します。

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

解決策:

  • DAG の解析に関連付けられたパラメータを増やします。

    • [core]dagbag_import_timeout を 120 秒以上(必要に応じてそれ以上)に増やします。

    • [core]dag_file_processor_timeout を 180 秒以上(必要に応じてそれ以上)に増やします。この値は [core]dagbag_import_timeout より大きい値にする必要があります。

  • DAG プロセッサに問題を引き起こす DAG を修正するか、削除します。

DAG の解析時間の検査

問題が DAG 解析時間に発生したかどうかを確認するには、以下の手順に従います。

コンソール

Google Cloud コンソールでは、[モニタリング] ページと [ログ] タブを使用して DAG の解析時間を調べることができます。

Cloud Composer の [モニタリング] ページで DAG の解析時間を調べる

  1. Google Cloud コンソールで、[環境] ページに移動します。

    [環境に移動][console-list-env]

  2. 環境のリストで、ご利用の環境の名前をクリックします。[モニタリング] ページが開きます。

  3. [モニタリング] タブで、[DAG の統計情報] を選択し、[すべての DAG ファイルの合計解析時間] グラフを調べて、問題があるかどうか確認します。このグラフをしばらくモニタリングして、複数の DAG 解析サイクルで DAG 解析の問題を特定することをおすすめします。

    [モニタリング] タブに、すべての DAG ファイルの合計解析時間が表示されます。

Cloud Composer の [ログ] タブで DAG の解析時間を調べる

  1. Google Cloud コンソールで、[環境] ページに移動します。

    [環境に移動][console-list-env]

  2. 環境のリストで、ご利用の環境の名前をクリックします。[モニタリング] ページが開きます。

  3. [ログ] タブに移動し、[すべてのログ] ナビゲーション ツリーから [DAG プロセッサ マネージャー] セクションを選択します。

  4. dag-processor-manager ログを確認し、問題かあるかどうか確認します。

    DAG プロセッサのログに DAG の解析時間が表示されます

gcloud

すべての DAG の解析時間を表示するには、dags report コマンドを使用します。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前に置き換えます。
  • LOCATION は、環境が配置されているリージョン。

コマンドの出力は次のようになります。

file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

表に記載されている各 DAG の duration 値を探します。 値が大きい場合、いずれかの DAG が最適な方法で実装されていない可能性があります。出力テーブルから、解析時間が長い DAG を確認できます。

DAG 解析時間の問題のトラブルシューティング

以下のセクションでは、DAG 解析時間の一般的な問題の症状と想定される修正方法について説明します。

スレッド数の上限

DAG プロセッサに、限られた数のスレッドのみの使用を許可すると、DAG 解析時間に影響する可能性があります。

この問題を解決するには、[scheduler]parsing_processes Airflow 構成オプションをオーバーライドします。初期値を、単一の DAG プロセッサで使用される vCPU の数に設定します。次に、DAG プロセッサのリソース割り当てを調整して、vCPU またはメモリ容量の約 70% で動作するようにします。

DAG プロセッサに不要なファイルを無視させる

DAG フォルダ内の不要なファイルをスキップすることで、Airflow DAG プロセッサのパフォーマンスを向上させることができます。Airflow DAG プロセッサは、.airflowignore ファイルで指定されたファイルとフォルダを無視します。

Airflow DAG プロセッサに不要なファイルを無視させるには:

  1. .airflowignore ファイルを作成します。
  2. このファイルで、無視するファイルとフォルダを一覧表示します。
  3. 環境のバケット内の /dags フォルダにこのファイルをアップロードします。

.airflowignore ファイル形式の詳細については、Airflow のドキュメントをご覧ください。

Airflow が一時停止した DAG を処理する

DAG を一時停止して、実行を停止できます。これにより、Airflow ワーカーのリソースが節約されます。

Airflow DAG プロセッサは、一時停止した DAG の解析を続けます。DAG プロセッサのパフォーマンスを向上させるには、.airflowignore を使用するか、一時停止した DAG を DAG フォルダから削除します。

一般的な問題

以降のセクションでは、いくつかの一般的な解析の問題の症状と可能性のある修正方法について説明します。

DAG のインポートの読み込みのタイムアウト

症状:

  • Airflow ウェブ インターフェースで、DAG の一覧ページの上部にある赤いアラート ボックスに「Broken DAG: [/path/to/dagfile] Timeout」と表示される。
  • Cloud Monitoring で、airflow-scheduler ログに次のようなエントリが含まれる。

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

修正:

dag_file_processor_timeout Airflow 構成のオプションをオーバーライドして DAG の解析にかける時間を増やす。

セクション キー
core dag_file_processor_timeout 新しいタイムアウト値

DAG が Airflow UI や DAG UI に表示されず、スケジューラが DAG のスケジュールを設定しない

DAG プロセッサは、スケジューラがスケジュールを設定する前に、また DAG が Airflow UI または DAG UI に表示される前に、各 DAG を解析します。

次の Airflow 構成オプションは、DAG の解析のタイムアウトを定義します。

DAG が Airflow UI または DAG UI に表示されない場合は、次の操作を行います。

  • DAG プロセッサが DAG を正しく処理できるかどうか、DAG プロセッサのログを確認します。問題が発生した場合、DAG プロセッサまたはスケジューラのログに次のログエントリが表示されます。

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
    
  • スケジューラのログをチェックして、スケジューラが正常に動作しているかどうかを確認します。問題がある場合、スケジューラのログに次のログエントリが表示されます。

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496
    

解決策:

  • DAG 解析エラーをすべて修正します。DAG プロセッサは複数の DAG を解析します。まれに、1 つの DAG の解析エラーが他の DAG の解析に悪影響を及ぼすことがあります。

  • DAG の解析に [core]dagbag_import_timeout で定義された秒数を超える時間がかかる場合は、このタイムアウトを長くします。

  • すべての DAG の解析に [core]dag_file_processor_timeout で定義されている秒数を超える時間がかかる場合は、このタイムアウトを長くします。

  • DAG の解析に時間がかかっている場合、最適な方法で実装されていない可能性がある。たとえば、多くの環境変数を読み取る場合や、外部サービスまたは Airflow データベースへの呼び出しを行う場合が該当します。可能な限り、DAG のグローバル セクションでこのようなオペレーションを実行しないようにします。

  • DAG プロセッサが高速に動作するように、DAG プロセッサの CPU リソースとメモリリソースを増やします。

  • DAG 解析の頻度を下げる

  • Airflow データベースの負荷を軽減する

次のステップ