DAG のトラブルシューティング

Cloud Composer 1 | Cloud Composer 2

このページでは、トラブルシューティングの手順と一般的なワークフローの問題に関する情報を提供します。

DAG 実行の問題の多くは、最適でない環境のパフォーマンスが原因で発生します。環境のパフォーマンスとコストの最適化ガイドに従って、Cloud Composer 2 環境を最適化できます。

DAG の実行に関する一部の問題は、Airflow スケジューラが正しく機能していない、または最適に機能していないことが原因である可能性があります。スケジューラのトラブルシューティングの手順に沿って問題を解決してください。

ワークフローのトラブルシューティング

トラブルシューティングを開始するには、次の手順を行います。

  1. Airflow ログを確認します。
  2. モニタリング ダッシュボードを確認します。
  3. Cloud Monitoring を確認します。
  4. Cloud Console で、環境のコンポーネントのページのエラーを確認します。
  5. Airflow ウェブ インターフェースの DAG のグラフビューで、失敗したタスク インスタンスを確認します。

    ヒント: サイズが大きい DAG に移動して、失敗したタスク インスタンスを確認するには、次の Airflow 構成オプションをオーバーライドして、グラフビューの画面の向きを LR から RL に変更します。

    セクション キー
    webserver dag_orientation LRTBRL、または BT

演算子の失敗をデバッグする

演算子の失敗をデバッグするには、次の手順を行います。

  1. タスク固有のエラーを確認します
  2. Airflow ログを確認します。
  3. Cloud Monitoring を確認します。
  4. 演算子固有のログを確認します。
  5. エラーを修正します。
  6. dags/ フォルダに DAG をアップロードします。
  7. Airflow ウェブ インターフェースで、DAG の過去の状態を消去します。
  8. DAG を再開または実行します。

一般的な問題

以降のセクションでは、いくつかの一般的な DAG の問題の症状と可能性のある修正方法について説明します。

DAG 解析エラーのためにログが出力されずタスクが失敗する

微妙な DAG エラーが原因で、Airflow スケジューラと DAG プロセッサがそれぞれが、実行するタスクをスケジューリングし、DAG ファイルを解析できるものの、Airflow ワーカーがタスクの実行に失敗する状況(DAGファイルにプログラミング エラーがあるなど)が発生する可能性があります。このため、特定の Airflow タスクが Failed とマークされ、実行からのログが存在しないことがあります。

解決策: Airflow ワーカーログで、Airflow ワーカーで DAG の解析エラーまたは DAG 解析エラーに関連するエラーが発生していないことを確認します。

リソース不足のためにログが出力されずタスクが失敗する

症状: タスクの実行中に、Airflow タスクの実行を担当する Airflow ワーカー サブプロセスが突然中断されます。Airflow ワーカーのログに表示されるエラーは、次のようになります。

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

解決方法:

Pod のエビクションのためにログが出力されずタスクが失敗する

Google Kubernetes Engine Pod には Kubernetes Pod Lifecycle と Pod の強制排除が適用されます。タスクの急激な増大とワーカーの同時スケジューリングの 2 つが、Cloud Composer で Pod が強制排除される最も一般的な原因です。

Pod の強制排除は、特定の Pod が、ノードの構成済みのリソース使用量予想と比較して、ノードのリソースを過剰に使用すると発生する可能性があります。たとえば、強制排除は、Pod 内でメモリを多く消費するいくつかのタスクが実行され、それらの負荷の合計によりこの Pod が実行されるノードがメモリ使用量上限を超過する場合に発生する可能性があります。

Airflow ワーカー Pod が強制排除されると、その Pod で実行されているすべてのタスク インスタンスは中断され、後で Airflow によって失敗としてマークされます。

ログはバッファリングされます。バッファがフラッシュする前にワーカー Pod が強制排除された場合、ログは出力されません。ログなしでタスクが失敗する場合は、メモリ不足(OOM)により Airflow ワーカーが再起動されたことを示します。Airflow ログが出力されていない場合でも、Cloud Logging に一部のログが存在することがあります。

