DAG(ワークフロー)のトラブルシューティング

このページでは、トラブルシューティングの手順と一般的なワークフローの問題に関する情報を提供します。

ワークフローのトラブルシューティング

トラブルシューティングを開始するには、次の手順を行います。

  1. Airflow ログを確認します。
  2. Google Cloud のオペレーション スイートを確認します。
  3. Cloud Console で、環境を実行している Google Cloud コンポーネントのページでエラーを確認します。
  4. Airflow ウェブ インターフェースの DAG のグラフビューで、失敗したタスク インスタンスを確認します。

    ヒント: サイズが大きい DAG に移動して、失敗したタスク インスタンスを確認するには、ウェブサーバーのデフォルトの dag_orientation 構成をオーバーライドすることにより、グラフビューの画面の向きを LR から RL に変更します。

    セクション キー
    webserver dag_orientation LRTBRL、または BT

演算子の失敗をデバッグする

演算子の失敗をデバッグするには、次の手順を行います。

  1. タスク固有のエラーを確認します
  2. Airflow ログを確認します。
  3. Google Cloud のオペレーション スイートを確認します。
  4. 演算子固有のログを確認します。
  5. エラーを修正します。
  6. dags/ フォルダに DAG をアップロードします。
  7. Airflow ウェブ インターフェースで、DAG の過去の状態を消去します。
  8. DAG を再開または実行します。

一般的な問題

以降のセクションでは、いくつかの一般的な DAG の問題の症状と可能性のある修正方法について説明します。

ログが出力されずタスクが失敗する

Google Kubernetes Engine Pod には Kubernetes Pod Lifecycle と Pod の強制排除が適用されます。タスクの急激な増大とワーカーの同時スケジューリングの 2 つが、Cloud Composer で Pod が強制排除される最も一般的な原因です。

Pod の強制排除は、特定の Pod が、ノードの構成済みのリソース使用量予想と比較して、ノードのリソースを過剰に使用すると発生する可能性があります。たとえば、強制排除は、Pod 内でメモリを多く消費するいくつかのタスクが実行され、それらの負荷の合計によりこの Pod が実行されるノードがメモリ使用量上限を超過する場合に発生する可能性があります。

Airflow ワーカー Pod が強制排除されると、その Pod で実行されているすべてのタスク インスタンスは中断され、後で Airflow によって失敗としてマークされます。

ログはバッファリングされます。バッファリングがフラッシュする前にワーカー Pod が強制排除された場合、ログは出力されません。ログなしでタスクが失敗する場合は、メモリ不足(OOM)により Airflow ワーカーが再起動されたことを示します。Airflow ログが出力されていなくても、Cloud Logging に一部のログが存在することがあります。ログは、たとえば次の方法で表示できます。Google Cloud Console で環境を選択し、[ログ] タブに移動し、[すべてのログ] -> [Airflow ログ] -> [ワーカー] -> [(個々のワーカー)] で個々のワーカーのログを表示します。

DAG の実行には RAM の制限があります。各タスクの実行は、タスクの実行とモニタリングという 2 つの Airflow プロセスとともに開始されます。各ノードは最大 6 つの同時タスク(Airflow モジュールとともに読み込む、約 12 のプロセス)を実行できます。DAG の性質によっては、より多くのメモリが消費される可能性があります。

症状

  1. Cloud Console で、[Kubernetes Engine] -> [ワークロード] パネルに移動します。

    [ワークロード] パネルを開く

  2. Evicted を示す airflow-worker Pod が存在する場合は、強制排除された各 Pod をクリックして、ウィンドウ上部の The node was low on resource: memory メッセージを確認します。

修正

DAG のインポートの読み込みのタイムアウト

症状:

  • Airflow ウェブ UI: DAG の一覧ページの上部にある赤いアラート ボックスに、「Broken DAG: [/path/to/dagfile] Timeout」と表示される。
  • Google Cloud のオペレーション スイート: airflow-scheduler ログに次のようなエントリが含まれている。
    • 「ERROR - Process timed out」
    • 「ERROR - Failed to import: /path/to/dagfile」
    • 「AirflowTaskTimeout: Timeout」

修正:

dagbag_import_timeout Airflow 構成のオプションをオーバーライドして、DAG の解析にかける時間を増やす。

セクション キー
core dagbag_import_timeout 新しいタイムアウト値

Airflow データベースとのネットワーク トラフィックが増加する

