DAG のトラブルシューティング

Cloud Composer 1 | Cloud Composer 2

このページでは、トラブルシューティングの手順と一般的なワークフローの問題に関する情報を提供します。

DAG 実行の問題の多くは、最適ではない環境のパフォーマンスが原因で発生します。環境のパフォーマンスと費用を最適化するガイドに沿って、Cloud Composer 2 環境を最適化できます。

一部の DAG 実行の問題は、Airflow スケジューラが正しく動作しないか、最適に機能していないことが原因の可能性があります。スケジューラのトラブルシューティング手順に沿って問題を解決してください。

ワークフローのトラブルシューティング

トラブルシューティングを開始するには、次の手順を行います。

  1. Airflow ログを確認します。

    Airflow のロギングレベルを上げるには、次の Airflow 構成オプションをオーバーライドします。

    Airflow 2

    セクション キー
    logging logging_level デフォルト値は INFO です。 ログメッセージの詳細度を上げるには、DEBUG に設定します。

    Airflow 1

    セクション キー
    core logging_level デフォルト値は INFO です。 ログメッセージの詳細度を上げるには、DEBUG に設定します。
  2. Monitoring ダッシュボードを確認します。

  3. Cloud Monitoring を確認します。

  4. Google Cloud コンソールで、環境のコンポーネントのページのエラーを確認します。

  5. Airflow ウェブ インターフェースの DAG のグラフビューで、失敗したタスク インスタンスを確認します。

    セクション キー
    webserver dag_orientation LRTBRL、または BT

演算子の失敗をデバッグする

演算子の失敗をデバッグするには、次の手順を行います。

  1. タスク固有のエラーを確認します
  2. Airflow ログを確認します。
  3. Cloud Monitoring を確認します。
  4. 演算子固有のログを確認します。
  5. エラーを修正します。
  6. dags/ フォルダに DAG をアップロードします。
  7. Airflow ウェブ インターフェースで、DAG の過去の状態を消去します。
  8. DAG を再開または実行します。

タスク実行のトラブルシューティング

Airflow は、タスクキューや Airflow データベースを介して相互に通信を行い、シグナル(SIGTERM など)を送信する、スケジューラ、エグゼキュータ、ワーカーなどのさまざまなエンティティを持つ分散システムです。次の図は、Airflow コンポーネント間の相互接続の概要を示しています。

Airflow コンポーネント間のインタラクション
図 1。Airflow コンポーネント間のインタラクション(クリックして拡大)

Airflow などの分散システムでは、ネットワーク接続に問題があるか、基盤となるインフラストラクチャで断続的な問題が発生している可能性があります。この場合、タスクが失敗して実行が再スケジュールされたり、タスクが正常に完了していない(ゾンビタスク、実行中にエラーになったタスクなど)ことがあります。Airflow には、こうした状況に対処するためのメカニズムがあり、自動的に機能を再開します。以下のセクションでは、Airflow によるタスクの実行中に発生する一般的な問題(ゾンビタスク、Poison Pill、SIGTERM シグナル)について説明します。

ゾンビタスクのトラブルシューティング

Airflow は、タスクとタスクを実行するプロセスの間で 2 種類の不一致を検出します。

  • ゾンビタスクは、実行されるはずであるが実行されていないタスクです。これは、タスクのプロセスが終了済みか応答していない場合、Airflow ワーカーが過負荷のためにタスク ステータスを時間内に報告しなかった場合、またはタスクが実行された VM がシャットダウンされた場合に発生します。Airflow はこのようなタスクを定期的に検出し、タスクの設定に応じて、タスクの失敗または再試行を行います。

  • デッドタスクとは、実行すべきでないタスクです。Airflow はこのようなタスクを定期的に検出し、終了します。

ゾンビタスクの最も一般的な理由は、環境のクラスタ内の CPU リソースとメモリリソースが不足していることです。その結果、Airflow ワーカーがタスクのステータスを報告できなかった、またはセンサーが突然中断された可能性があります。この場合、スケジューラは特定のタスクをゾンビタスクとしてマークします。ゾンビタスクを回避するには、より多くのリソースを環境に割り当てます。