ログを表示するには:

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [ログ] タブに移動します。

  4. [すべてのログ] -> [Airflow ログ] -> [ワーカー] -> [(個々のワーカー)] で個々のワーカーのログを表示します。

DAG の実行にはメモリ制限があります。各タスクの実行は、タスクの実行とモニタリングという 2 つの Airflow プロセスとともに開始されます。各ノードは最大 6 つの同時タスク(Airflow モジュールとともに読み込む、約 12 のプロセス)を実行できます。DAG の性質によっては、より多くのメモリが消費される可能性があります。

症状:

  1. Google Cloud Console で [環境] ページに移動します。

    [ワークロード] に移動

  2. Evicted を示す airflow-worker Pod が存在する場合は、強制排除された各 Pod をクリックして、ウィンドウ上部の The node was low on resource: memory メッセージを確認します。

修正:

DAG のインポートの読み込みのタイムアウト

症状:

  • Airflow ウェブ インターフェースの DAG リストページの上部にある Broken DAG: [/path/to/dagfile] Timeout に赤いアラート ボックスが表示されます。
  • Cloud Monitoring では、airflow-scheduler ログに次のようなエントリが含まれます。

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

修正:

dag_file_processor_timeout Airflow 構成のオプションをオーバーライドして DAG の解析にかける時間を増やす。

セクション キー
core dag_file_processor_timeout 新しいタイムアウト値

Airflow データベースとのネットワーク トラフィックが増加する

環境の GKE クラスタと Airflow データベース間のトラフィック ネットワークの量は、DAG の数、DAG のタスク数、DAG が Airflow データベースのデータにアクセスする方法によって異なります。次の要因がネットワーク使用量に影響する可能性があります。

  • Airflow データベースへのクエリ。DAG で大量のクエリを実行すると、大量のトラフィックが生成されます。例: 他のタスクに進む前にタスクのステータスを確認し、XCom テーブルに対してクエリを実行し、Airflow データベース コンテンツをダンプする。

  • 多数のタスク。スケジュール設定するタスクが多いほど、より多くのネットワーク トラフィックが生成されます。この考慮事項は、DAG 内のタスクの合計数とスケジューリング頻度の両方に適用されます。Airflow スケジューラが DAG 実行をスケジュールすると、Airflow データベースに対してクエリを実行し、トラフィックを生成します。

  • Airflow ウェブ インターフェースによって、Airflow データベースにクエリが行われるので、ネットワーク トラフィックが生成されます。グラフ、タスク、図を含むページを集中的に使用すると、大量のネットワーク トラフィックが生成されます。

DAG で、Airflow ウェブサーバーがクラッシュするか、または Airflow ウェブサーバーによって 502 gateway timeout エラーが返されます

ウェブサーバーでは、いくつかの理由で失敗が生じる可能性があります。Cloud Loggingairflow-webserver ログを確認し、502 gateway timeout エラーの原因を特定します。

重い処理

このセクションは Cloud Composer 1 にのみ適用されます。

DAG の解析時には重い処理を実行しないでください。

CPU とメモリ容量を増やすためにマシンタイプをカスタマイズできるワーカーノードやスケジューラ ノードとは異なり、ウェブサーバーは固定されたマシンタイプを使用します。そのため、解析時の処理が重すぎる場合、DAG の解析の失敗を引き起こす可能性があります。

ウェブサーバーには 2 つの vCPU と 2 GB のメモリが備わっていますcore-dagbag_import_timeout のデフォルト値は 30 秒です。このタイムアウト値により、Airflow が dags/ フォルダ内の Python モジュールの読み取りに費やす時間の上限が定義されます。

不適切な権限

このセクションは Cloud Composer 1 にのみ適用されます。

