DAG のトラブルシューティング

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、トラブルシューティングの手順と一般的なワークフローの問題に関する情報を提供します。

DAG 実行の問題の多くは、最適ではない環境のパフォーマンスが原因で発生します。環境のパフォーマンスと費用を最適化するガイドに沿って、Cloud Composer 2 環境を最適化できます。

一部の DAG 実行の問題は、Airflow スケジューラが正しく動作しないか、最適に機能していないことが原因の可能性があります。スケジューラのトラブルシューティング手順に沿って問題を解決してください。

ワークフローのトラブルシューティング

トラブルシューティングを開始するには、次の手順を行います。

  1. Airflow ログを確認します。

    Airflow のロギングレベルを上げるには、次の Airflow 構成オプションをオーバーライドします。

    Airflow 2

    セクション キー
    logging logging_level デフォルト値は INFO です。 ログメッセージの詳細度を上げるには、DEBUG に設定します。

    Airflow 1

    セクション キー
    core logging_level デフォルト値は INFO です。 ログメッセージの詳細度を上げるには、DEBUG に設定します。
  2. モニタリング ダッシュボードを確認します。

  3. Cloud Monitoring を確認します。

  4. Google Cloud コンソールで、環境のコンポーネントのページのエラーを確認します。

  5. Airflow ウェブ インターフェースの DAG のグラフビューで、失敗したタスク インスタンスを確認します。

    セクション キー
    webserver dag_orientation LRTBRL、または BT

演算子の失敗をデバッグする

演算子の失敗をデバッグするには、次の手順を行います。

  1. タスク固有のエラーを確認します
  2. Airflow ログを確認します。
  3. Cloud Monitoring を確認します。
  4. 演算子固有のログを確認します。
  5. エラーを修正します。
  6. dags/ フォルダに DAG をアップロードします。
  7. Airflow ウェブ インターフェースで、DAG の過去の状態を消去します。
  8. DAG を再開または実行します。

タスク実行のトラブルシューティング

Airflow は、タスクキューや Airflow データベースを介して相互に通信を行い、シグナル(SIGTERM など)を送信する、スケジューラ、エグゼキュータ、ワーカーなどのさまざまなエンティティを持つ分散システムです。次の図は、Airflow コンポーネント間の相互接続の概要を示しています。

Airflow コンポーネント間のインタラクション
図 1。Airflow コンポーネント間のインタラクション(クリックして拡大)

Airflow などの分散システムでは、ネットワーク接続に問題があるか、基盤となるインフラストラクチャで断続的な問題が発生している可能性があります。この場合、タスクが失敗して実行が再スケジュールされたり、タスクが正常に完了していない(ゾンビタスク、実行中にエラーになったタスクなど)ことがあります。Airflow には、こうした状況に対処するためのメカニズムがあり、自動的に機能を再開します。以下のセクションでは、Airflow によるタスクの実行中に発生する一般的な問題(ゾンビタスク、Poison Pill、SIGTERM シグナル)について説明します。

ゾンビタスクのトラブルシューティング

Airflow は、タスクとタスクを実行するプロセスの間で次の 2 種類の不一致を検出します。

  • ゾンビタスクは、実行されるはずであるが実行されていないタスクです。これは、タスクのプロセスが終了済みか応答していない場合、Airflow ワーカーが過負荷のためにタスク ステータスを時間内に報告しなかった場合、またはタスクが実行された VM がシャットダウンされた場合に発生します。Airflow はそのようなタスクを定期的に検出し、タスクの設定に応じて、失敗するか、再試行します。

    ゾンビタスクを検出する

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • デッドタスクとは、実行すべきでないタスクです。Airflow はそのようなタスクを定期的に検出し、終了させます。

ゾンビタスクの一般的な理由と解決策は次のとおりです。

Airflow ワーカーのメモリ不足

各 Airflow ワーカーは、最大 [celery]worker_concurrency 個のタスク インスタンスを同時に実行できます。これらのタスク インスタンスの累積メモリ使用量が Airflow ワーカーのメモリ上限を超えると、ワーカー上のランダムなプロセスが終了し、リソースが解放されます。

Airflow ワーカーのメモリ不足イベントを検出する

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

解決策:

Airflow ワーカーが強制排除された

Pod の強制排除は、Kubernetes でワークロードを実行するうえで通常行われることです。GKE は、ストレージが不足した場合や、優先度の高いワークロード用にリソースを解放する場合に Pod を強制排除します。

Airflow ワーカーの強制排除を検出する

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

解決策:

Airflow ワーカーが終了した