Cloud Composer 環境のスケーリングの詳細については、環境のスケーリングのガイドをご覧ください。ゾンビタスクが発生した場合、考えられる 1 つのソリューションは、ゾンビタスクのタイムアウトを長くすることです。その結果、スケジューラはタスクがゾンビとみなされるまでに待機時間が長くなります。このようにして、Airflow ワーカーはタスクのステータスを報告する時間が増えます。

ゾンビタスクのタイムアウトを増やすには、[scheduler]scheduler_zombie_task_threshold Airflow 構成オプションの値をオーバーライドします。

セクション キー メモ
scheduler scheduler_zombie_task_threshold 新しいタイムアウト(秒) デフォルト値は 300 です。

Poison Pill のトラブルシューティング

Poison Pill は Airflow が Airflow タスクをシャットダウンするために使用するメカニズムです。

Airflow は、次のような場合に Poison Pill を使用します。

  • スケジューラが時間内に完了しなかったタスクを終了するとき。
  • タスクがタイムアウトしたか、実行時間が長すぎる場合。

Airflow が Poison Pill を使用すると、タスクを実行した Airflow ワーカーのログに次のログエントリが表示されます。

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

解決策の提示

  • 実行に時間がかかりすぎる原因となるエラーがないか、タスクコードを確認します。
  • (Cloud Composer 2)タスクをより迅速に実行できるように、Airflow ワーカーの CPU とメモリを増やします
  • [celery_broker_transport_options]visibility-timeout Airflow 構成オプションの値を増やします。

    その結果、スケジューラはタスクの完了を待機してから、そのタスクをゾンビタスクとみなします。このオプションは、何時間もかかる作業に特に便利です。値が低すぎる(3 時間など)場合、スケジューラは 5 時間または 6 時間実行されるタスクを「ハング」したタスク(ゾンビタスク)と見なします。

  • [core]killed_task_cleanup_time Airflow 構成オプションの値を増やします。

    値を大きくすると、Airflow ワーカーはタスクを正常に完了するまでの時間が長くなります。値が小さすぎると、Airflow タスクが突然中断され、作業を正常に完了するのに十分な時間がありません。

SIGTERM シグナルのトラブルシューティング

SIGTERM シグナルは、Linux、Kubernetes、Airflow スケジューラ、Celery が Airflow ワーカーまたは Airflow タスクの実行を処理するプロセスを終了するために使用されます。

次のようないくつかの理由で、SIGTERM シグナルが環境内で送信される場合があります。

  • タスクがゾンビタスクになったため、停止する必要がある。

  • スケジューラがタスクの重複を検出し、Poison Pill と SIGTERM シグナルをタスクに送信してタスクを停止する。

  • 水平 Pod 自動スケーリングで、GKE コントロール プレーンが SIGTERM シグナルを送信して、不要になった Pod を削除する。

  • スケジューラは、SIGTERM シグナルを DagFileProcessorManager プロセスに送信できます。このような SIGTERM シグナルは、DagFileProcessorManager プロセスのライフサイクルを管理するために Scheduler によって使用され、無視しても問題ありません。

    例:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • タスクの実行をモニタリングする local_task_job のハートビート コールバックと終了コールバックの間の競合状態。ハートビートは、タスクが成功とマークされたことを検出した場合、タスク自体が成功したのか、タスクが成功したと見なすように Airflow が通知されたのかを区別できません。それでも、タスクランナーは、終了するのを待たずに停止します。

    このような SIGTERM シグナルは無視しても問題ありません。タスクはすでに成功状態であり、DAG 実行全体の実行には影響しません。

    ログエントリ Received SIGTERM. は、通常の終了と成功した状態のタスクの停止との唯一の違いです。

    ハートビート コールバックと終了コールバックの間の競合状態
    図 2.ハートビート コールバックと終了コールバックの間の競合状態(クリックして拡大)
  • Airflow コンポーネントが、クラスタノードで許可されているよりも多くのリソース(CPU、メモリ)を使用します。

  • GKE サービスはメンテナンス オペレーションを実行し、アップグレードしようとしているノードで実行されている Pod に SIGTERM シグナルを送信します。タスク インスタンスが SIGTERM で停止すると、タスクを実行する Airflow ワーカーのログに次のログエントリが表示されます。

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

解決策の提示

この問題は、タスクを実行する VM がメモリ不足になった場合に発生します。これは Airflow 構成ではなく、VM で使用可能なメモリ量に関連します。