ウェブサーバーは、ワーカーやスケジューラと同じサービス アカウントでは動作しません。そのため、ワーカーとスケジューラはウェブサーバーがアクセスできない、ユーザー管理のリソースにもアクセスできる場合があります。

DAG の解析中は、非公開リソースへのアクセスは避けることをおすすめします。これを回避できない場合もあります。その場合は、ウェブサーバーのサービス アカウントに権限を付与する必要があります。サービス アカウント名は、ウェブサーバーのドメインから取得されます。たとえば、ドメインが example-tp.appspot.com の場合、サービス アカウントは example-tp@appspot.gserviceaccount.com になります。

DAG エラー

このセクションは Cloud Composer 1 にのみ適用されます。

ウェブサーバーは App Engine 上で動作し、環境の GKE クラスタとは分離しています。ウェブサーバーは DAG 定義ファイルを解析し、DAG にエラーがある場合は 502 gateway timeout が発生する可能性があります。問題のある DAG によって GKE で実行中のプロセスが中断されない場合、Airflow は正常に機能しているウェブサーバーなしでも正常に機能します。 この場合、環境から詳細を取得し、ウェブサーバーが利用できなくなった場合の回避策とするために gcloud composer environments run を使用できます。

その他の場合には、GKE で DAG の解析を実行して、致命的な Python 例外をスローする DAG やそのタイムアウト(デフォルトは 30 秒)を確認できます。 トラブルシューティングを行うには、Airflow ワーカー コンテナ内のリモートシェルに接続して構文エラーをテストします。詳細については、DAG のテストをご覧ください。

Lost connection to MySQL server during query 例外は、タスクの実行中かタスクの直後に、スローされます。

Lost connection to MySQL / PostgreSQL server during query 例外は、多くの場合、次の条件が満たされると発生します。

  • DAG で PythonOperator またはカスタム演算子が使用されます。
  • DAG で Airflow データベースにクエリが行われます。

呼び出し可能な関数から複数のクエリが行われた場合、トレースバックによって Airflow コードの self.refresh_from_db(lock_for_update=True) 行が誤って指し示されることがあります。これがタスク実行後の最初のデータベース クエリです。これ以前に例外の実際の原因が生じるのは、SQLAlchemy セッションが適切に終了していない場合です。

SQLAlchemy セッションはスレッドにスコープが設定され、呼び出し可能な関数セッションで作成され、後で Airflow コード内で継続できます。1 つのセッションでクエリ間に大幅な遅延がある場合は、MySQL サーバーまたは PostgreSQL サーバーによって接続がすでに終了している可能性があります。Cloud Composer 環境の接続タイムアウトは約 10 分に設定されています。

修正:

  • airflow.utils.db.provide_session デコレータを使用します。このデコレータは、session パラメータで Airflow データベースへの有効なセッションを提供し、関数の最後にセッションを正しく閉じます。
  • 単一の長時間実行関数を使用しないでください。代わりに、すべてのデータベース クエリを別々の関数に移動し、airflow.utils.db.provide_session デコレータを持つ複数の関数が含まれるようにします。この場合、セッションはクエリ結果を取得した後に自動的に閉じます。

DAG、タスク、同じ DAG の並列実行のローリング時間

特定の DAG に対する単一の DAG 実行時間を制御する場合は、dagrun_timeout DAG パラメータを使用します。たとえば、単一の DAG 実行(実行が失敗したか失敗したかは問わない)が 1 時間を超えないようにする必要がある場合は、このパラメータを 3,600 秒に設定します。

単一の Airflow タスクの継続時間を制御することもできます。これを行うには、execution_timeout を使用します。

特定の DAG に対して実行するアクティブな DAG 実行の数を制御する場合は、[core]max-active-runs-per-dag Airflow 構成オプションを使用します。

特定の期間に DAG のインスタンスを 1 つだけ実行する場合は、max-active-runs-per-dag パラメータを 1 に設定します。

スケジューラ、ワーカー、ウェブサーバーへの DAG とプラグインの同期に影響する問題