Airflow ワーカーが外部要因で削除される可能性があります。現在実行中のタスクが正常な終了期間内に完了しなかった場合、タスクは中断され、ゾンビとして検出される可能性があります。

Airflow ワーカー Pod の終了を検出する

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

考えられるシナリオと解決策:

  • Airflow ワーカーは、アップグレードやパッケージのインストールなどの環境の変更中に再起動されます。

    Composer 環境の変更を検出する

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    このようなオペレーションは、重要なタスクが実行されていないときに行います。またはタスクの再試行を有効にします。

  • メンテナンス オペレーション中は、さまざまなコンポーネントが一時的に使用できなくなることがあります。

    GKE メンテナンス オペレーションを検出する

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    メンテナンスの時間枠を指定して、重要なタスクの実行との重複を最小限に抑えることができます。

  • Cloud Composer 2 バージョン 2.4.5 より前では、終了する Airflow ワーカーが SIGTERM シグナルを無視し、タスクの実行を続行することがあります。

    Composer 自動スケーリングによるスケールダウンを検出する

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-worker-set")
    textPayload:"Workers deleted"

    この問題が修正されている Cloud Composer の新しいバージョンにアップグレードできます。

Airflow ワーカーに大きな負荷がかかっていた

Airflow ワーカーで使用できる CPU リソースとメモリリソースの量は、環境の構成によって制限されます。使用率が上限に近づくと、リソース競合が発生し、タスクの実行中に不要な遅延が発生します。極端な状況では、長期間リソースが不足すると、ゾンビタスクが発生する可能性があります。

解決策:

Airflow データベースに大きな負荷がかかっていた

データベースは、さまざまな Airflow コンポーネントが相互に通信するために使用されます。特に、タスク インスタンスのハートビートを保存するために使用されます。データベースのリソース不足により、クエリ時間が長くなり、タスクの実行に影響する可能性があります。

解決策:

Airflow データベースが一時的に利用できなくなった

Airflow ワーカーは、一時的な接続の問題などの断続的なエラーを検出して正常に処理するまでに時間がかかる場合があります。デフォルトのゾンビ検出しきい値を超える可能性があります。

Airflow ハートビートのタイムアウトを検出する

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

解決策:

  • ゾンビタスクのタイムアウトを増やし、[scheduler]scheduler_zombie_task_threshold Airflow 構成オプションの値をオーバーライドします。

    セクション キー メモ
    scheduler scheduler_zombie_task_threshold 新しいタイムアウト(秒単位) デフォルト値は 300 です

Poison Pill のトラブルシューティング

Poison Pill は、Airflow タスクをシャットダウンするために Airflow が使用するメカニズムです。

Airflow は、次のような場合に Poison Pill を使用します。

  • スケジューラが時間内に完了しなかったタスクを終了するとき。
  • タスクがタイムアウトした場合や、実行時間が長すぎる場合。

Airflow が Poison Pill を使用すると、タスクを実行した Airflow ワーカーのログに次のログエントリが表示されます。

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

解決策の提示

  • 実行に時間がかかりすぎる原因となるエラーがないか、タスクコードを確認します。
  • (Cloud Composer 2)タスクをより迅速に実行できるように、Airflow ワーカーの CPU とメモリを増やします
  • [celery_broker_transport_options]visibility-timeout Airflow 構成オプションの値を増やします。

    その結果、スケジューラはタスクの完了を待機してから、そのタスクをゾンビタスクとみなします。このオプションは、何時間もかかる作業に特に便利です。値が低すぎる場合(3 時間など)、スケジューラは 5~6 時間実行されたタスクを「ハングした」(ゾンビタスク)と見なします。

  • [core]killed_task_cleanup_time Airflow 構成オプションの値を増やします。

    値を大きくすると、Airflow ワーカーはタスクを正常に完了するまでの時間が長くなります。値が小さすぎると、Airflow タスクが突然中断され、作業を正常に完了するのに十分な時間がありません。

SIGTERM シグナルのトラブルシューティング

SIGTERM シグナルは、Linux、Kubernetes、Airflow スケジューラ、Celery が Airflow ワーカーまたは Airflow タスクの実行を処理するプロセスを終了するために使用されます。