メモリを増やすかどうかは、使用する Cloud Composer のバージョンによって異なります。次に例を示します。

  • Cloud Composer 2 では、より多くの CPU リソースとメモリリソースを Airflow ワーカーに割り当てることができます。

  • Cloud Composer 1 の場合は、パフォーマンスが高いマシンタイプを使用して環境を再作成できます。

  • 両方のバージョンの Cloud Composer で、[celery]worker_concurrency 同時実行 Airflow 構成オプションの値を小さくできます。このオプションは、特定の Airflow ワーカーによって同時に実行されるタスクの数を決定します。

Cloud Composer 2 環境の最適化の詳細については、環境のパフォーマンスと費用を最適化するをご覧ください。

Cloud Logging クエリで Pod の再起動またはエビクションの理由を特定する

Cloud Composer の環境では、GKE クラスタをコンピューティング インフラストラクチャ レイヤとして使用します。このセクションでは、Airflow ワーカーや Airflow スケジューラの再起動やエビクションの理由の特定に役立つクエリを確認できます。

以下に示すクエリは、次の方法で調整できます。

  • 希望するタイムラインを Cloud Logging で指定できます。たとえば、過去 6 時間、3 日、またはカスタム期間を定義できます。

  • Cloud Composer の CLUSTER_NAME を指定する必要があります。

  • POD_NAME を追加して、検索範囲を特定の Pod に限定することもできます。

再起動されたコンテナを検出する

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

結果を特定の Pod に限定する代替クエリ:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

メモリ不足イベントの結果としてのコンテナのシャットダウンを検出する

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

結果を特定の Pod に限定する代替クエリ:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

実行を停止したコンテナを検出する

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

結果を特定の Pod に限定する代替クエリ:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

更新またはアップグレード オペレーションが Airflow タスクの実行に及ぼす影響

更新またはアップグレード オペレーションは、タスクが遅延モードで実行されている場合を除き、現在実行中の Airflow タスクを中断します。

Airflow タスクの実行への影響を最小限に抑え、DAG とタスクに適切な再試行メカニズムを設定する際は、これらのオペレーションを行うことをおすすめします。

一般的な問題

以降のセクションでは、いくつかの一般的な DAG の問題の症状と可能性のある修正方法について説明します。

Airflow タスクが Negsignal.SIGKILL によって中断された

タスクによっては、Airflow ワーカーに割り当てられているメモリよりも多くのメモリを使用する場合があります。このような状況では、Negsignal.SIGKILL によって中断される可能性があります。このシグナルは、他の Airflow タスクの実行に影響する可能性のあるメモリ消費のさらなる増大を回避するためにシステムによって送信されます。Airflow ワーカーのログに、次のログエントリが表示される場合があります。

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

解決策の提示

  • Airflow ワーカーの worker_concurrency の低下

  • Cloud Composer 2 の場合は、Airflow ワーカーのメモリを増やす

  • Cloud Composer 1 の場合は、Composer クラスタで使用されているより大きなマシンタイプにアップグレードする

  • 使用するメモリを少なくするようタスクを最適化する

DAG 解析エラーのためにログが出力されずタスクが失敗する

微妙な DAG エラーが原因で、Airflow スケジューラと DAG プロセッサがそれぞれが、実行するタスクをスケジューリングし、DAG ファイルを解析できるものの、Airflow ワーカーがタスクの実行に失敗する状況(DAGファイルにプログラミング エラーがあるなど)が発生する可能性があります。このため、Airflow タスクが Failed とマークされ、実行からのログが存在しないことがあります。

解決策:

  • Airflow ワーカーログで、Airflow ワーカーによって引き起こされる、DAG が見つからない、または DAG 解析エラーに関連するエラーが発生していないことを確認します。

  • DAG の解析に関連付けられたパラメータを増やします。

    • dagbag-import-timeout を 120 秒以上(必要に応じてそれ以上)に増やします。

    • dag-file-processor-timeout を 180 秒以上(必要に応じてそれ以上)に増やします。この値は dagbag-import-timeout よりも大きくする必要があります。

  • DAG プロセッサログの検査もご覧ください。

リソース不足のためにログが出力されずタスクが失敗する

