このドキュメントでは、大規模なバッチ パイプラインでジョブの失敗の影響を最小限に抑える方法について説明します。大規模なワークロードの障害は、障害からの復旧と修正に時間と費用がかかるため、特に影響が大きく、これらのパイプラインが失敗した場合に最初から再試行すると、時間と費用の両面でコストがかかります。
費用のかかるバッチ パイプラインの障害を減らすには、このページのガイドラインに沿って対応してください。エラーが発生した要素やパイプラインのエラーを完全に回避することはできないため、ここで説明する手法は、復元力の向上、エラーのコスト削減、エラーが発生した際のデバッグと理解の容易化に重点を置いています。
パイプラインの一般的なベスト プラクティスについては、Dataflow パイプラインのベスト プラクティスをご覧ください。
大規模なジョブに対して小規模なテストを実施する
大規模なバッチジョブを実行する前に、データセットのサブセットで 1 つ以上の小規模なジョブを実行します。この手法は、費用の見積もりを提供するとともに、潜在的な障害ポイントの特定にも役立ちます。
費用予測
テストを実行すると、ジョブの実行にかかる合計費用の最小値を推定できます。通常、ジョブ費用の計算は cost of test job*size(full dataset)/size(test dataset)
です。パイプラインによっては、費用が超線形にスケーリングされる場合や、まれに線形にスケーリングされる場合があります。それでも、このステップでは、ジョブ費用の概算を把握できます。入力サイズを変更して、費用の増加をより正確に予測することもできます。この情報を使用して、既存のパイプラインを続行するか、パイプラインを再設計して費用を削減するかを決定します。
障害点を見つける
テストを実行すると、バグ、潜在的な障害ポイント、構成や効率に関する潜在的な問題が明らかになります。次の指標など、他のパイプライン指標を確認することもできます。
- パイプラインが使用可能なメモリのほぼすべてを使用している場合、負荷が高い場合やレコードが非常に大きい場合は、メモリ不足(OOM)例外が発生する可能性があります。これらの OOM エラーを回避するには、最後のジョブにさらにメモリをプロビジョニングする必要があります。
- パイプラインのスループットが低下している場合は、パイプラインのログを調べて原因を特定します。停止した要素や、特にパフォーマンスが低いデータセットの一部が見つかる場合があります。これらのデータポイントは個別に処理することも、要素の処理時にタイムアウトを適用することもできます。詳細については、このドキュメントの費用のかかるレコードのタイムアウトをご覧ください。
- パイプラインのパフォーマンスが Dataflow のタスクでローカルよりも大幅に低下する場合は、パイプライン ロジックを確認して原因を特定します。たとえば、Dataflow で 8 コアを使用してもローカルで 1 コアを使用しても同じスループットが得られる場合は、リソースの競合でジョブがボトルネックになっている可能性があります。パフォーマンスが想定よりも低い場合は、次のオプションの 1 つ以上を検討してください。
- さまざまなマシン構成またはソフトウェア構成で、さらに多くのテストを実行します。
- 複数のコアを使用してローカルで同時にテストする。
- コードを調べて、大規模なデプロイ時にボトルネックになる可能性のある部分を特定します。
パイプラインに Dataflow の推奨事項がある場合は、それらに基づいてパフォーマンスを改善します。
デッドレター キューを使用して予期しない不正なデータを処理する
多くの場合、パイプラインはほとんどの入力要素で成功しますが、入力のごく一部のサブセットで失敗します。小規模なテストでは入力のサブセットのみをテストするため、この問題が検出されないことがあります。デフォルトでは、Dataflow はこれらの失敗したタスクをバッチモードでは 4 回、ストリーミング モードでは無制限に再試行します。バッチモードでは、再試行の上限に達すると、ジョブ全体が失敗します。ストリーミング モードでは、無期限に停止する可能性があります。
多くのジョブでは、デッドレター キュー(未処理のメッセージ キュー)を使用して、これらの失敗した要素をパイプラインから除外し、ジョブの残りの部分を完了できます。デッドレター キューは、失敗したレコードを別の出力 PCollection
に渡します。この出力は、メイン出力とは別に管理できます。この構成により、これらのレコードのポリシーを設計できます。たとえば、レコードを手動で Pub/Sub に書き込み、検査してクリーンアップしてから、レコードを再処理できます。
多くの Apache Beam 変換には、デッドレター キューのサポートが組み込まれています。Java では、ErrorHandler
オブジェクトを使用してアクセスできます。Python では、with_exception_handling
メソッドを使用してアクセスできます。一部の変換では、デッドレター キューをカスタム方法で定義できます。変換のドキュメントで詳細を確認してください。詳細については、エラー処理にデッドレター キューを使用するをご覧ください。
ジョブがデッドレター キューの条件を満たしているかどうかを確認するには、このドキュメントの制限事項のセクションをご覧ください。
デッドレター キューの制限事項
次のシナリオでは、デッドレター キューが役に立たない場合があります。
- ワーカー全体または
DoFn
のライフサイクルの失敗。ワーカーまたはバンドル全体の処理が失敗した場合、デッドレター キューは失敗をキャッチできません。たとえば、パイプラインでメモリ不足(OOM)例外が発生した場合、VM 上のアクティブなタスクはすべて失敗して再試行され、デッドレター キューには何も送信されません。 - 結合やその他の集計。パイプラインで、すべての入力要素が存在し、結果の一部として処理される必要がある計算を実行する場合は、このステップの前にデッドレター キューを使用する際に注意が必要です。デッドレター キューを使用すると、入力データの一部が結果から除外されます。デッドレター キューを追加すると、正確性とフォールト トレランスのトレードオフが発生する可能性があります。
- デッドレター キューパスでの障害。デッドレター キュー シンクに送信中に要素が失敗すると、パイプライン全体が失敗する可能性があります。この障害を回避するには、デッドレター キューのロジックをできるだけシンプルにします。デッドレター キュー要素の書き込み前にメイン入力が完了するように、待機ステップ(
wait class
を参照)を追加できます。この構成では、パフォーマンスが低下し、パイプラインからのエラー シグナルが遅れる可能性があります。 - 部分的に変換された要素。パイプラインの途中にデッドレター キューを挿入すると、デッドレター キューは部分的に変換された要素を出力し、元の要素にアクセスできない可能性があります。そのため、要素をクリーンアップして、その要素に対してパイプラインを再実行することはできません。代わりに、デッドレター キュー内の出力を元の要素と関連付けるために別のロジックを適用する必要がある場合や、部分的に変換された要素を解釈して処理する必要がある場合があります。また、結果が不整合になる可能性もあります。たとえば、要素がパイプラインの 2 つのブランチに送信され、各ブランチが例外を引き起こす要素をデッドレター キューに送信する場合、単一の入力要素が 1 つのブランチ、もう 1 つのブランチ、両方のブランチ、またはどちらのブランチにも送信される可能性があります。
コストの高いレコードのタイムアウト
パイプラインは、コストの高い要素のサブセットを処理している間、またはデッドロックなどの応答不能を引き起こす制限に達したときに、応答を停止することがあります。この問題を軽減するために、一部の変換では、タイムアウトを設定し、この問題が発生したユーザーコード DoFn
内のタイムアウトした要素を失敗させることができます。たとえば、Python の with_exception_handling
メソッドを使用できます。デッドレター キューでタイムアウトを使用すると、パイプラインは正常な要素の処理を続行して進行状況を進めることができ、費用のかかる要素は個別に再処理できます。この構成ではパフォーマンスが低下する可能性があります。
タイムアウトが必要になる可能性が高い DoFn
オペレーションを特定するには、パイプラインをすべて起動する前に小さなテストを実行します。
垂直自動スケーリングを有効にする
ジョブに必要なメモリ量が不明な場合や、ジョブでメモリ不足が発生する可能性がある場合は、垂直自動スケーリングを有効にします。この機能は、パイプラインが大規模に実行されている場合や、非常に大きな要素に遭遇した場合に OOM エラーを回避するのに役立ちます。
垂直自動スケーリングはジョブの費用を増やす可能性があり、メモリ不足の失敗をすべて防ぐわけではないため、過剰なメモリ消費の問題に対処する必要があります。垂直自動スケーリングには Dataflow Prime も必要です。Dataflow Prime には追加の制限があり、課金モデルが異なります。
失敗しがちなパイプラインの回避策
パイプラインの中には、特にエラーが発生しやすいものがあります。これらのエラーの原因に対処するのが最善ですが、障害のコスト削減のために、次のオプションを検討してください。
中間結果を実体化する
パイプラインには、パイプラインの実行時間を圧迫する特に費用のかかる変換が 1 つ以上含まれている場合があります。この変換後のパイプラインの失敗は特に有害です。すでに完了した作業はすべて失われるためです。このシナリオを回避するには、コストの高いステップで生成された中間 PCollections
を Cloud Storage などのシンクに書き込むことを検討してください。この構成により、障害が発生した場合の費用を削減できます。この利点と、追加の書き込みを実行するコストを比較する必要があります。この実体化された結果は、次のいずれかの方法で使用できます。
- 元のパイプラインを、中間結果を書き込むパイプラインと中間結果を読み取るパイプラインの 2 つのパイプラインに分割します。
- パイプラインの障害が発生した場合にのみ、元のソースとマテリアライズされた中間コレクションの両方から結果を読み取り、フラット化します。
これらの実体化が次の処理の前に書き込まれるようにするには、後続の処理ステップの前に待機ステップ(wait class
を参照)を追加します。