次のようないくつかの理由で、SIGTERM シグナルが環境内で送信される場合があります。

  • タスクがゾンビタスクになったため、停止する必要がある。

  • スケジューラがタスクの重複を検出し、Poison Pill と SIGTERM シグナルをタスクに送信してタスクを停止する。

  • 水平 Pod 自動スケーリングで、GKE コントロール プレーンが SIGTERM シグナルを送信して、不要になった Pod を削除する。

  • スケジューラは、DagFileProcessorManager プロセスに SIGTERM シグナルを送信できます。このような SIGTERM シグナルは、DagFileProcessorManager プロセスのライフサイクルを管理するために Scheduler によって使用され、無視しても問題ありません。

    例:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • タスクの実行をモニタリングする local_task_job のハートビート コールバックと終了コールバック間の競合状態。ハートビートは、タスクが成功とマークされたことを検出した場合、タスク自体が成功したのか、タスクが成功したと見なすように Airflow が通知されたのかを区別できません。それでも、タスクランナーは、終了するのを待たずに停止します。

    このような SIGTERM シグナルは無視してかまいません。タスクはすでに成功状態にあり、DAG 実行全体の実行は影響を受けません。

    ログエントリ Received SIGTERM. は、通常の終了と成功した状態のタスクの停止との唯一の違いです。

    ハートビート コールバックと終了コールバック間の競合状態
    図 2.ハートビート コールバックと終了コールバック間の競合状態(クリックして拡大)
  • Airflow コンポーネントが、クラスタノードで許可されているよりも多くのリソース(CPU、メモリ)を使用している。

  • GKE Service は、メンテナンス オペレーションを実行し、アップグレードされようとしているノード上で実行される Pod に SIGTERM シグナルを送信します。タスク インスタンスが SIGTERM で停止すると、タスクを実行する Airflow ワーカーのログに次のログエントリが表示されます。

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

解決策の提示

この問題は、タスクを実行する VM のメモリが不足している場合に発生します。これは Airflow 構成ではなく、VM で使用可能なメモリ量に関連します。

メモリを増加させる方法は、使用する Cloud Composer のバージョンによって異なります。次に例を示します。

  • Cloud Composer 2 では、より多くの CPU リソースとメモリリソースを Airflow ワーカーに割り当てることができます。

  • Cloud Composer 1 の場合は、パフォーマンスの高いマシンタイプを使用して環境を再作成できます。

  • 両方のバージョンの Cloud Composer で、[celery]worker_concurrency 同時実行 Airflow 構成オプションの値を小さくできます。このオプションは、特定の Airflow ワーカーによって同時に実行されるタスクの数を決定します。

Cloud Composer 2 環境の最適化の詳細については、環境のパフォーマンスと費用を最適化するをご覧ください。

Pod の再起動または強制排除の理由を特定する Cloud Logging クエリ

Cloud Composer の環境では、コンピューティング インフラストラクチャ レイヤとして GKE クラスタが使用されます。このセクションでは、Airflow ワーカーや Airflow スケジューラの再起動、強制排除の原因を特定することに役立つクエリについて説明します。

以下に示すクエリは、次のようにチューニングできます。

  • 希望するタイムラインを Cloud Logging で指定できます。たとえば、過去 6 時間、3 日、またはカスタム期間を定義できます。

  • Cloud Composer の CLUSTER_NAME を指定する必要があります。

  • POD_NAME を追加して、検索を特定の Pod に限定することもできます。

再起動されたコンテナを検出する

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

結果を特定の Pod に限定する代替クエリ:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

メモリ不足イベントの結果としてのコンテナのシャットダウンを検出する

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

結果を特定の Pod に限定する代替クエリ:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

実行を停止したコンテナを検出する

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

結果を特定の Pod に限定する代替クエリ:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

更新またはアップグレード オペレーションが Airflow タスクの実行に及ぼす影響

更新またはアップグレード オペレーションは、タスクが遅延モードで実行されている場合を除き、現在実行中の Airflow タスクを中断します。

Airflow タスクの実行への影響が最小限に抑えられることが予想される場合にこれらのオペレーションを実行して、DAG とタスクに適切な再試行メカニズムを設定することをおすすめします。

KubernetesExecutor タスクのトラブルシューティング

CeleryKubernetesExecutor は、CeleryExecutor と KubernetesExecutor を同時に使用できる Cloud Composer 3 のエグゼキュータの一種です。

KubernetesExecutor で実行したタスクのトラブルシューティングの詳細については、CeleryKubernetesExecutor を使用するのページをご覧ください。

一般的な問題

以降のセクションでは、いくつかの一般的な DAG の問題の症状と可能性のある修正方法について説明します。

Airflow タスクが Negsignal.SIGKILL によって中断された

タスクによっては、Airflow ワーカーに割り当てられているメモリよりも多くのメモリを使用する場合があります。このような状況では、Negsignal.SIGKILL によって中断される可能性があります。このシグナルは、他の Airflow タスクの実行に影響する可能性のあるメモリ消費のさらなる増大を回避するためにシステムによって送信されます。Airflow ワーカーのログに次のようなログエントリが表示される場合があります。

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL はコード -9 として表示される場合もあります。