症状: タスクの実行中に、Airflow タスクの実行を担当する Airflow ワーカー サブプロセスが突然中断されます。 Airflow ワーカーのログに表示されるエラーは、次のようになります。

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

解決策:

Pod のエビクションのためにログが出力されずタスクが失敗する

Google Kubernetes Engine Pod には Kubernetes Pod Lifecycle と Pod の強制排除が適用されます。タスクの急激な増大とワーカーの同時スケジューリングの 2 つが、Cloud Composer で Pod が強制排除される最も一般的な原因です。

Pod の強制排除は、特定の Pod が、ノードの構成済みのリソース使用量予想と比較して、ノードのリソースを過剰に使用すると発生する可能性があります。たとえば、強制排除は、Pod 内でメモリを多く消費するいくつかのタスクが実行され、それらの負荷の合計によりこの Pod が実行されるノードがメモリ使用量上限を超過する場合に発生する可能性があります。

Airflow ワーカー Pod が強制排除されると、その Pod で実行されているすべてのタスク インスタンスは中断され、後で Airflow によって失敗としてマークされます。

ログはバッファリングされます。バッファリングがフラッシュする前にワーカー Pod が強制排除された場合、ログは出力されません。ログなしでタスクが失敗する場合は、メモリ不足(OOM)により Airflow ワーカーが再起動されたことを示します。Airflow ログが出力されていなくても、Cloud Logging にログが存在することがあります。

ログを表示するには:

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [ログ] タブに移動します。

  4. [すべてのログ] -> [Airflow ログ] -> [ワーカー] -> [(個々のワーカー)] で個々のワーカーのログを表示します。

DAG の実行はメモリ制限されています。各タスクの実行は、タスクの実行とモニタリングという 2 つの Airflow プロセスとともに開始されます。各ノードは最大 6 つの同時タスク(Airflow モジュールとともに読み込む、約 12 のプロセス)を実行できます。DAG の性質によっては、より多くのメモリが消費される可能性があります。

症状:

  1. Google Cloud Console で [環境] ページに移動します。

    [ワークロード] に移動

  2. Evicted を示す airflow-worker Pod が存在する場合は、強制排除された各 Pod をクリックして、ウィンドウ上部の The node was low on resource: memory メッセージを確認します。

修正:

  • Cloud Composer 1 で、現在のマシンタイプより大きいマシンタイプを使用して、新しい Cloud Composer 環境を作成します。
  • Cloud Composer 2 で、Airflow ワーカーのメモリ上限を増やします
  • 強制排除の考えられる原因について、airflow-worker Pod のログを確認します。個別の Pod からログを取得する際の詳細については、デプロイされたワークロードの問題のトラブルシューティングをご覧ください。
  • DAG 内のタスクがべき等で再試行可能であることを確認します。
  • Airflow ワーカーのローカル ファイル システムに不要なファイルをダウンロードしないでください。

    Airflow ワーカーでは、ローカル ファイル システムの容量が制限されています。たとえば、Cloud Composer 2 では、ワーカーは 1 GB~10 GB のストレージを使用できます。保存容量が不足すると、Airflow ワーカー Pod は GKE コントロール プレーンによって強制排除されます。これにより、強制排除されたワーカーが実行していたすべてのタスクが失敗します。

    問題のあるオペレーションの例:

    • ファイルまたはオブジェクトをダウンロードし、Airflow ワーカーにローカルに保存します。代わりに、これらのオブジェクトを Cloud Storage バケットなどの適切なサービスに直接保存する。
    • Airflow ワーカーからの /data フォルダ内の大きなオブジェクトへアクセスする。 Airflow ワーカーがオブジェクトをローカル ファイル システムにダウンロードする。代わりに、DAG を実装して、大きなファイルが Airflow ワーカー Pod の外部で処理されるようにする。

DAG のインポートの読み込みのタイムアウト

症状:

  • Airflow ウェブ インターフェースで、DAG の一覧ページの上部にある赤いアラート ボックスに「Broken DAG: [/path/to/dagfile] Timeout」と表示される。
  • Cloud Monitoring で、airflow-scheduler ログに次のようなエントリが含まれる。

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

修正:

dag_file_processor_timeout Airflow 構成のオプションをオーバーライドして DAG の解析にかける時間を増やす。

セクション キー
core dag_file_processor_timeout 新しいタイムアウト値