環境の GKE クラスタと Airflow データベース間のトラフィック ネットワークの量は、DAG の数、DAG 内のタスクの数、DAG の Airflow データベースのデータへのアクセス方法によって異なります。次の要因がネットワーク使用量に影響する可能性があります。

  • Airflow データベースへのクエリ。DAG で大量のクエリを実行すると、大量のトラフィックが生成されます。例: 他のタスクに進む前にタスクのステータスを確認し、XCom テーブルに対してクエリを実行し、Airflow データベース コンテンツをダンプします。

  • 多数のタスク。スケジューリングするタスクが多いほど、より多くのネットワーク トラフィックが生成されます。この考慮事項は、DAG 内のタスクの合計数とスケジューリング頻度の両方に適用されます。Airflow スケジューラは DAG の実行をスケジュールする際に、Airflow データベースに対してクエリを実行し、トラフィックを生成します。

  • Airflow ウェブ インターフェースによって、Airflow データベースにクエリが行われるので、ネットワーク トラフィックが生成されます。グラフ、タスク、図を含むページを集中的に使用すると、大量のネットワーク トラフィックが生成されます。

DAG で、Airflow ウェブサーバーがクラッシュするか、または Airflow ウェブサーバーによって 502 gateway timeout エラーが返されます

ウェブサーバーでは、いくつかの異なる理由で失敗が生じる可能性があります。Cloud Loggingairflow-webserver ログを確認し、502 gateway timeout エラーの原因を特定します。

重い処理

DAG の解析時には重い処理を実行しないでください。CPU とメモリ容量を増やすためにマシンタイプをカスタマイズできるワーカーノードやスケジューラ ノードとは異なり、ウェブサーバーは固定されたマシンタイプを使用します。そのため、解析時の処理が重すぎる場合、DAG の解析の失敗を引き起こす可能性があります。

ウェブサーバーには 2 つの vCPU と 2 GB のメモリが備わっていますcore-dagbag_import_timeout のデフォルト値は 30 秒です。このタイムアウト値により、Airflow が dags/ フォルダ内の Python モジュールの読み取りに費やす時間の上限が定義されます。

不適切な権限

ウェブサーバーは、ワーカーやスケジューラと同じサービス アカウントでは動作しません。そのため、ワーカーとスケジューラはウェブサーバーがアクセスできない、ユーザー管理のリソースにもアクセスできる場合があります。

DAG の解析中は、非公開リソースへのアクセスは避けることをおすすめします。これを回避できない場合もあります。その場合は、ウェブサーバーのサービス アカウントに権限を付与する必要があります。サービス アカウント名は、ウェブサーバーのドメインから取得されます。たとえば、ドメインが foo-tp.appspot.com の場合、サービス アカウントは foo-tp@appspot.gserviceaccount.com です。

DAG エラー

ウェブサーバーは App Engine 上で動作し、環境の GKE クラスタとは分離しています。ウェブサーバーは DAG 定義ファイルを解析し、DAG にエラーがある場合は 502 gateway timeout が発生する可能性があります。問題のある DAG によって GKE で実行中のプロセスが中断されない場合、Airflow は正常に機能しているウェブサーバーなしでも正常に機能します。 この場合、環境から詳細を取得し、ウェブサーバーが利用できなくなった場合の回避策とするために gcloud composer environments run を使用できます。

その他の場合には、GKE で DAG の解析を実行して、致命的な Python 例外をスローする DAG やそのタイムアウト(デフォルトは 30 秒)を確認できます。 トラブルシューティングを行うには、Airflow ワーカー コンテナ内のリモートシェルに接続して構文エラーをテストします。詳細については、DAG のテストをご覧ください。

Lost connection to MySQL server during query 例外は、タスクの実行中かタスクの直後に、スローされます。

Lost connection to MySQL server during query 例外は、多くの場合、次の条件が満たされると発生します。

  • DAG で PythonOperator またはカスタム演算子が使用されます。
  • DAG で Airflow データベースにクエリが行われます。

呼び出し可能な関数から複数のクエリが行われた場合、トレースバックによって Airflow コードの self.refresh_from_db(lock_for_update=True) 行が誤って指し示されることがあります。これがタスク実行後の最初のデータベース クエリです。これ以前に例外の実際の原因が生じるのは、SQLAlchemy セッションが適切に終了していない場合です。

SQLAlchemy セッションはスレッドをスコープとし、呼び出し可能な関数セッション内で作成すれば、後で Airflow コード内で続行できます。1 つのセッションの間にクエリ間に大幅な遅延がある場合は、接続がすでに MySQL または PostgreSQL サーバーによって閉じられている可能性があります。Cloud Composer 環境での接続タイムアウトは、約 10 分に設定されています。

修正:

  • airflow.utils.db.provide_session デコレータを使用します。このデコレータは、session パラメータで Airflow データベースへの有効なセッションを提供し、関数の最後にセッションを正しく終了します。
  • 単一の長時間実行関数を使用しないでください。代わりに、すべてのデータベース クエリを別個の関数に移動して、airflow.utils.db.provide_session デコレータを使用した複数の関数が含まれるようにします。この場合、セッションはクエリ結果を取得した後に自動的に閉じます。