解決策の提示

  • Airflow ワーカーの worker_concurrency を減らす。

  • Cloud Composer 2 の場合は、Airflow ワーカーのメモリを増やします。

  • Cloud Composer 1 の場合は、Cloud Composer クラスタで使用されているより大きなマシンタイプにアップグレードします。

  • 使用するメモリを少なくするようタスクを最適化します。

  • KubernetesPodOperator または GKEStartPodOperator を使用してタスクの分離とカスタマイズされたリソース割り当てを行うことで、Cloud Composer でリソースを大量に消費するタスクを管理します。

DAG 解析エラーのためにログが出力されずタスクが失敗する

微妙な DAG エラーが原因で、Airflow スケジューラと DAG プロセッサがそれぞれが、実行するタスクをスケジューリングし、DAG ファイルを解析できるものの、Airflow ワーカーがタスクの実行に失敗する状況(DAGファイルにプログラミング エラーがあるなど)が発生する可能性があります。このため、Airflow タスクが Failed とマークされ、実行からのログが存在しないことがあります。

ソリューション:

  • Airflow ワーカーログで、Airflow ワーカーによって引き起こされる、DAG が見つからない、または DAG 解析エラーに関連するエラーが発生していないことを確認します。

  • DAG の解析に関連付けられたパラメータを増やします。

    • dagbag-import-timeout を 120 秒以上(必要に応じてそれ以上)に増やします。

    • dag-file-processor-timeout を 180 秒以上(必要に応じてそれ以上)に増やします。この値は dagbag-import-timeout より大きい値にする必要があります。

  • DAG プロセッサのログの検査もご覧ください。

リソースの負荷が原因でログが出力されずタスクが失敗する

症状: タスクの実行中に、Airflow タスクの実行を担当する Airflow ワーカー サブプロセスが突然中断されます。 Airflow ワーカーのログに表示されるエラーは、次のようになります。

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

解決方法:

Pod のエビクションのためにログが出力されずタスクが失敗する

Google Kubernetes Engine Pod には Kubernetes Pod Lifecycle と Pod の強制排除が適用されます。タスクの急激な増大とワーカーの同時スケジューリングの 2 つが、Cloud Composer で Pod が強制排除される最も一般的な原因です。

Pod の強制排除は、特定の Pod が、ノードの構成済みのリソース使用量予想と比較して、ノードのリソースを過剰に使用すると発生する可能性があります。たとえば、強制排除は、Pod 内でメモリを多く消費するいくつかのタスクが実行され、それらの負荷の合計によりこの Pod が実行されるノードがメモリ使用量上限を超過する場合に発生する可能性があります。

Airflow ワーカー Pod が強制排除されると、その Pod で実行されているすべてのタスク インスタンスは中断され、後で Airflow によって失敗としてマークされます。

ログはバッファリングされます。バッファリングがフラッシュする前にワーカー Pod が強制排除された場合、ログは出力されません。ログなしでタスクが失敗する場合は、メモリ不足(OOM)により Airflow ワーカーが再起動されたことを示します。Airflow ログが出力されなかった場合でも、一部のログが Cloud Logging に存在することがあります。

ログを表示するには:

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [ログ] タブに移動します。

  4. [すべてのログ] -> [Airflow ログ] -> [ワーカー] -> [(個々のワーカー)] で個々のワーカーのログを表示します。

DAG の実行にはメモリの制限があります。各タスクの実行は、タスクの実行とモニタリングという 2 つの Airflow プロセスとともに開始されます。各ノードは最大 6 つの同時タスク(Airflow モジュールとともに読み込む、約 12 のプロセス)を実行できます。DAG の性質によっては、より多くのメモリが消費される可能性があります。

症状:

  1. Google Cloud Console で [環境] ページに移動します。

    [ワークロード] に移動

  2. Evicted を示す airflow-worker Pod が存在する場合は、強制排除された各 Pod をクリックして、ウィンドウ上部の The node was low on resource: memory メッセージを確認します。

