Airflow トリガラーに関する問題のトラブルシューティング

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Airflow トリガーの一般的な問題に関する情報とトラブルシューティングの手順について説明します。

トリガーでのオペレーションのブロック

非同期タスクが triggerer でブロックされる場合があります。ほとんどの場合、問題は triggerer リソースが不十分であるか、カスタム非同期演算子コードに問題があることが原因です。

triggerer ログには、triggerer のパフォーマンスの低下の根本原因を特定できる警告メッセージが表示されます。注意すべき重要な警告が 2 つあります。

  1. 非同期スレッドがブロックされた

    Triggerer's async thread was blocked for 1.2 seconds, likely due to the highly utilized environment.
    

    この警告は、非同期タスクの量が多いためにパフォーマンスに問題があることを示します。

    解決策: この問題に対処するには、triggerer に追加のリソースを割り当てて、同時に実行される遅延タスクの数を減らすか、環境内の triggerer の数を増やします。triggerer が遅延可能なタスクを処理する場合でも、各タスクの開始と完了はワーカーが担います。triggerer の数を調整する場合は、ワーカー インスタンスの数のスケーリングも検討してください。

  2. 特定のタスクで非同期スレッドがブロックされました。

    WARNING - Executing <Task finished coro=<TriggerRunner.run_trigger() done, defined at /opt/***/***/jobs/my-custom-code.py:609> result=None> took 0.401 second
    

    この警告は、Cloud Composer によって実行される演算子コードの特定の部分を指しています。トリガーは設計上、asyncio ライブラリに依存してバックグラウンドでオペレーションを実行します。トリガーのカスタム実装が、asyncio 契約に正しく準拠していない可能性があります(たとえば、Python コードで await キーワードと async キーワードが正しく使用されていない場合など)。

    解決策: 警告で報告されたコードを調べて、非同期オペレーションが正しく実装されているかどうかを確認します。

トリガーが多すぎる

遅延タスクの数は、環境の Monitoring ダッシュボードにも表示される task_count 指標に表示されます。各トリガーは、メモリを消費する外部リソースへの接続など、いくつかのリソースを作成します。

Monitoring ダッシュボードに表示される遅延タスク
図 1。Monitoring ダッシュボードに表示される延期タスク(クリックして拡大)

メモリと CPU 消費量のグラフは、ハートビートが欠落していることにより liveness プローブが失敗するためにリソース不足で再起動が発生していることを示しています。

リソースが不十分なためにトリガーが再起動する
図 2.リソースが不十分なためにトリガーが再起動する(クリックして拡大)

解決策: この問題に対処するには、triggerer に追加のリソースを割り当てて、同時に実行される遅延タスクの数を減らすか、環境内の triggerer の数を増やします

コールバック実行時の Airflow ワーカーのクラッシュ

トリガーが実行を完了すると、制御は Airflow ワーカーに戻ります。ワーカーは、実行スロットを使用してコールバック メソッドを実行します。このフェーズは Celery Executor によって制御されるため、対応する構成とリソースの上限(parallelismworker_concurrency など)が適用されます。

Airflow ワーカーでコールバック メソッドが失敗した場合、ワーカーが失敗した場、メソッドを実行するワーカーが再起動した場合、タスクは FAILED とマークされます。この場合、コールバック メソッドだけでなく、タスク全体が再実行されます。

トリガー内の無限ループ

カスタム トリガー演算子を、メイン トリガー ループを完全にブロックするように実装して、壊れたトリガーの実行を一度に 1 つだけにすることもできます。この場合、問題のあるトリガーが終了すると、警告がトリガーログに生成されます

トリガークラスが見つからない

DAG フォルダが Airflow トリガーと同期されていないため、トリガーの実行時にインライン化されたトリガーコードが見つかりません。エラーは、失敗したタスクのログに生成されます。

ImportError: Module "PACKAGE_NAME" does not define a "CLASS_NAME" attribute/
class

解決策: 不足しているコードを PyPI からインポートします。

Airflow UI のトリガーに関する警告メッセージ

トリガーが無効になった後、Airflow UI に次の警告メッセージが表示されることがあります。

The triggerer does not appear to be running. Last heartbeat was received
4 hours ago. Triggers will not run, and any deferred operator will remain
deferred until it times out or fails.

不完全なトリガーが Airflow データベースに残っているため、Airflow はこのメッセージを表示できます。このメッセージは通常、環境内のすべてのトリガーが完了する前にトリガーが無効になったことを意味します。

環境で実行されているすべてのトリガーを表示するには、Airflow UI で [閲覧] > [トリガー] ページで確認します(Admin ロールが必要)。

解決策:

トリガーが無効になってもタスクが遅延状態のままである

トリガーが無効にされた場合、すでに遅延状態になっているタスクは、タイムアウトに達するまでこの状態のままになります。このタイムアウトは、Airflow と DAG の構成に応じて無限に設定できます。

次のいずれかの方法を使用します。

  • タスクを手動で失敗としてマークします。
  • タスクを完了するには、トリガーを有効にします。

環境で遅延演算子やタスクが実行されず、遅延タスクがすべて完了している場合にのみトリガーを無効にすることをおすすめします。

次のステップ