DAG 実行が想定時間内に終了しない

症状:

Airflow タスクが停止し、DAG 実行が想定より長く続くため、DAG 実行が終了しないことがあります。通常の条件下では、Airflow のキューまたは実行状態が無期限に続くことはありません。これは、Airflow にはこの状況を回避するためのタイムアウトとクリーンアップ手順があるためです。

修正:

  • DAG の dagrun_timeout パラメータを使用します。例: dagrun_timeout=timedelta(minutes=120)。したがって、各 DAG 実行は DAG 実行タイムアウト内に終了する必要があり、完了していないタスクは Failed または Upstream Failed とマークされます。Airflow タスクの状態の詳細については、Apache Airflow のドキュメントをご覧ください。

  • タスクの実行タイムアウト パラメータを使用して、Apache Airflow の演算子に基づいて実行されるタスクのデフォルトのタイムアウトを定義します。

DAG 実行が実行されない

症状:

DAG のスケジュール日付が動的に設定されると、さまざまな予期しない副作用が発生する可能性があります。次に例を示します。

  • DAG 実行が常に未来で、いつまでも実行されません。

  • 過去の DAG 実行が、実行済みで成功としてマークされますが、実行はされていません。

詳細については、Apache Airflow のドキュメントをご覧ください。

修正:

  • Apache Airflow のドキュメントの推奨事項に従ってください。

  • DAG に静的 start_date を設定します。オプションとして、catchup=False を使用して、過去の日付の DAG の実行を無効にできます。

  • このアプローチの副作用を認識していない限り、datetime.now() または days_ago(<number of days>) の使用は避けてください。

Airflow データベースとのネットワーク トラフィックが増加する

環境の GKE クラスタと Airflow データベース間のトラフィック ネットワークの量は、DAG の数、DAG のタスクの数、DAG が Airflow データベースのデータにアクセスする方法によって異なります。ネットワークの使用量に影響する可能性がある要因は次のとおりです。

  • Airflow データベースに対するクエリ。DAG で大量のクエリが実行されると、DAG が大量のトラフィックを生成します。例: 他のタスクに進む前にタスクのステータスを確認する、XCom テーブルに対してクエリを実行する、Airflow データベース コンテンツをダンプする。

  • 多数のタスク。スケジュールを設定するタスクが多いほど、より多くのネットワーク トラフィックが生成されます。この考慮事項は、DAG 内のタスクの合計数とスケジューリングの頻度の両方に適用されます。Airflow スケジューラは、DAG の実行をスケジュールするときに、Airflow データベースに対してクエリを実行してトラフィックを生成します。

  • Airflow ウェブ インターフェースによって、Airflow データベースにクエリが行われるので、ネットワーク トラフィックが生成されます。グラフ、タスク、図を含むページを集中的に使用すると、大量のネットワーク トラフィックが生成されます。

DAG で、Airflow ウェブサーバーがクラッシュするか、または Airflow ウェブサーバーによって 502 gateway timeout エラーが返されます

ウェブサーバーでは、いくつかの理由で失敗が生じる可能性があります。Cloud Loggingairflow-webserver ログを調べて、502 gateway timeout エラーの原因を特定します。

重い処理

このセクションは、Cloud Composer 1 にのみ適用されます。

DAG の解析時には重い処理を実行しないでください。

CPU とメモリ容量を増やすためにマシンタイプをカスタマイズできるワーカーノードやスケジューラ ノードとは異なり、ウェブサーバーは固定されたマシンタイプを使用します。そのため、解析時の処理が重すぎる場合、DAG の解析の失敗を引き起こす可能性があります。

ウェブサーバーには 2 つの vCPU と 2 GB のメモリが備わっていますcore-dagbag_import_timeout のデフォルト値は 30 秒です。このタイムアウト値により、Airflow が dags/ フォルダ内の Python モジュールの読み取りに費やす時間の上限が定義されます。

不適切な権限

このセクションは、Cloud Composer 1 にのみ適用されます。

ウェブサーバーは、ワーカーやスケジューラと同じサービス アカウントでは動作しません。そのため、ワーカーとスケジューラはウェブサーバーがアクセスできない、ユーザー管理のリソースにもアクセスできる場合があります。