修正:

  • Cloud Composer 1 では、現在のマシンタイプより大きいマシンタイプを使用して、新しい Cloud Composer 環境を作成します。
  • Cloud Composer 2 で、Airflow ワーカーのメモリ上限を増やします
  • 強制排除の考えられる原因について、airflow-worker Pod のログを確認します。個別の Pod からログを取得する際の詳細については、デプロイされたワークロードの問題のトラブルシューティングをご覧ください。
  • DAG 内のタスクがべき等で再試行可能であることを確認します。
  • Airflow ワーカーのローカル ファイル システムに不要なファイルをダウンロードしないようにします。

    Airflow ワーカーのローカル ファイル システムの容量には上限があります。たとえば、Cloud Composer 2 では、ワーカーは 1 GB~10 GB のストレージを使用できます。保存容量が不足すると、Airflow ワーカー Pod は GKE コントロール プレーンによって強制排除されます。これによって、強制排除されたワーカーが実行していたすべてのタスクが失敗します。

    問題のあるオペレーションの例:

    • ファイルまたはオブジェクトをダウンロードして、Airflow ワーカーにローカルに保存する。代わりに、これらのオブジェクトを Cloud Storage バケットなどの適切なサービスに直接保存する。
    • Airflow ワーカーからの /data フォルダ内の大きなオブジェクトへアクセスする。 Airflow ワーカーがオブジェクトをローカル ファイル システムにダウンロードする。代わりに、DAG を実装して、大きなファイルが Airflow ワーカー Pod の外部で処理されるようにする。

DAG のインポートの読み込みのタイムアウト

症状:

  • Airflow ウェブ インターフェースで、DAG の一覧ページの上部にある赤いアラート ボックスに「Broken DAG: [/path/to/dagfile] Timeout」と表示される。
  • Cloud Monitoring で、airflow-scheduler ログに次のようなエントリが含まれる。

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

修正:

dag_file_processor_timeout Airflow 構成のオプションをオーバーライドして DAG の解析にかける時間を増やす。

セクション キー
core dag_file_processor_timeout 新しいタイムアウト値

DAG 実行が想定時間内に終了しない

症状:

Airflow タスクが停止し、DAG 実行が想定より長く続くため、DAG 実行が終了しないことがあります。通常の条件下では、Airflow のキューまたは実行状態が無期限に続くことはありません。これは、Airflow にはこの状況を回避するためのタイムアウトとクリーンアップ手順があるためです。

修正:

  • DAG には dagrun_timeout パラメータを使用します。例: dagrun_timeout=timedelta(minutes=120)。したがって、各 DAG 実行は DAG 実行タイムアウト内に終了する必要があり、完了していないタスクは Failed または Upstream Failed とマークされます。Airflow タスクの状態の詳細については、Apache Airflow のドキュメントをご覧ください。

  • タスク実行タイムアウト パラメータを使用して、Apache Airflow 演算子に基づいて実行されるタスクのデフォルトのタイムアウトを定義します。

DAG 実行が実行されない

症状:

DAG のスケジュール日付が動的に設定されると、さまざまな予期しない副作用が発生する可能性があります。次に例を示します。

  • DAG 実行が常に未来で、いつまでも実行されません。

  • 過去の DAG 実行が、実行済みで成功としてマークされますが、実行はされていません。

詳細については、Apache Airflow のドキュメントをご覧ください。

修正:

  • Apache Airflow のドキュメントの推奨事項に従ってください。

  • DAG に静的 start_date を設定します。必要に応じて、catchup=False を使用して過去の日付の DAG の実行を無効にできます。

  • このアプローチの副作用を認識していない限り、datetime.now() または days_ago(<number of days>) の使用は避けてください。

Airflow データベースとのネットワーク トラフィックが増加する

環境の GKE クラスタと Airflow データベース間のトラフィック ネットワークの量は、DAG の数、DAG 内のタスクの数、DAG が Airflow データベースのデータにアクセスする方法によって異なります。ネットワーク使用量に影響する要因は次のとおりです。

  • Airflow データベースに対するクエリ。DAG で多くのクエリを実行すると、大量のトラフィックが発生します。例: 他のタスクに進む前にタスクのステータスを確認する、XCom テーブルに対してクエリを実行する、Airflow データベース コンテンツをダンプする。

  • 多数のタスク。スケジュールするタスクが多いほど、生成されるネットワーク トラフィックは多くなります。この考慮事項は、DAG のタスクの合計数とスケジュール設定頻度の両方に適用されます。Airflow スケジューラが DAG の実行をスケジュールすると、Airflow データベースにクエリが実行され、トラフィックが生成されます。

  • Airflow ウェブ インターフェースによって、Airflow データベースにクエリが行われるので、ネットワーク トラフィックが生成されます。グラフ、タスク、図を含むページを集中的に使用すると、大量のネットワーク トラフィックが生成されます。

DAG で、Airflow ウェブサーバーがクラッシュするか、または Airflow ウェブサーバーによって 502 gateway timeout エラーが返されます

ウェブサーバーでは、いくつかの異なる理由で失敗が生じる可能性があります。Cloud Loggingairflow-webserver ログを確認して、502 gateway timeout エラーの原因を特定します。

重い処理

このセクションは、Cloud Composer 1 にのみ適用されます。