Cloud Composer は、/dags フォルダと /plugins フォルダの内容をスケジューラとワーカーに同期します。/dags フォルダと /plugins フォルダの特定のオブジェクトにより、この同期が正常に機能しないか、少なくとも遅くなる可能性があります。

  • /dags フォルダがスケジューラとワーカーに同期されます。このフォルダは、Cloud Composer 2 ではウェブサーバーに同期されず、Cloud Composer 1 で DAG Serialization もオンになりません。

  • /plugins フォルダはスケジューラ、ワーカー、ウェブサーバーに同期されます。

次のような問題が発生する可能性があります。

  • 圧縮トランス コーディングを使用する gzip 圧縮ファイルを /dags フォルダと /plugins フォルダにアップロードしました。これは、通常、gsutil cp -Z コマンドを使用してデータをバケットにアップロードする場合に発生します。

    解決策: 圧縮コード変換を使用したオブジェクトを削除し、バケットに再度アップロードします。

  • オブジェクトの 1 つが「.」という名前である場合、このようなオブジェクトはスケジューラやワーカーと同期されないため、同期が完全に停止する可能性があります。

    解決策: 問題のあるオブジェクトの名前を変更します。

  • /dags フォルダまたは /plugins フォルダのオブジェクトの 1 つのオブジェクト名の末尾に / 記号があります。/ 記号はオブジェクトがファイルではなくフォルダであることを表すため、このようなオブジェクトでは同期プロセスで問題が発生する可能性があります。

    解決策: 問題のあるオブジェクトの名前から / 記号を削除します。

  • 不要なファイルを /dags フォルダと /plugins フォルダに保存しないでください。

    場合によっては、実装する DAG とプラグインに、これらのコンポーネントのテストを保存するファイルなど、追加ファイルが含まれることがあります。これらのファイルはワーカーとスケジューラに同期されており、これらのファイルをスケジューラ、ワーカー、ウェブサーバーにコピーするために必要な時間に影響します。

    解決策: 追加の不要なファイルを /dags フォルダや /plugins フォルダに保存しないでください。

Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' エラーはスケジューラとワーカーによって生成されます。

この問題は、Cloud Storage でオブジェクトの名前空間が重複する場合と、スケジューラとワーカーが従来のファイル システムを使用している場合に発生します。たとえば、同じ名前のフォルダとオブジェクトの両方が環境のバケットに追加される可能性があります。バケットが環境のスケジューラとワーカーと同期されている場合、このエラーが生成され、タスクが失敗する可能性があります。

この問題を解決するには、環境のバケット内に重複する名前空間がないことを確認してください。たとえば、/dags/misc(ファイル)と /dags/misc/example_file.txt(別のファイル)の両方がバケットにある場合、スケジューラによってエラーが生成されます。

Airflow メタデータ DB への接続時の一時的な中断

Cloud Composer は分散クラウド インフラストラクチャ上で動作します。つまり、一時的な問題によって Airflow タスクの実行が中断されることがあるので注意してください。

この場合、Airflow ワーカーのログに次のエラー メッセージが記録されることがあります。

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

または

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

このような一時的な問題は、Cloud Composer 環境に対して実行されるメンテナンス オペレーションによっても発生することがあります。

通常、このようなエラーは断続的に発生し、Airflow タスクがべき等で再試行が構成されている場合、それらの影響を受けないようにする必要があります。メンテナンスの時間枠の定義も検討できます。

このようなエラーが発生するもう 1 つの理由は、環境のクラスタにリソースがないことです。そのような場合は、環境のスケーリングまたは環境の最適化の説明に従って、環境をスケールアップまたは最適化します。

Airflow データベースの負荷が高い症状

Airflow ワーカーログに、次の警告ログエントリが表示されることがあります。

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

このようなエラーや警告は、Airflow メタデータ データベースが処理する必要があるクエリの数に圧倒されている候である場合があります。

解決策の提示:

次のステップ