Dataflow での 1 回限りの処理

Dataflow は、レコードの 1 回限りの処理をサポートしています。このページでは、Dataflow が低レイテンシを実現しながら 1 回限りの処理を実装している方法について説明します。

概要

バッチ パイプラインは常に 1 回限りの処理を使用します。ストリーミング パイプラインはデフォルトで 1 回限りの処理を使用しますが、1 回以上の処理も使用できます。

1 回限りの処理では、各パイプライン ステージの結果など、レコードの処理の結果が保証されます。具体的には、Dataflow はソースからパイプラインに到着するレコード、または前のステージからステージに到着したレコードごとに次のことを確認します。

  • レコードが処理され、失われていない。
  • パイプライン内にとどまる処理の結果は 1 回だけ反映されている。

つまり、レコードは少なくとも 1 回処理され、結果は 1 回だけ commit されます。

1 回限りの処理により、結果の正確性が保証され、出力に重複するレコードが含まれなくなります。Dataflow は、1 回限りのセマンティクスを維持しながらレイテンシを最小限に抑えるように最適化されています。ただし、1 回限りの処理でも重複排除を行うための費用が発生します。重複レコードを許容できるユースケースでは、多くの場合、1 回以上のモードを有効にすることでコストを削減し、レイテンシを改善できます。1 回限りのストリーミングと 1 回以上のストリーミングの選択について詳しくは、パイプライン ストリーミング モードを設定するをご覧ください。

遅延データ

1 回限りの処理により、パイプラインの精度が保証されます。パイプラインがレコードを処理すると、Dataflow はレコードが出力に反映され、レコードが重複しないようにします。

ただし、ストリーミング パイプラインでは、レコードが遅れて到着する可能性があるため、1 回限りの処理で結果が完全になるとは限りません。たとえば、パイプラインが時間枠に対して集計(Count など)を実行するとします。1 回限りの処理では、時間枠内にタイムリーに到着したレコードの結果は正確ですが、遅延したレコードは破棄される可能性があります。

一般に、ストリーミング パイプラインで完全性を保証する方法はありません。理論的には、レコードが遅れて到着する可能性があるためです。この制限的なケースでは、結果を生成するまで待つ必要があります。具体的には、Apache Beam では、遅延データを破棄するしきい値と集計結果を出力するタイミングを構成できます。詳細については、Apache Beam ドキュメントのウォーターマークと遅延データをご覧ください。

副作用

副作用が 1 回限りのセマンティクスを保証するとは限りません。重要な点は、シンクが 1 回限りのセマンティクスを実装していない限り、外部ストアへの出力の書き込みも含まれることです。

たとえば、Dataflow は、各レコードが各変換を 1 回だけ通過することを保証しません。再試行やワーカーの障害が原因で、Dataflow は変換を介してレコードを複数回送信したり、複数のワーカーで同時に送信することがあります。

1 回限りの処理の一環として、Dataflow は出力の重複を除去します。ただし、変換内のコードに副作用がある場合は、副作用が複数回発生する可能性があります。たとえば、変換がリモート サービスの呼び出しを行う場合、同じレコードに対してその呼び出しが複数回行われる可能性があります。場合によっては、副作用によってデータ損失が生じることもあります。たとえば、ある変換がファイルを読み取って出力を生成し、出力が commit されるまで待たずにファイルをすぐに削除するとします。結果を commit するときにエラーが発生すると、Dataflow は変換を再試行しますが、変換では削除されたファイルを読み取ることができません。

ロギング

処理のログ出力は、処理が発生したことを示しますが、データが commit されたかどうかは不明です。このため、処理されたデータの結果が永続ストレージに一度だけ commit された場合でも、ログファイルにデータが複数回処理されたことが示される場合があります。また、ログの処理や commit されたデータが必ずしも反映されるとは限りません。スロットリングが原因でログがドロップされたり、他のロギング サービスの問題が原因でログが失われることがあります。