DAG の解析時には重い処理を実行しないでください。

CPU とメモリ容量を増やすためにマシンタイプをカスタマイズできるワーカーノードやスケジューラ ノードとは異なり、ウェブサーバーは固定されたマシンタイプを使用します。そのため、解析時の処理が重すぎる場合、DAG の解析の失敗を引き起こす可能性があります。

ウェブサーバーには 2 つの vCPU と 2 GB のメモリが備わっていますcore-dagbag_import_timeout のデフォルト値は 30 秒です。このタイムアウト値により、Airflow が dags/ フォルダ内の Python モジュールの読み取りに費やす時間の上限が定義されます。

不適切な権限

このセクションは、Cloud Composer 1 にのみ適用されます。

ウェブサーバーは、ワーカーやスケジューラと同じサービス アカウントでは動作しません。そのため、ワーカーとスケジューラはウェブサーバーがアクセスできない、ユーザー管理のリソースにもアクセスできる場合があります。

DAG の解析中は、非公開リソースへのアクセスは避けることをおすすめします。これを回避できない場合もあります。その場合は、ウェブサーバーのサービス アカウントに権限を付与する必要があります。サービス アカウント名は、ウェブサーバーのドメインから取得されます。たとえば、ドメインが example-tp.appspot.com の場合、サービス アカウントは example-tp@appspot.gserviceaccount.com になります。

DAG エラー

このセクションは、Cloud Composer 1 にのみ適用されます。

ウェブサーバーは App Engine 上で動作し、環境の GKE クラスタとは分離しています。ウェブサーバーは DAG 定義ファイルを解析し、DAG にエラーがある場合は 502 gateway timeout が発生する可能性があります。問題のある DAG によって GKE で実行中のプロセスが中断されない場合、Airflow は正常に機能しているウェブサーバーなしでも正常に機能します。この場合、環境から詳細を取得し、ウェブサーバーが利用できなくなった場合の回避策とするために gcloud composer environments run を使用できます。

その他の場合には、GKE で DAG の解析を実行して、致命的な Python 例外をスローする DAG やそのタイムアウト(デフォルトは 30 秒)を確認できます。 トラブルシューティングを行うには、Airflow ワーカー コンテナ内のリモートシェルに接続して構文エラーをテストします。詳細については、DAG のテストをご覧ください。

DAG フォルダと プラグイン フォルダ内の多数の DAG とプラグインの処理

/dags フォルダと /plugins フォルダの内容は、環境のバケットから Airflow ワーカーとスケジューラのローカル ファイル システムに同期されます。

これらのフォルダに保存されるデータが多いほど、同期の実行に時間がかかります。このような状況に対処するには:

  • /dags フォルダと /plugins フォルダ内のファイル数を制限します。最低限必要なファイルのみを保存します。

  • 可能であれば、Airflow スケジューラとワーカーが使用できるディスク容量を増やします。

  • 可能であれば、Airflow スケジューラと Airflow ワーカーの CPU とメモリを増やして、同期オペレーションをより高速に行ないます。

  • 多数の DAG がある場合は、DAG をバッチに分割して ZIP アーカイブに圧縮し、これらのアーカイブを /dags フォルダにデプロイします。 このアプローチにより、DAG の同期プロセスが高速化されます。Airflow コンポーネントは、DAG を処理する前に ZIP アーカイブを解凍します。

  • プログラムでの DAG の生成は、/dags フォルダに保存される DAG ファイルの数を制限する方法の 1 つです。プログラムによって生成される DAG のスケジューリングと実行に関する問題を回避するには、プログラマティック DAG に関するセクションをご覧ください。

プログラムで生成された DAG を同時にスケジュールしないでください

DAG ファイルからプログラムで DAG オブジェクトを生成することは、わずかな違いしかない多くの類似の DAG を作成する効率的な方法です。

このような DAG のすべてをすぐには実行しないようにスケジュールすることが重要です。同時にスケジュールされているすべてのタスクを実行するのに十分な CPU とメモリリソースが Airflow ワーカーにない可能性があります。

プログラマティック DAG のスケジューリングの問題を回避するには:

  • ワーカーの同時実行を増やし、環境をスケールアップして、より多くのタスクを同時に実行できるようにします。
  • スケジュールを時間の経過とともに均等に分散する方法で DAG を生成し、数百のタスクを同時にスケジュールしないようにします。これにより、Airflow ワーカーはすべてのスケジュールされたタスクを実行する時間が得られます。

Airflow ウェブサーバーへのアクセス時のエラー 504

Airflow UI へのアクセス時のエラー 504 をご覧ください。

Lost connection to Postgres server during query 例外は、タスクの実行中かタスクの直後に、スローされます。

