このドキュメントでは、プル サブスクリプション、そのワークフロー、関連するプロパティの概要について説明します。
pull サブスクリプションでは、サブスクライバー クライアントが Pub/Sub サーバーからメッセージをリクエストします。
pull モードでは、Pull または StreamingPull のいずれかのサービス API を使用できます。選択した API を実行するには、Google が提供する高レベル クライアント ライブラリか、低レベルの自動生成クライアント ライブラリを選択します。非同期または同期のメッセージ処理を選択することもできます。
準備
このドキュメントを読む前に、次の内容をよく理解しておいてください。
Pub/Sub の仕組みとさまざまな Pub/Sub の用語。
Pub/Sub でサポートされているさまざまなサブスクリプションの種類と、プル サブスクリプションを使用する理由。
pull サブスクリプションのワークフロー
pull サブスクリプションの場合、サブスクライバー クライアントが Pub/Sub サーバーに対してリクエストを開始し、メッセージを取得します。サブスクライバー クライアントは、次のいずれかの API を使用します。
ほとんどのサブスクライバー クライアントは、これらのリクエストを直接作成しません。代わりに、クライアントは Google Cloud が提供する高レベルのクライアント ライブラリに依存します。このライブラリは、内部でストリーミング pull リクエストを実行し、メッセージを非同期で配信します。メッセージのプル方法を細かく制御する必要があるサブスクライバー クライアントの場合、Pub/Sub は低レベルの自動生成された gRPC ライブラリを使用します。このライブラリは、pull リクエストまたはストリーミング pull リクエストを直接行います。これらのリクエストは、同期または非同期のいずれかです。
次の 2 つの画像は、サブスクライバー クライアントと pull サブスクリプション間のワークフローを示しています。
pull ワークフロー
pull ワークフローは次のとおりです(図 1 を参照)。
- サブスクライバー クライアントは、配信用のメッセージをリクエストする
pull
メソッドを明示的に呼び出します。このリクエストは、画像に示すPullRequest
です。 Pub/Sub サーバーは、0 個以上のメッセージと確認応答 ID で応答します。メッセージがないレスポンスまたはエラーがあるレスポンスは、受信可能なメッセージが存在しないことを示すとは限りません。このレスポンスは、画像に示されているように
PullResponse
です。サブスクライバー クライアントは、
acknowledge
メソッドを明示的に呼び出します。クライアントは、返された確認応答 ID を使用して、メッセージが処理され、再配信する必要がないことを確認します。
単一のストリーミング pull リクエストの場合、サブスクライバー クライアントは、オープン接続により複数のレスポンスを返すことができます。一方、pull リクエストごとに返されるレスポンスは 1 つだけです。
pull サブスクリプションのプロパティ
pull サブスクリプション用に構成するプロパティによって、サブスクリプションにメッセージを書き込む方法が決まります。詳細については、サブスクリプション プロパティをご覧ください。
Pub/Sub サービス API
Pub/Sub の pull サブスクリプションでは、次の 2 つの API のいずれかを使用してメッセージを取得できます。
- pull
- StreamingPull
これらの API を使用してメッセージを受信する場合は、単一の Acknowledge と ModifyAckDeadline RPC を使用します。次のタブでは、この 2 つの Pub/Sub API について説明します。
StreamingPull API
Pub/Sub クライアント ライブラリでは、スループットを最大にしてレイテンシを最小にするために、StreamingPull を使用します(使用可能な場合)。StreamingPull API を直接使用することがなくても、Pull API との違いを理解することは重要です。
StreamingPull API は、複数のメッセージを受信するために、次の永続的な双方向接続に依存します。ワークフローは次のとおりです。
クライアントが接続を確立するためにサーバーへのリクエストを送信します。 接続割り当てを超過すると、サーバーはリソース不足エラーを返します。クライアント ライブラリは、割り当て不足のエラーを自動的に再試行します。
エラーがない場合、または接続割り当てが再度使用可能になった場合、サーバーは接続されたクライアントに引き続きメッセージを送信します。
スループットの割り当てを超過した場合、サーバーはメッセージの送信を停止します。ただし、接続は切断されません。十分なスループット割り当てが再び利用可能になると、ストリームが再開されます。
クライアントまたはサーバーが最終的に接続を閉じます。
StreamingPull API は接続をオープン状態に保ちます。Pub/Sub サーバーは、長時間実行される固定接続を回避するために、一定期間後に接続を閉じます。クライアント ライブラリは StreamingPull 接続を自動的に再開します。
メッセージが利用可能になると、その接続に送信されます。したがって、StreamingPull API はレイテンシを最小限に抑え、メッセージのスループットを最大化します。
StreamingPull REST メソッドの詳細については、StreamingPullRequest および StreamingPullResponse をご覧ください。
StreamingPull RPC メソッドの詳細については、StreamingPullRequest および StreamingPullResponse をご覧ください。
Pull API
この API は、リクエストとレスポンスのモデルに基づく従来の単一 RPC です。1 つの pull レスポンスは 1 つの pull リクエストに対応します。ワークフローは次のとおりです。
クライアントはサーバーにメッセージのリクエストを送信します。 スループットの割り当てを超えると、サーバーはリソース不足エラーを返します。
エラーがない場合、またはスループットの割り当てが再び使用可能である場合、サーバーは 0 個以上のメッセージと確認応答 ID を返します。
単項 Pull API を使用する場合、メッセージがないレスポンスまたはエラーがあるレスポンスは、受信可能なメッセージが存在しないことを示すとは限りません。
Pull API を使用しても、メッセージの低レイテンシと高スループットは保証されません。Pull API で高スループットと低レイテンシを実現するには、未処理のリクエストが同時に複数存在する必要があります。古いリクエストがレスポンスを受け取ると、新しいリクエストが作成されます このようなソリューションを設計すると、エラーが発生しやすくなり、維持が困難になります。このようなユースケースには、StreamingPull API の使用をおすすめします。
次のものを厳密に制御する必要がある場合にのみ、StreamingPull API の代わりに Pull API を使用します。
- サブスクライバー クライアントが処理できるメッセージの数
- クライアントのメモリとリソース
サブスクライバーが Pub/Sub とより pull 指向で動作する別のサービスの間のプロキシである場合にも、この API を使用できます。
Pull REST メソッドの詳細については、メソッド: projects.subscriptions.pull をご覧ください。
Pull RPC メソッドの詳細については、PullRequest と PullResponse をご覧ください。
メッセージ処理モードの種類
サブスクライバー クライアントに対して、次のいずれかの pull モードを選択します。
非同期 pull モード
非同期 pull モードでは、メッセージの受信とサブスクライバー クライアントでのメッセージ処理が切り離されます。このモードは、ほとんどのサブスクライバー クライアントでデフォルトです。非同期 pull モードでは、StreamingPull API または単項 Pull API を使用できます。非同期 pull では、高レベルのクライアント ライブラリまたは低レベルの自動生成クライアント ライブラリも使用できます。
クライアント ライブラリについては、このドキュメントで後ほど詳しく説明します。
同期 pull モード
同期 pull モードでは、メッセージの受信と処理が順番に行われ、互いに分離されていません。そのため、非同期処理では StreamingPull と単項 pull API と同様に、同期処理よりもレイテンシが短く、スループットが大きくなります。
同期 pull モードは、他の要件と比較して低レイテンシと高スループットが最も重要な要素ではないアプリケーションにのみ使用します。たとえば、アプリケーションが同期プログラミング モデルのみを使用するように制限されている場合があります。また、リソースの制約があるアプリケーションでは、メモリ、ネットワーク、CPU をより正確に制御する必要がある場合があります。このような場合は、単一 Pull API を備えた同期モードを使用します。
Pub/Sub クライアント ライブラリ
Pub/Sub では、高レベルと低レベルの自動生成クライアント ライブラリが提供されます。
高レベルの Pub/Sub クライアント ライブラリ
高レベル クライアント ライブラリには、リース管理を使用して確認応答の期限を制御するオプションが用意されています。これらのオプションは、コンソールや CLI を使用してサブスクリプション レベルで確認応答期限を構成するよりも細かく設定できます。高レベル クライアント ライブラリには、順序指定配信、1 回限りの配信、フロー制御などの機能のサポートも実装されています。
非同期 pull と StreamingPull API を、高レベルのクライアント ライブラリで使用することをおすすめします。Google Cloud でサポートされているすべての言語が、高レベル クライアント ライブラリの Pull API もサポートしているわけではありません。
高レベル クライアント ライブラリを使用するには、Pub/Sub クライアント ライブラリをご覧ください。
低レベルの自動生成 Pub/Sub クライアント ライブラリ
低レベル クライアント ライブラリは、Pull API を直接使用しなければならない場合に使用できます。低レベルの自動生成クライアント ライブラリで同期処理または非同期処理を使用できます。低レベルの自動生成クライアント ライブラリを使用する場合は、順序指定配信、1 回限りの配信、フロー制御、リース管理などの機能を手動でコーディングする必要があります。
サポートされているすべての言語で、低レベルの自動生成クライアント ライブラリを使用する場合は、同期処理モデルを使用できます。Pull API を直接使用することが賢明な場合は、低レベルの自動生成クライアント ライブラリと同期 pull を使用できます。たとえば、このモデルに依存する既存のアプリケーション ロジックがある可能性があります。
低レベル 自動生成クライアント ライブラリを直接使用するには、Pub/Sub API の概要をご覧ください。
クライアント ライブラリのコードサンプル
StreamingPull と高レベル クライアント ライブラリのコードサンプル
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API リファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API リファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API リファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API リファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
高レベル クライアント ライブラリを使用してカスタム属性を取得する
次のサンプルは、メッセージを非同期で pull し、メタデータからカスタム属性を取得する方法を示しています。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API リファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API リファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API リファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
高レベル クライアント ライブラリを使用してエラーを処理する
次のサンプルは、メッセージの登録中に発生するエラーを処理する方法を示しています。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API リファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API リファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
単一 pull のコードサンプル
以下は、固定数のメッセージを pull して確認応答するサンプルコードです。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。
PHP
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
プロトコル
リクエスト:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}
レスポンス:
200 OK
{
"receivedMessages": [{
"ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
"message": {
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "19917247034"
}
}]
}
リクエスト:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Pub/Sub によりメッセージの一覧が配信されます。 一覧に複数のメッセージがある場合、Pub/Sub は同じ順序指定キーを使用してメッセージを順序づけます。重要な注意点は次のとおりです。
リクエストに
max_messages
の値を設定しても、バックログに多数のメッセージがあっても、max_messages
が返されるとは限りません。Pub/Sub Pull API は、配信可能なメッセージの配信レイテンシを短縮するために、max_messages
より少ないメッセージを返すことがあります。メッセージが 0 である pull レスポンスを、バックログにメッセージがないことを示す指標として使用しないでください。メッセージが 0 のレスポンスを受け取って、それに続くメッセージを返すリクエストを行うこともできます。
単項 pull モードでメッセージ配信のレイテンシを短くするには、未処理の pull リクエストが同時に多数存在していることが不可欠です。トピックのスループットが増加するにつれ、より多くの pull リクエストが必要になります。一般的に、StreamingPull モードはレイテンシの影響を受けやすいアプリケーションに適しています。
割り当てと上限
pull 接続と StreamingPull 接続はどちらも割り当てと上限の対象です。詳細については、Pub/Sub の割り当てと上限をご覧ください。
次のステップ
トピックに pull サブスクリプションを作成します。
gcloud CLI を使用してサブスクリプションを作成または変更する。
REST API を使用してサブスクリプションを作成または変更する。
REST API を使用してサブスクリプションを作成または変更する。