1 回限りのストリーミング

このセクションでは、非確定的な処理、遅延データ、カスタムコードなどの複雑な処理を Dataflow が管理する方法など、Dataflow でストリーミング ジョブの exactly-once 処理を実装する方法について説明します。

Dataflow ストリーミング シャッフル

Dataflow ストリーミング ジョブは、各ワーカーに作業範囲を割り当てることで、多くの異なるワーカーで並行して実行されます。ワーカーの障害、自動スケーリング、その他のイベントに応じて割り当てが変更される場合がありますが、各 GroupByKey 変換の後に、同じキーを持つすべてのレコードが同じワーカーで処理されます。GroupByKey 変換は、CountFileIO などの複合変換でよく使用されます。特定のキーのレコードが最終的に同じワーカーに保存されるように、Dataflow ワーカーはリモート プロシージャ コール(RPC)を使用してデータをシャッフルします。

シャッフル中にレコードが失われないようにするため、Dataflow はアップストリーム バックアップを使用します。アップストリーム バックアップでは、レコードを送信するワーカーは、レコードを受信したという確認応答を受け取るまで RPC を再試行します。レコード処理の副作用は、ダウンストリームの永続ストレージに commit されます。レコードを送信するワーカーが使用不能になった場合、Dataflow は引き続き RPC を再試行し、すべてのレコードが少なくとも 1 回配信されるようにします。

この再試行によって重複が発生する可能性があるため、すべてのメッセージに一意の ID がタグ付けされます。各レシーバーは、すでに表示されて処理されたすべての ID をカタログに保存します。レコードを受信すると、Dataflow はカタログで ID を検索します。ID が見つかった場合、レコードはすでに受信されて commit されているため、重複として破棄されます。レコード ID が安定していることを確認するため、ステップからステップへの出力にはすべてストレージへのチェックポイントが設定されます。その結果、RPC 呼び出しが繰り返されて同じメッセージが複数回送信された場合、メッセージはストレージに 1 回だけ commit されます。

低レイテンシの確保

exactly-once 処理を可能にするには I/O を低減する必要があります(特に、すべてのレコードの I/O)。この目標を達成するために、Dataflow はブルーム フィルタとガベージ コレクションを使用します。

ブルーム フィルタ

ブルーム フィルタは、メンバーシップをすばやくチェックできるコンパクトなデータ構造です。Dataflow では、各ワーカーが、検出するすべての ID のブルーム フィルタを保持します。新しいレコード ID が到着すると、ワーカーはフィルタ内でその ID を検索します。フィルタが false を返した場合、このレコードは重複していません。安定したストレージで ID は検索されません。

Dataflow は、時間ごとにバケット化された一連のブルーム フィルタを保持します。レコードが到着すると、Dataflow はシステムのタイムスタンプに基づいて適切なフィルタを選択します。このステップにより、フィルタがガベージ コレクションの対象となるときにブルーム フィルタが飽和状態になるのを防ぐことができます。また、起動時にスキャンする必要があるデータ量が制限されます。

ガベージ コレクション

ストレージにレコード ID が挿入されないように、Dataflow はガベージ コレクションを使用して古いレコードを削除します。Dataflow は、システム タイムスタンプを使用してガベージ コレクション ウォーターマークを計算します。

このウォーターマークは、特定のステージで待機する物理時間に基づいています。したがって、パイプラインのどの部分が低速であるかに関する情報も提供されます。このメタデータは、Dataflow モニタリング インターフェースに表示されるシステムラグ指標の基礎となります。

レコードがウォーターマークより古いタイムスタンプで到着した場合と、この時刻の ID がすでにガベージ コレクションの対象となっている場合、レコードは無視されます。ガベージ コレクションをトリガーする低いウォーターマークは、レコードの配信が確認されるまで進まないため、遅れて到着したレコードは重複します。

非確定的ソース