Lost connection to Postgres server during query 例外は、次の条件に該当する場合に頻繁に発生します。

  • DAG で PythonOperator またはカスタム演算子が使用されます。
  • DAG で Airflow データベースにクエリが行われます。

呼び出し可能な関数から複数のクエリが行われた場合、トレースバックによって Airflow コードの self.refresh_from_db(lock_for_update=True) 行が誤って指し示されることがあります。これがタスク実行後の最初のデータベース クエリです。これ以前に例外の実際の原因が生じるのは、SQLAlchemy セッションが適切に終了していない場合です。

SQLAlchemy セッションはスレッドを対象としており、呼び出し可能な関数セッションで作成され、後で Airflow コード内で継続できます。1 つのセッション内のクエリ間で大幅な遅延が発生している場合は、Postgres サーバーによって接続がすでに閉じられている可能性があります。Cloud Composer 環境の接続タイムアウトは約 10 分に設定されています。

修正:

  • airflow.utils.db.provide_session デコレータを使用します。このデコレータは、session パラメータで Airflow データベースへの有効なセッションを提供し、関数の最後でセッションが正しく終了します。
  • 単一の長時間実行関数を使用しないでください。代わりに、すべてのデータベース クエリを別々の関数に移動し、airflow.utils.db.provide_session デコレータを持つ複数の関数が存在するようにします。この場合、セッションはクエリ結果を取得した後に自動的に終了します。

同じ DAG の DAG、タスク、並列実行の実行時間の管理

特定の DAG に対する単一の DAG 実行時間を制御する場合は、dagrun_timeout DAG パラメータを使用します。たとえば、単一の DAG 実行(実行が失敗したか失敗したかは問わない)が 1 時間を超えないようにする必要がある場合は、このパラメータを 3,600 秒に設定します。

1 つの Airflow タスクの実行時間を制御することもできます。これを行うには execution_timeout を使用できます。

特定の DAG に対して実行するアクティブな DAG 実行の数を制御する場合は、[core]max-active-runs-per-dag Airflow 構成オプションを使用します。

特定の時点で DAG のインスタンスを 1 つしか実行しない場合は、max-active-runs-per-dag パラメータを 1 に設定します。

スケジューラ、ワーカー、ウェブサーバーに同期する DAG とプラグインに影響する問題

Cloud Composer は、/dags フォルダと /plugins フォルダの内容をスケジューラとワーカーに同期します。/dags フォルダと /plugins フォルダの特定のオブジェクトにより、この同期が正常に機能しないか、少なくとも遅くなる可能性があります。

  • /dags フォルダはスケジューラとワーカーに同期されます。このフォルダは、Cloud Composer 2 のウェブサーバーには同期されず、また Cloud Composer 1 で DAG Serialization をオンにした場合も同期されません。

  • /plugins フォルダはスケジューラ、ワーカー、ウェブサーバーに同期されます。

次の問題が発生することがあります。

  • 圧縮トランス コーディングを使用する gzip 圧縮ファイルを、/dags フォルダと /plugins フォルダにアップロードしました。これは、通常、gcloud storage cp コマンドの --gzip-local-all フラグを使用してデータをバケットにアップロードした場合に発生します。

    解決策: 圧縮コード変換を使用したオブジェクトを削除し、バケットに再度アップロードします。

  • オブジェクトの 1 つが「.」という名前である場合、このようなオブジェクトはスケジューラやワーカーと同期されないため、同期が完全に停止する可能性があります。

    解決策: 問題のあるオブジェクトの名前を変更します。

  • フォルダと DAG Python ファイルの名前が同じです(例: a.py)。この場合、DAG ファイルは Airflow コンポーネントに適切に同期されません。

    解決策: DAG Python ファイルと同じ名前のフォルダを削除します。

  • /dags フォルダまたは /plugins フォルダのオブジェクトの 1 つのオブジェクト名の末尾に / 記号があります。/ 記号はオブジェクトがファイルではなくフォルダであることを表すため、このようなオブジェクトでは同期プロセスで問題が発生する可能性があります。

    解決策: 問題のあるオブジェクトの名前から / 記号を削除します。

  • 不要なファイルを /dags フォルダや /plugins フォルダに保存しないでください。

    実装する DAG やプラグインに、これらのコンポーネントのテストを保存するファイルなど、追加ファイルが含まれる場合があります。これらのファイルはワーカーとスケジューラに同期されており、スケジューラ、ワーカー、ウェブサーバーへのこれらのファイルのコピーに必要な時間に影響します。

    解決策: 追加の不要なファイルを /dags フォルダや /plugins フォルダに保存しないでください。

Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' エラーがスケジューラとワーカーによって生成される