DAG の解析中は、非公開リソースへのアクセスは避けることをおすすめします。これを回避できない場合もあります。その場合は、ウェブサーバーのサービス アカウントに権限を付与する必要があります。サービス アカウント名は、ウェブサーバーのドメインから取得されます。たとえば、ドメインが example-tp.appspot.com の場合、サービス アカウントは example-tp@appspot.gserviceaccount.com になります。

DAG エラー

このセクションは、Cloud Composer 1 にのみ適用されます。

ウェブサーバーは App Engine 上で動作し、環境の GKE クラスタとは分離しています。ウェブサーバーは DAG 定義ファイルを解析し、DAG にエラーがある場合は 502 gateway timeout が発生する可能性があります。問題のある DAG によって GKE で実行中のプロセスが中断されない場合、Airflow は正常に機能しているウェブサーバーなしでも正常に機能します。この場合、環境から詳細を取得し、ウェブサーバーが利用できなくなった場合の回避策とするために gcloud composer environments run を使用できます。

その他の場合には、GKE で DAG の解析を実行して、致命的な Python 例外をスローする DAG やそのタイムアウト(デフォルトは 30 秒)を確認できます。 トラブルシューティングを行うには、Airflow ワーカー コンテナ内のリモートシェルに接続して構文エラーをテストします。詳細については、DAG のテストをご覧ください。

DAG とプラグイン フォルダでの多数の DAG とプラグインの処理

/dags フォルダと /plugins フォルダの内容は、環境のバケットから Airflow ワーカーとスケジューラのローカル ファイル システムに同期されます。

これらのフォルダに格納されるデータが多いほど、同期の実行に時間がかかります。このような状況に対処するには:

  • /dags フォルダと /plugins フォルダ内のファイル数を制限します。最低限必要なファイルのみを保存します。

  • 可能であれば、Airflow スケジューラとワーカーが利用できるディスク容量を増やします。

  • 可能であれば、Airflow スケジューラとワーカーの CPU とメモリを増やして、同期オペレーションを高速化します。

  • 多数の DAG がある場合は、DAG をバッチに分割して ZIP アーカイブに圧縮し、これらのアーカイブを /dags フォルダにデプロイします。 このアプローチは、DAG の同期プロセスを高速化します。Airflow コンポーネントは、DAG を処理する前に ZIP アーカイブを解凍します。

  • DAG をプログラムで生成すると、/dags フォルダに格納された DAG ファイルの数を制限することもできます。プログラムによって生成される DAG のスケジューリングと実行に関する問題を回避するには、プログラマティック DAG に関するセクションをご覧ください。

プログラムで生成された DAG を同時にスケジュールしないでください

DAG ファイルからプログラムで DAG オブジェクトを生成することは、わずかな違いしかない多くの類似の DAG を作成する効率的な方法です。

このような DAG のすべてをすぐには実行しないようにスケジュールすることが重要です。同時にスケジュールされているすべてのタスクを実行するのに十分な CPU とメモリリソースが Airflow ワーカーにない可能性があります。

プログラマティック DAG のスケジューリングの問題を回避するには:

  • ワーカーの同時実行を増やして環境をスケールアップし、より多くのタスクを同時に実行できるようにします。
  • スケジュールを時間の経過とともに均等に分散する方法で DAG を生成し、数百のタスクを同時にスケジュールしないようにします。これにより、Airflow ワーカーはすべてのスケジュールされたタスクを実行する時間が得られます。

Airflow ウェブサーバーへのアクセス時のエラー 504

Airflow UI へのアクセス時のエラー 504 をご覧ください。

Lost connection to Postgres / MySQL server during query 例外は、タスクの実行中かタスクの直後に、スローされます。

Lost connection to Postgres / MySQL server during query 例外は、次の条件に該当する場合に頻繁に発生します。

  • DAG で PythonOperator またはカスタム演算子が使用されます。
  • DAG で Airflow データベースにクエリが行われます。

呼び出し可能な関数から複数のクエリが行われた場合、トレースバックによって Airflow コードの self.refresh_from_db(lock_for_update=True) 行が誤って指し示されることがあります。これがタスク実行後の最初のデータベース クエリです。これ以前に例外の実際の原因が生じるのは、SQLAlchemy セッションが適切に終了していない場合です。