Dataflow は、Apache Beam SDK を使用してデータをパイプラインに読み込みます。処理に失敗したときに、Dataflow はソースからの読み取りを再試行する場合があります。その場合、Dataflow はソースによって生成されたすべてのレコードが 1 回だけ記録されるようにする必要があります。Pub/Sub Lite や Kafka などの確定的なソースでは、記録されたオフセットに基づいてレコードが読み取られるため、このステップは必要ありません。

Dataflow はレコード ID を自動的に割り当てることができません。非確定的ソースの場合は、重複を避けるためレコード ID を通知する必要があります。ソースが各レコードに一意の ID を提供する場合、コネクタはパイプライン内でシャッフルを使用して重複を削除します。同じ ID を持つレコードは除外されます。Pub/Sub をソースとして使用する場合に Dataflow が exactly-once 処理を実装する方法の例については、「Pub/Sub のストリーミング」ページの効率的な重複排除セクションをご覧ください。

パイプラインの一部としてカスタム DoFn を実行する場合、Dataflow は、このコードがレコードごとに 1 回だけ実行されることを保証しません。ワーカーに障害が発生した場合に少なくとも 1 回は処理されるように、Dataflow は特定のレコードを変換によって複数回実行するか、複数のワーカーで同じレコードを同時に実行します。パイプラインに外部サービスへの接続などを行うコードを含めると、特定のレコードに対してアクションが複数回実行される可能性があります。

非確定的な処理を効果的に確定的な処理にするには、チェックポインティングを使用します。チェックポインティングを使用すると、変換からの各出力は、次のステージに配信される前に、一意の ID を持つ安定したストレージにチェックポイントが設定されます。Dataflow のシャッフル配信の再試行は、チェックポイントされた出力をリレーします。コードが複数回実行される場合でも、Dataflow は、それらの実行のうちの 1 つだけの出力が保存されるようにします。Dataflow は整合性ストアを使用して、安定したストレージへの書き込みが重複しないようにします。

1 回限りの出力の配信

Apache Beam SDK には組み込みシンクが用意されています。これらのシンクは重複が発生しないように設計されています。可能な限り、これらの組み込みシンクを使用してください。

独自のシンクを作成する必要がある場合は、予期しない副作用がなく、必要に応じて再試行できるように、関数オブジェクトをべき等にすることをおすすめします。ただし、シンクの機能を実装する変換の一部のコンポーネントは非確定的であり、再試行すると変更される可能性があります。

たとえば、ウィンドウ集計では、ウィンドウ内のレコードセットは非確定的になります。具体的には、ウィンドウが要素 e0、e1、e2 で起動を試行することがあります。ワーカーは、これらの要素が副作用として送信される前ではなく、ウィンドウ処理の commit 前にクラッシュする可能性があります。ワーカーが再起動すると、ウィンドウが再び起動し、遅延要素 e3 が到着します。この要素は、ウィンドウが commit される前に到着するため、遅延データとしてカウントされません。そのため、要素 e0、e1、e2、e3 で DoFn が再度呼び出されます。これらの要素は、副作用オペレーションに送信されます。このシナリオではべき等性に効果はありません。これは、異なる論理レコードセットが毎回送信されるためです。

Dataflow で非確定的に対処するには、組み込みの Reshuffle 変換を使用します。Dataflow がデータをシャッフルすると、Dataflow はデータを永続的に書き込みます。シャッフルの発生後にオペレーションが再試行されると、非確定的に生成された要素が安定します。Reshuffle 変換を使用すると、DoFn の出力の 1 つのバージョンだけがシャッフル境界を越えることが保証されます。次のパターンでは、副作用オペレーションが常に、出力のために確定的なレコードを受け取ることが保証されます。

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Dataflow ランナーに、DoFn を実行する前に要素が安定している必要があることを認識させるには、RequiresStableInput アノテーションを DoFn に追加します。

詳細