このページでは、Dataflow と Pub/Sub の統合の概要について説明します。また、Dataflow ランナーで Pub/Sub I/O コネクタを実装する際に利用できる最適化についても説明します。Pub/Sub は、スケーラブルで耐久性のあるイベントの取り込みおよび配信システムです。Dataflow は、メッセージの重複排除、1 回限りの処理、タイムスタンプ付きイベントからのデータ ウォーターマークの生成により、Pub/Sub のスケーラブルな「最低 1 回」配信モデルを補完します。Dataflow を使用するには、Apache Beam SDK でパイプラインを記述し、Dataflow サービスでパイプライン コードを実行します。
開始する前に、Apache Beam とストリーミング パイプラインの基本的なコンセプトをご確認ください。詳細については、次のリソースをご覧ください。
- PCollection、トリガー、ウィンドウ、ウォーターマークなどの Apache Beam のコンセプトの概要
- ラムダ以降: Dataflow における「1 回限り」の処理のパート 1 とパート 3: ソースとシンク
- ストリーミング: バッチを超えた世界 101、102
- Apache Beam プログラミング ガイド
Pub/Sub を使用したストリーミング パイプラインの構築
Dataflow と Pub/Sub の統合の利点を活用するには、次のいずれかの方法でストリーミング パイプラインを構築します。
ストリーミング単語抽出(Java)やストリーミング単語数カウント(Python)など、Apache Beam GitHub リポジトリにある既存のストリーミング パイプラインのサンプルコードを使用します。
Apache Beam API リファレンス(Java または Python)を使用して、新しいパイプラインを記述します。
Google が提供する Dataflow テンプレートと、それに対応する Java のテンプレート ソースコードを使用します。
Google は、UI ベースで Pub/Sub ストリーム処理パイプラインを開始するための Dataflow テンプレートを提供しています。Java を使用する場合は、こうしたテンプレートのソースコードを使用して、カスタム パイプラインを作成することもできます。
次のストリーミング テンプレートは、Pub/Sub データを別の宛先にエクスポートします。
- Pub/Sub Subscription to BigQuery
- Pub/Sub to Pub/Sub relay
- Pub/Sub to Cloud Storage Avro
- Pub/Sub to Cloud Storage Text
- Storage Text to Pub/Sub (Stream)
次のバッチ テンプレートは、Pub/Sub トピックにデータのストリームをインポートします。
Pub/Sub のクイックスタート: Dataflow によるストリーム処理に従って、単純なパイプラインを実行します。
Pub/Sub と Dataflow のインテグレーション機能
Apache Beam は、Pub/Sub 向けのリファレンス I/O ソース実装(PubsubIO
)を提供しています(Java、Python)。これは、Apache Spark ランナー、Apache Flink ランナー、Direct Runner など、Dataflow 以外のランナーによって使用されます。
Dataflow ランナーは PubsubIO
の別のプライベート実装を使用します。この実装では、Google Cloud 内部の API とサービスを利用しています。これにより、低レイテンシのウォーターマーク、高精度のウォーターマーク(つまりデータ完全性)、効率的な重複排除という主要な 3 つの利点が得られます。
メッセージが最初の融合ステージで正常に処理されると(およびその処理の副作用が永続ストレージに書き込まれると)、Dataflow ランナーの PubsubIO
の実装により、そのメッセージに自動的に確認応答が行われます。詳細については、融合のドキュメントをご覧ください。したがって、一部のコンポーネントがクラッシュした場合や接続が失われた場合で、Dataflow によってデータの損失がないことが保証される場合にのみ、メッセージが確認応答されます。
低レイテンシのウォーターマーク
Dataflow は Pub/Sub のプライベート API にアクセスできます。これにより、Cloud Monitoring で使用可能なレイテンシよりも低いレイテンシで、サブスクリプション内の最も古い未承認メッセージの経過時間を得ることができます。比較のため、Cloud Monitoring で使用可能な Pub/Sub バックログ指標は通常 2~3 分の遅延が発生しますが、Dataflow の指標は 10 秒程度しか遅れません。これにより、Dataflow はパイプライン ウォーターマークを先に進めて、ウィンドウ処理された計算結果をより早く出力できます。
高精度のウォーターマーク
イベント時刻で定義されたウィンドウには強力なウォーターマークが必要になるという問題がありますが、この問題も Dataflow と Pub/Sub の統合によってネイティブに解決されます。イベント時間は、Pub/Sub サービス自身によってメッセージに設定された publish_time
フィールドではなく、パブリッシャー アプリケーションによって Pub/Sub メッセージの属性として指定されたタイムスタンプです。Pub/Sub では、サービスによって割り当てられた(処理時刻の)タイムスタンプに関してのみバックログ統計が計算されるため、イベント時刻のウォーターマークを推測するには別のメカニズムが必要です。
この問題を解決するため、ユーザーがカスタムのイベント タイムスタンプを使用する場合、Dataflow サービスは第 2 のトラッキング サブスクリプションを作成します。このトラッキング サブスクリプションを使用して、ベースとなるサブスクリプションのバックログ内にあるメッセージのイベント時刻を検査し、イベント時刻のバックログを推測します。詳しくは、StackOverflow の Dataflow で Pub/Sub ウォーターマークを計算する方法に関するページをご覧ください。
効率的な重複排除
「1 回限り」のメッセージ処理を行うには、メッセージの重複排除が必要になります。Dataflow は、Pub/Sub メッセージ ID に基づいてメッセージの重複排除を行います。その結果、すべての処理ロジックにおいて、Pub/Sub メッセージ ID に基づいてメッセージが一意になっていると想定できます。これを達成するための効率的な増分集約メカニズムは、PubsubIO
API の中で抽象化されています。
重複排除にメッセージ ID ではなく Pub/Sub メッセージ属性を使用するように PubsubIO
が構成されている場合、Dataflow は Pub/Sub に公開されたメッセージ間の重複を 10 分以内に排除します。
サポートされていない Pub/Sub 機能
デッドレター トピックと指数バックオフ遅延の再試行ポリシー
Pub/Sub のデッドレター トピックと指数バックオフ遅延の再試行ポリシーは、Dataflow では完全にサポートされていません。代わりに、これらのパターンをパイプライン内で明示的に実装してください。デッドレター パターンの例として、小売アプリケーションと Pub/Sub to BigQuery テンプレートをご覧ください。
デッドレター トピックと指数バックオフ遅延の再試行ポリシーが Dataflow で機能しない理由は 2 つあります。
まず、Dataflow は、パイプライン コードに失敗しても、Pub/Sub に NACK メッセージを送信しません(つまり、否定確認応答を送信しません)。代わりに、Dataflow はメッセージの処理を無期限に再試行し、メッセージの確認応答期限を延長します。ただし、Dataflow バックエンドはさまざまな内部理由でメッセージに NACK で応答するため、パイプライン コードに障害がない場合でも、デッドレター トピックにメッセージが配信される可能性があります。
次に、Dataflow は、パイプラインがデータを完全に処理する前にメッセージを確認します。具体的には、メッセージが最初の融合ステージで正常に処理されると(かつ、その処理の副作用が永続ストレージに書き込まれると)、メッセージに確認応答が行われます。パイプラインに複数の融合ステージがあり、最初のステージの後の任意の時点でエラーが発生した場合、メッセージはすでに確認応答されています。
同期 pull からストリーミング pull への今後の移行(Streaming Engine のみ)
現在、Streaming Engine は Pub/Sub からデータを使用するために同期 pull を使用していますが、パフォーマンス向上のため、2022 年 2 月からストリーミング pull に移行します。
移行期間中、ジョブで同期 pull が一定の期間使用され、その後、ストリーミング pull が使用される可能性があります。これは、Dataflow UI に表示され、Cloud Monitoring に報告される Pub/Sub 指標に影響します。ジョブがストリーミング pull に切り替わると、既存の指標の一部は報告されません。詳細については、Dataflow モニタリング インターフェースの使用のページをご覧ください。
同期 pull とストリーミング pull では別々の割り当てが使用されます。Dataflow チームでは、同期 pull を介してすでに大量のデータを使用しているプロジェクトの割り当てを積極的に増やしていく予定です。
Streaming Engine を使用しないストリーミング ジョブは、ストリーミング pull に移行されず、この変更の影響を受けません。
移行についてご不明な点がございましたら、担当のアカウント チームにお問い合わせください。