Pub/Sub は push と pull の両方のメッセージ配信をサポートします。pull サブスクリプションと push サブスクリプションの概要と比較については、サブスクライバーの概要をご覧ください。このドキュメントでは、pull 配信について説明します。push 配信の説明については、push サブスクライバー ガイドをご覧ください。
非同期 pull
非同期 pull を使用すると、アプリケーションで新しいメッセージをブロックする必要がなくなるため、アプリケーションのスループットが向上します。メッセージは、アプリケーション内で長時間実行されているメッセージ リスナーを使って受信できます。以下の例で示すとおり、同時に確認応答されるメッセージは 1 回に 1 つだけです。Java、Python、.NET、Go、Ruby クライアントでは、streamingPull サービス API を使用して、非同期クライアント API を効率的に実装します。
クライアント ライブラリによっては、非同期のメッセージ pull に対応していない場合があります。同期 pull によるメッセージの取得については、同期 pull をご覧ください。
詳細については、ご使用のプログラミング言語の API リファレンス ドキュメントをご覧ください。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
カスタム属性の処理
このサンプルでは、非同期でメッセージを pull し、メタデータからカスタム属性を取得する方法を示します。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
エラーのリッスン
次のサンプルでは、メッセージの登録中に発生するエラーを処理します。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
メッセージのフロー制御
Pub/Sub がクライアントにメッセージを送信するよりも、サブスクライバー クライアントがメッセージを処理して確認応答するほうが遅くなる場合があります。この場合、次の手順に従います。
受信メッセージの量がクライアントの処理能力を超えたときに、ネットワーク上の別のクライアントにその処理能力がある場合、メッセージのバックログが発生する可能性があります。2 番目のクライアントによってサブスクリプションのバックログが少なくなりますが、最初のクライアントが受信メッセージのリースを保持しているため、この処理を行う機会がありません。このため、最初のクライアントにメッセージが溜まり、全体の処理速度が低下します。
クライアント ライブラリはバックログにあるメッセージに対する確認応答期限を繰り返し延長するため、こうしたメッセージによってメモリ、CPU、帯域幅のリソースが消費され続けます。このため、サブスクライバー クライアントのリソース(メモリなど)が不足する可能性があります。メッセージ処理のスループットが低下し、レイテンシが長くなる可能性があります。
この問題を軽減するには、サブスクライバーのフロー制御機能を使用して、サブスクライバーがメッセージを受信する速度を制御します。以下に、フロー制御機能のサンプルを示します。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
通常、フロー制御が必要な場合、メッセージがパブリッシュされる速度が、使用される速度を超えています。メッセージ量の急増が一時的なものではなく、永続的な状態である場合は、サブスクライバー クライアントのインスタンス数を増やすことを検討してください。
同時実行制御
同時実行のサポートは、プログラミング言語によって異なります。Java や Go など、並列スレッドに対応している言語実装の場合、クライアント ライブラリはデフォルトのスレッド数を選択します。しかし、この値がアプリケーションに最適ではない場合もあります。たとえば、サブスクライバー アプリケーションがメッセージの受信量に対応できず、CPU にバインドされていない場合は、スレッド数を増やす必要があります。CPU 使用率の高いメッセージ処理のオペレーションでは、スレッド数を減らすと、問題が解決する場合があります。
次のサンプルでは、サブスクライバーで同時実行制御を行います。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
同時実行のサポートは、プログラミング言語によって異なります。詳細については、API リファレンス ドキュメントをご覧ください。
StreamingPull
Pub/Sub サービスにはメッセージを取得するための 2 つの API があります。
Cloud Client ライブラリでは、使用可能な場合に、スループットを最大にし、レイテンシを最小にするために、StreamingPull を使用します。StreamingPull API を直接使用することはない場合でも、StreamingPull のいくつかの重要なプロパティ、従来の Pull メソッドとの違いを理解することは重要です。
pull メソッドは次のリクエスト / レスポンス モデルに依存しています。
- クライアントはサーバーにメッセージのリクエストを送信します。
- サーバーが 0 または 1 つ以上のメッセージとともに応答し、接続を終了します。
StreamingPull サービス API は、複数のメッセージを受信するために、次の永続的な双方向接続に依存します。
- クライアントが接続を確立するためにサーバーへのリクエストを送信します。
- サーバーは接続されたクライアントに継続的にメッセージを送信します。
- 接続は、最終的にクライアントまたはサーバーによって終了します。
サブスクライバーにコールバックを指定すると、サブスクライバーが各メッセージのコールバックを非同期で実行します。サブスクライバーが同じ順序指定キーを持つメッセージを受信した場合、クライアント ライブラリは順次にコールバックを実行します。Pub/Sub サービスはこれらのメッセージを同じサブスクライバーにベストエフォート ベースで配信します。
StreamingPull のエラー率は 100%(予測された結果)
StreamingPull ストリームは常に OK 以外のステータスで終了します。通常の RPC とは異なり、ここでのステータスはリクエストが失敗したことではなく、単にストリームが破損したことを示しています。そのため、StreamingPull API のエラー率が 100% になることがありますが、これは設計によるものです。
StreamingPull エラーの診断
StreamingPull ストリームは常にエラーで終了するため、エラーを診断するためにストリーム終了指標ではなく、StreamingPull メッセージ オペレーション指標を重視します(subscription/streaming_pull_message_operation_count
)。次のエラーを探します。
FAILED_PRECONDITION
エラーは、次の状況で発生する場合があります。- Pub/Sub が、無効化された Cloud KMS 鍵を使用してメッセージの復号を試みます。
- サブスクリプション バックログに無効な Cloud KMS 鍵で暗号化されたメッセージがある場合、サブスクリプションが一時的に停止されることがあります。
UNAVAILABLE
エラー
StreamingPull: 小規模なメッセージの大規模なバックログの処理
gRPC StreamingPull スタックは高スループットが得られるように最適化されているため、メッセージをバッファリングします。(新しいメッセージの安定したストリームではなく)小規模なメッセージの大規模なバックログを処理しようとすると、なんらかの影響が出る可能性があります。このような状況下では、メッセージが何度も配信され、クライアント間で負荷が効率的に分散されない可能性があります。
Pub/Sub サービスとクライアント ライブラリ ユーザー スペースの間のバッファ容量は約 10 MB です。クライアント ライブラリの動作に対するこのバッファの影響を理解するために、次の例を見てみましょう。
- サブスクリプションに 10,000 件の 1 KB メッセージのバックログがあります。
- シングル スレッド クライアント インスタンスによって各メッセージを順次処理するために 1 秒ずつかかります。
- そのサブスクリプションのためのサービスへの StreamingPull 接続を確立する最初のクライアントインスタンスのバッファが、10,000 件のメッセージで満たされます。
- このバッファを処理するには、10,000 秒(約 3 時間)かかります。
- その間、バッファ内にある一部のメッセージが確認応答期限を超え、同じクライアントに再送信され、結果として重複が生じます。
- 複数のクライアント インスタンスが実行されている場合、1 つのクライアントのバッファに滞留しているメッセージは、どのクライアント インスタンスでも使用できません。
メッセージが単一の大規模なバッチとしてではなく、一定のレートで到着する場合には、この状況は発生しません。このサービスでは 10 MB のメッセージ全体が一度に処理されることはないため、複数のサブスクライバー間でメッセージの負荷を効率的に分散できます。
この状況を解決するには、現在、一部の Cloud クライアント ライブラリ(「同期 pull」セクションを参照)とすべての API クライアント ライブラリで利用できる push サブスクリプションまたは Pull API のいずれかを使用します。詳細については、クライアント ライブラリのドキュメントをご覧ください。
同期 pull
非同期 pull が、アプリケーションにとって最適ではない場合もあります。その例として、アプリケーション ロジックがメッセージを取得する際にポーリング パターンを使用する場合や、クライアントによって取得された複数のメッセージに対して正確な上限の設定が常に必要な場合などが挙げられます。このようなアプリケーションをサポートするため、サービスで同期 pull メソッドがサポートされています。
次に、固定数のメッセージを pull して確認応答するサンプルコードを記載します。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
PHP
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
プロトコル
リクエスト:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}
レスポンス:
200 OK
{
"receivedMessages": [{
"ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
"message": {
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "19917247034"
}
}]
}
リクエスト:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
Pub/Sub によりメッセージの一覧が配信されます。一覧に複数のメッセージがある場合、Pub/Sub は同じ順序指定キーを使用してメッセージを順序づけます。
同期 pull でのメッセージ配信のレイテンシを短くするには、未処理の pull リクエストが多く存在している必要があります。トピックのスループットが増加するにつれ、より多くの pull リクエストが必要になります。一般に、非同期 pull はレイテンシの影響を受けやすいアプリケーションに適しています。
同期 pull とリース管理
個別のメッセージの処理が事前構成された確認応答期限(リース)を超える場合があります。クライアント ライブラリでは、このようなメッセージが再配信されないように、確認応答期限を再設定できます(ただし、Go クライアント ライブラリではポーリングされたメッセージの確認応答期限が自動的に変更されます)。以下にサンプルを示します。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
スケーリング
メッセージ量の変化に対応するため、サブスクライバー アプリケーションにスケーリング メカニズムの実装が必要になる場合があります。これを行う方法は環境によって異なりますが、通常はGoogle Cloud のオペレーション スイート モニタリング サービスによって提供されるバックログ指標に基づきます。Compute Engine に対してこれを行う方法について詳しくは、Cloud Monitoring 指標に基づくスケーリングをご覧ください。
GCP 指標リストページの Pub/Sub セクションに移動して、プログラマティックにモニタリングできる指標を確認してください。
最後に、すべての分散型サービスと同様に、すべてのリクエストが再試行される場合があることを想定してください。
重複の処理と再試行の強制
確認応答期限が切れる前にメッセージの確認応答を行わないと、Pub/Sub によってメッセージが再送信されます。その結果、Pub/Sub によって重複するメッセージが送信されることがあります。Google Cloud のオペレーション スイートを使用して、expired
レスポンス コードで確認応答オペレーションをモニタリングし、この状態を検出します。このデータを取得するには、確認応答メッセージ オペレーション指標を選択し、response_code
ラベルでグループ化するかフィルタします。なお、response_code
は指標のシステムラベルであり、指標ではありません。

メッセージの期限を延長すると、重複率を低下させることができます。
- 期限の延長はクライアント ライブラリによって自動的に処理されますが、構成可能な延長期限の最大値にはデフォルトの上限があります。
- 独自のクライアント ライブラリを構築する場合は、
modifyAckDeadline
メソッドを使用して、確認応答期限を延長します。
あるいは、Pub/Sub にメッセージの再試行を強制するには、modifyAckDeadline
を 0 に設定します。