Cloud Pub/Sub は push と pull の両方のメッセージ配信をサポートします。pull サブスクリプションと push サブスクリプションの概要と比較については、サブスクライバーの概要をご覧ください。このドキュメントでは、pull 配信について説明します。push 配信の説明については、push サブスクライバー ガイドをご覧ください。
非同期 pull
非同期 pull を使用すると、アプリケーションのスループットを向上させることができます。アプリケーションで新しいメッセージをブロックする必要はありません。メッセージは、アプリケーション内で長時間実行されているメッセージ リスナーを使って受信できます。以下の例で示すとおり、同時に確認応答されるメッセージは 1 回に 1 つだけです。
クライアント ライブラリによっては、非同期のメッセージ pull に対応していない場合があります。同期 pull によるメッセージの取得については、同期 pull をご覧ください。
詳細については、ご使用のプログラミング言語の API リファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
カスタム属性の処理
このサンプルでは、非同期でメッセージを pull し、メタデータからカスタム属性を取得する方法を示します。
Python
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
エラーのリッスン
次のサンプルでは、メッセージの登録中に発生するエラーを処理します。
Go
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
メッセージのフロー制御
サブスクライバー クライアントのメッセージの処理と確認が遅く、Cloud Pub/Sub がクライアントにメッセージを送信できない場合があります。その場合、次の状況になります。
- 受信メッセージの量がクライアントの処理能力を超えたときに、ネットワーク上の別のクライアントにその処理能力がある場合、メッセージのバックログが発生する可能性があります。2 番目のクライアントで全体のバックログを減らすことも可能ですが、最初のクライアントが 2 番目のクライアントにメッセージをすぐに送信できないため、バックログが減ることはありません。このため、最初のクライアントにメッセージが溜まり、全体の処理速度が低下します。
- クライアント ライブラリは処理が遅れているメッセージに対する確認応答期限を繰り返し延長するため、こうしたメッセージによってメモリ、CPU、帯域幅のリソースが消費され続けます。そのため、メモリ不足の問題が発生する可能性があります。
この状況でスループットを最大にし、レイテンシを最小にするには、サブスクライバーのフロー制御機能を使用して、サブスクライバーのメッセージの取得速度を制御します。以下に、フロー制御機能のサンプルを示します。
C#
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
同時実行制御
同時実行のサポートは、プログラミング言語によって異なります。Java や Go など、並列スレッドに対応している言語の場合、クライアント ライブラリはデフォルトのスレッド数を選択します。しかし、この値がアプリケーションに最適ではない場合もあります。たとえば、サブスクライバー アプリケーションがメッセージの受信量に対応できず、CPU にバインドされていない場合は、スレッド数を増やす必要があります。CPU 使用率の高いメッセージ処理では、スレッド数を減らすと、問題が解決する場合があります。
次のサンプルでは、サブスクライバーで同時実行制御を行います。
Java
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
同時実行のサポートは、プログラミング言語によって異なります。詳細については、API リファレンス ドキュメントをご覧ください。
同期 pull
新しいメッセージを常にポーリングする必要がある場合、非同期 pull(プログラミング言語で選択可能な場合)のほうがスループットの改善に役立つ可能性があります。同期 pull は、パブリッシュ後すぐにメッセージを処理する必要のないワークロードに適しています。同期サブスクライバーは接続を長時間維持する必要がないため、固定数のメッセージを pull して処理するように設定することもできます。現在処理するメッセージがない場合、サブスクライバーはタイムアウトします。
同期 pull でのレイテンシを短くするには、returnImmediately
が false に設定された未処理の pull リクエストが多く存在している必要があります。トピックのスループットが増加するにつれ、より多くの pull リクエストが必要になります。同時に存在する未処理のリクエスト数は 10〜100 が適切です。ただし、レイテンシを最小限に抑えるには、非同期 pull の使用をおすすめします。
クライアント ライブラリによっては、固定数のメッセージの同期 pull に対応していないものもあります。詳細については、ご利用のプログラミング言語の API リファレンス ドキュメントをご覧ください。次に、固定数のメッセージを pull して確認応答するサンプルコードを記載します。
プロトコル
リクエスト:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{ "returnImmediately": "false", "maxMessages": "1" }
レスポンス:
200 OK
{ "receivedMessages": [ { "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...", "message": { "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==", "messageId": "19917247034" } } ] }
リクエスト:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{ "ackIds": [ "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..." ] }
レスポンス:
200 OK
C#
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
PHP
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の PHP の設定手順に従ってください。詳細については、Cloud Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
スケーリング
メッセージ量の変化に対応するため、サブスクライバー アプリケーションにスケーリング メカニズムの実装が必要になる場合があります。その方法は環境によって異なりますが、一般には Stackdriver Monitoring サービスを通じて提供されたバックログ指標に基づきます。Compute Engine に対してこれを行う方法について詳しくは、Cloud Monitoring 指標に基づくスケーリングをご覧ください。
Cloud Monitoring のサポートされている指標ページで「PubSub」を探し、プログラムを使用してモニタリングできる指標を確認してください。
最後に、すべての分散型サービスと同様に、すべてのリクエストが再試行される場合があることを想定してください。
StreamingPull
Cloud Pub/Sub サービスにはメッセージを取得するための 2 つの API があります。
Cloud Client ライブラリでは、使用可能な場合に、スループットを最大にし、レイテンシを最小にするために、双方向ストリーミング RPC である StreamingPull を使用します。StreamingPull API を直接使用することはない場合でも、StreamingPull のいくつかの重要なプロパティ、従来の pull メソッドとの違いを理解することは重要です。
pull メソッドは次のリクエスト / レスポンス モデルに依存しています。
- アプリケーションがメッセージのリクエストを送信します。
- サーバーが 0 または 1 つ以上のメッセージとともに応答し、接続を終了します。
StreamingPull サービス API は確認応答を送信し、確認応答期限を変更する際に、複数のメッセージを受信するために、次の永続的な双方向接続に依存します。
- クライアントが接続を確立するためにサービスへのリクエストを送信します。
- クライアントがメッセージ データを交換するためにこの接続を使用します。
- リクエスト(つまり、双方向接続)はクライアントまたはサーバーによって終了します。
StreamingPull のエラー率は 100%(予測された結果)
StreamingPull ストリームは常に再試行可能なエラーコードで終了します。通常の RPC とは異なり、ここでのエラーはリクエストが失敗したことではなく、単にストリームが破損したことを示しています。そのため、StreamingPull API のエラー率が驚異的な 100% になることがありますが、これは設計によるものです。StreamingPull エラーを診断するために、StreamingPull リクエスト指標ではなく、StreamingPull メッセージ オペレーション指標を確認することをおすすめします。
小規模なメッセージの大規模なバックログの処理
gRPC StreamingPull スタックは高スループットのために最適化されているため、メッセージをバッファリングします。(新しいメッセージの安定したストリームではなく)小規模なメッセージの大規模なバックログを処理しようとすると、なんらかの影響が出る可能性があります。このような状況下では、メッセージが何度も配信され、クライアント間で負荷が効率的に分散されない可能性があります。
Cloud Pub/Sub サービスとクライアント ライブラリ ユーザー スペースの間のバッファ容量は約 10 MB です。クライアント ライブラリの動作に対するこのバッファの影響を理解するために、次の例を見てみましょう。
- サブスクリプションに 10,000 件の 1 KB メッセージのバックログがあります。
- シングル スレッド クライアント インスタンスによって各メッセージを順次処理するために 1 秒ずつかかります。
- そのサブスクリプションのためのサービスへの StreamingPull 接続を確立する最初のクライアント インスタンスは、10,000 件のメッセージ全体のバッファを受け取ります。
- このバッファを処理するには、10,000 秒(約 3 時間)かかります。
- その間、一部のメッセージが確認応答期限を超え、同じクライアントに再送信され、結果として重複が生じます。
- 複数のクライアント インスタンスが実行されている場合、バッファ内のメッセージ スタックを、最初のインスタンス以外のインスタンスでは利用できません。
メッセージが単一の大規模なバッチとしてではなく、一定のレートで到着する場合には、この状況は発生しません。このサービスでは 10 MB のメッセージ全体が一度に処理されることはないため、複数のサブスクライバー間でメッセージの負荷を効率的に分散できます。
この状況を解決するには、現在、一部の Cloud クライアント ライブラリ(「同期 pull」セクションを参照)とすべての API クライアント ライブラリで利用できる push サブスクリプションまたは pull API のいずれかを使用します。詳細については、クライアント ライブラリのドキュメントをご覧ください。