この問題は、Cloud Storage でオブジェクトに重複した名前空間がある場合、同時にスケジューラとワーカーが従来のファイル システムを使用していると発生します。たとえば、同じ名前を持つフォルダとオブジェクトの両方を環境のバケットに追加できます。バケットが環境のスケジューラとワーカーに同期されると、このエラーが発生し、タスクが失敗する可能性があります。

この問題を解決するには、環境のバケットに重複する名前空間がないことを確認します。たとえば、/dags/misc(ファイル)と /dags/misc/example_file.txt(別のファイル)の両方がバケットにある場合、スケジューラによってエラーが生成されます。

Airflow メタデータ DB への接続時の一時的な中断

Cloud Composer は分散クラウド インフラストラクチャ上で動作します。 つまり、一時的な問題によって Airflow タスクの実行が中断される場合があります。

この場合、Airflow ワーカーのログに次のエラー メッセージが表示されることがあります。

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

または

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

このような断続的な問題は、Cloud Composer 環境に対して実施されるメンテナンス オペレーションによっても発生する可能性もあります。

通常、このようなエラーは断続的に発生し、Airflow タスクがべき等で、再試行が構成されている場合は影響を受けません。メンテナンスの時間枠を定義することも検討してください。

このようなエラーが発生するもう 1 つの理由は、環境のクラスタにリソースがないことが挙げられます。そのような場合は、環境のスケーリングまたは環境の最適化の説明に沿って、環境をスケールアップまたは最適化できます。

DAG 実行が成功とマークされているものの、実行されたタスクがない

DAG 実行 execution_date が DAG の start_date よりも前に行われる場合、タスクは実行されていないものの、成功としてマークされている DAG 実行が表示される場合があります。

タスクが実行されずに成功となった DAG
図 3. タスクが実行されずに成功となった DAG(クリックして拡大)

原因

この状況は、次のいずれかの場合で発生する可能性があります。

  • DAG の execution_datestart_date のタイムゾーンの違いにより不一致が発生している。たとえば、pendulum.parse(...) を使用して start_date を設定すると、そのようになる場合があります。

  • DAG の start_date が動的な値(例: airflow.utils.dates.days_ago(1))に設定されている

解決策

  • execution_datestart_date で同じタイムゾーンを使用していることを確認します。

  • 過去の開始日で DAG が実行されないようにするため、静的な start_date を指定し、catchup=False と合わせて使用します。

DAG が Airflow UI や DAG UI に表示されず、スケジューラが DAG のスケジュールを設定しない

DAG プロセッサは、スケジューラがスケジュールを設定する前に、また DAG が Airflow UI または DAG UI に表示される前に、各 DAG を解析します。

次の Airflow 構成オプションは、DAG の解析のタイムアウトを定義します。

DAG が Airflow UI または DAG UI に表示されない場合は、次の操作を行います。

  • DAG プロセッサが DAG を正しく処理できるかどうか、DAG プロセッサのログを確認します。問題が発生した場合、DAG プロセッサまたはスケジューラのログに次のログエントリが表示されます。
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • スケジューラのログをチェックして、スケジューラが正常に動作しているかどうかを確認します。問題がある場合、スケジューラのログに次のログエントリが表示されます。
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

解決策:

  • DAG 解析エラーをすべて修正します。DAG プロセッサは複数の DAG を解析します。まれに、1 つの DAG の解析エラーが他の DAG の解析に悪影響を及ぼすことがあります。

  • DAG の解析に [core]dagrun_import_timeout で定義された秒数を超える時間がかかる場合は、このタイムアウトを長くします。

  • すべての DAG の解析に [core]dag_file_processor_timeout で定義されている秒数を超える時間がかかる場合は、このタイムアウトを長くします。

  • DAG の解析に時間がかかっている場合、最適な方法で実装されていない可能性がある。たとえば、多くの環境変数を読み取る場合や、外部サービスまたは Airflow データベースへの呼び出しを行う場合が該当します。可能な限り、DAG のグローバル セクションでこのようなオペレーションを実行しないようにします。

  • スケジューラが高速に動作するように、スケジューラの CPU リソースとメモリリソースを増やします。

  • スケジューラの数を調整する

  • DAG プロセッサのプロセス数を増やして、解析を高速化できるようにします。これを行うには、[scheduler]parsing_process の値を増やします。

  • DAG 解析の頻度を下げる

  • Airflow データベースの負荷を軽減する

Airflow データベースの負荷が大きい場合の兆候

詳細については、Airflow データベースに負荷がかかっている場合の兆候をご覧ください。

次のステップ