SQLAlchemy セッションはスレッドを対象としており、呼び出し可能な関数セッションで作成され、後で Airflow コード内で継続できます。1 つのセッション内のクエリ間で大幅な遅延が発生している場合は、Postgres または MySQL サーバーによって接続がすでに閉じられている可能性があります。Cloud Composer 環境の接続タイムアウトは約 10 分に設定されています。

修正:

  • airflow.utils.db.provide_session デコレータを使用します。このデコレータは、session パラメータで Airflow データベースへの有効なセッションを提供し、関数の最後でセッションが正しく終了します。
  • 単一の長時間実行関数を使用しないでください。代わりに、すべてのデータベース クエリを別々の関数に移動し、airflow.utils.db.provide_session デコレータを持つ複数の関数が存在するようにします。この場合、セッションはクエリ結果を取得した後に自動的に終了します。

同じ DAG の DAG、タスク、並列実行の実行時間の管理

特定の DAG に対する単一の DAG 実行時間を制御する場合は、dagrun_timeout DAG パラメータを使用します。たとえば、単一の DAG 実行(実行が失敗したか失敗したかは問わない)が 1 時間を超えないようにする必要がある場合は、このパラメータを 3,600 秒に設定します。

また、1 つの Airflow タスクの許容時間を制御することもできます。その場合は、execution_timeout を使用します。

特定の DAG に対して実行するアクティブな DAG 実行の数を制御する場合は、[core]max-active-runs-per-dag Airflow 構成オプションを使用します。

特定の時点で DAG のインスタンスを 1 つしか実行しない場合は、max-active-runs-per-dag パラメータを 1 に設定します。

スケジューラ、ワーカー、ウェブサーバーに同期する DAG とプラグインに影響する問題

Cloud Composer は、/dags フォルダと /plugins フォルダの内容をスケジューラとワーカーに同期します。/dags フォルダと /plugins フォルダの特定のオブジェクトにより、この同期が正常に機能しないか、少なくとも遅くなる可能性があります。

  • /dags フォルダはスケジューラとワーカーに同期されます。このフォルダは、Cloud Composer 2 のウェブサーバーには同期されず、また Cloud Composer 1 で DAG Serialization をオンにした場合も同期されません。

  • /plugins フォルダは、スケジューラ、ワーカー、ウェブサーバーに同期されます。

次の問題が発生することがあります。

  • 圧縮トランス コーディングを使用する gzip 圧縮ファイルを、/dags フォルダと /plugins フォルダにアップロードしました。これは、通常、gsutil cp -Z コマンドを使用してデータをバケットにアップロードした場合に発生します。

    解決策: 圧縮コード変換を使用したオブジェクトを削除し、バケットに再度アップロードします。

  • オブジェクトの 1 つが「.」という名前である場合、このようなオブジェクトはスケジューラやワーカーと同期されないため、同期が完全に停止する可能性があります。

    解決策: 問題のあるオブジェクトの名前を変更します。

  • フォルダと DAG Python ファイルは同じ名前です(例: a.py)。この場合、DAG ファイルは Airflow コンポーネントに正しく同期されません。

    解決策: DAG Python ファイルと同じ名前のフォルダを削除します。

  • /dags フォルダまたは /plugins フォルダのオブジェクトの 1 つのオブジェクト名の末尾に / 記号があります。/ 記号はオブジェクトがファイルではなくフォルダであることを表すため、このようなオブジェクトでは同期プロセスで問題が発生する可能性があります。

    解決策: 問題のあるオブジェクトの名前から / 記号を削除します。

  • 不要なファイルを /dags フォルダや /plugins フォルダに保存しないでください。

    実装する DAG やプラグインに、これらのコンポーネントのテストを保存するファイルなど、追加ファイルが含まれる場合があります。これらのファイルはワーカーとスケジューラに同期されており、スケジューラ、ワーカー、ウェブサーバーへのこれらのファイルのコピーに必要な時間に影響します。

    解決策: 追加の不要なファイルを /dags フォルダや /plugins フォルダに保存しないでください。

Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' エラーは、スケジューラとワーカーによって生成されます。

この問題は、Cloud Storage でオブジェクトに重複した名前空間がある場合、同時にスケジューラとワーカーが従来のファイル システムを使用していると発生します。たとえば、同じ名前を持つフォルダとオブジェクトの両方を環境のバケットに追加できます。バケットが環境のスケジューラとワーカーに同期されると、このエラーが生成され、タスクが失敗する可能性があります。

この問題を解決するには、環境のバケットに重複する名前空間がないことを確認してください。たとえば、/dags/misc(ファイル)と /dags/misc/example_file.txt(別のファイル)の両方がバケットにある場合、スケジューラによってエラーが生成されます。

Airflow メタデータ DB への接続時の一時的な中断

Cloud Composer は分散クラウド インフラストラクチャ上で動作します。 つまり、一時的な問題によって Airflow タスクの実行が中断される場合があります。

この場合、Airflow ワーカーのログに次のエラー メッセージが表示されることがあります。

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

または

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

断続的な問題は、Cloud Composer 環境に対して実行されるメンテナンス オペレーションによって発生する場合もあります。

通常、このようなエラーは断続的に発生し、Airflow タスクがべき等で、再試行が構成されている場合は影響を受けません。また、メンテナンスの時間枠の定義も検討できます。

このようなエラーが発生するもう 1 つの理由は、環境のクラスタにリソースがないことが挙げられます。そのような場合は、環境のスケーリングまたは環境の最適化の説明に沿って、環境をスケールアップまたは最適化できます。

DAG 実行が成功とマークされているものの、実行されたタスクがない

DAG 実行 execution_date が DAG の start_date よりも前に行われる場合、タスクは実行されていないものの、成功としてマークされている DAG 実行が表示される場合があります。

タスクが実行されずに成功となった DAG
図 3. タスクが実行されずに成功となった DAG(クリックして拡大)

原因

この状況は、次のいずれかに該当する可能性があります。

  • DAG の execution_datestart_date のタイムゾーンの違いにより不一致が発生している。たとえば、pendulum.parse(...) を使用して start_date を設定すると、そのようになる場合があります。

  • DAG の start_date が動的な値(例: airflow.utils.dates.days_ago(1))に設定されている

解決策

  • execution_datestart_date が同じタイムゾーンを使用していることを確認してください。

  • 過去の開始日で DAG が実行されないようにするため、静的な start_date を指定し、catchup=False と合わせて使用します。

DAG が Airflow UI や DAG UI に表示されず、スケジューラが DAG のスケジュールを設定しない

DAG プロセッサは、スケジューラがスケジュールを設定する前に、また DAG が Airflow UI または DAG UI に表示される前に、各 DAG を解析します。

次の Airflow 構成オプションでは、DAG の解析のタイムアウトを定義します。

DAG が Airflow UI または DAG UI に表示されない場合:

  • DAG プロセッサが DAG を正しく処理できるかどうか、DAG プロセッサのログを確認します。問題が発生した場合は、DAG プロセッサまたはスケジューラのログに次のログエントリが表示されることがあります。
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • スケジューラのログをチェックして、スケジューラが正常に動作しているかどうかを確認します。問題がある場合、スケジューラのログに次のログエントリが表示されます。
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

解決策:

  • DAG の解析エラーをすべて修正します。DAG プロセッサは複数の DAG を解析します。まれに、1 つの DAG のエラーが解析されると、他の DAG の解析に悪影響を与える可能性があります。

  • DAG の解析に [core]dagrun_import_timeout で定義された秒数以上かかる場合は、このタイムアウトを長くします。

  • すべての DAG の解析に [core]dag_file_processor_timeout で定義された秒数以上かかる場合は、このタイムアウトを長くします。

  • DAG の解析に時間がかかっている場合、最適な方法で実装されていない可能性がある。たとえば、多数の環境変数の読み取りや、外部サービスまたは Airflow データベースの呼び出しを行う場合です。可能であれば、このようなオペレーションは DAG のグローバル セクションで実行しないでください。

  • スケジューラの CPU やメモリリソースを増やして、より迅速に動作できるようにします。

  • スケジューラの数を調整する

  • DAG プロセッサ プロセスの数を増やして、解析をより迅速に行えるようにします。これを行うには、[scheduler]parsing_process の値を増やします。

  • DAG の解析頻度を下げる

  • Airflow データベースの負荷を軽減する

Airflow データベースの負荷が大きい場合の兆候

詳細については、Airflow データベースに負荷がかかっている場合の兆候をご覧ください。

次のステップ