このドキュメントでは、pull サブスクリプションの概要、ワークフロー、関連するプロパティについて説明します。
pull サブスクリプションでは、サブスクライバー クライアントが Pub/Sub サーバーにメッセージをリクエストします。
pull モードでは、Pull または StreamingPull のいずれかのサービス API を使用できます。選択した API を実行するには、Google が提供する高レベル クライアント ライブラリか、低レベルの自動生成クライアント ライブラリを選択します。非同期または同期のメッセージ処理を選択することもできます。
準備
このドキュメントを読む前に、次の内容をよく理解しておいてください。
Pub/Sub の仕組みとさまざまな Pub/Sub の用語。
Pub/Sub がサポートするさまざまなサブスクリプションの種類と、pull サブスクリプションを使用する理由。
pull サブスクリプションのワークフロー
pull サブスクリプションの場合、サブスクライバー クライアントは Pub/Sub サーバーに対してメッセージ取得のリクエストを開始します。サブスクライバー クライアントは次のいずれかの API を使用します。
ほとんどのサブスクライバー クライアントは直接リクエストを行いません。代わりに、クライアントは Google Cloud が提供する高レベル クライアント ライブラリを使用して、ストリーミング pull リクエストを内部で実行し、メッセージを非同期で配信します。メッセージの pull 方法を細かく制御する必要があるサブスクライバー クライアントの場合、Pub/Sub は低レベルの自動生成された gRPC ライブラリを使用します。このライブラリは、pull リクエストまたはストリーミング pull リクエストを直接作成します。これらのリクエストは同期または非同期のいずれかです。
次の 2 つの画像は、サブスクライバー クライアントと pull サブスクリプションの間のワークフローを示しています。
pull ワークフロー
pull ワークフローは次のとおりです。図 1 を参照:
- サブスクライバー クライアントは
pull
メソッドを明示的に呼び出し、配信のメッセージをリクエストします。このリクエストは、画像に示すようにPullRequest
です。 Pub/Sub サーバーは、0 個以上のメッセージと確認応答 ID で応答します。レスポンスにメッセージがない場合やエラーがある場合、受信できるメッセージがないとは限りません。このレスポンスは、画像の
PullResponse
です。サブスクライバー クライアントは明示的に
acknowledge
メソッドを呼び出します。クライアントは、返された確認応答 ID を使用して、メッセージが処理されたことと、再度配信される必要がないことを確認します。
単一ストリーミング pull リクエストの場合は、オープン接続によりサブスクライバー クライアントが複数のレスポンスを返すことがあります。これに対して、pull リクエストごとに 1 つのレスポンスのみが返されます。
pull サブスクリプションのプロパティ
pull サブスクリプションに構成するプロパティによって、サブスクリプションにメッセージを書き込む方法が決まります。詳細については、サブスクリプションのプロパティをご覧ください。
Pub/Sub サービス API
Pub/Sub の pull サブスクリプションでは、次の 2 つの API のいずれかを使用してメッセージを取得できます。
- pull
- StreamingPull
これらの API を使用してメッセージを受信するときは、単項 Acknowledge RPC と ModifyAckDeadline RPC を使用します。2 つの Pub/Sub API については、次のタブで説明します。
StreamingPull API
Pub/Sub クライアント ライブラリでは、スループットを最大にしてレイテンシを最小にするために、StreamingPull を使用します(使用可能な場合)。StreamingPull API を直接使用することがなくても、Pull API との違いを理解することは重要です。
StreamingPull API は、複数のメッセージを受信するために、次の永続的な双方向接続に依存します。ワークフローは次のとおりです。
クライアントが接続を確立するためにサーバーへのリクエストを送信します。 接続割り当てを超過すると、サーバーはリソース不足エラーを返します。クライアント ライブラリは、割り当て不足のエラーを自動的に再試行します。
エラーがない場合、または接続割り当てが再度使用可能になった場合、サーバーは接続されたクライアントに引き続きメッセージを送信します。
スループットの割り当てを超過した場合、サーバーはメッセージの送信を停止します。ただし、接続は切断されません。十分なスループットの割り当てが再び利用可能になると、ストリームが再開されます。
最終的に、クライアントまたはサーバーが接続を終了します。
StreamingPull API は、オープン接続を維持します。Pub/Sub サーバーは、長時間実行されるスティッキー接続を回避するために、一定期間が経過した後も接続を繰り返し閉じます。クライアント ライブラリは StreamingPull 接続を自動的に再開します。
メッセージが利用可能になると、その接続に送信されます。したがって、StreamingPull API はレイテンシを最小限に抑え、メッセージのスループットを最大化します。
StreamingPull REST メソッドの詳細については、StreamingPullRequest および StreamingPullResponse をご覧ください。
StreamingPull RPC メソッドの詳細については、StreamingPullRequest および StreamingPullResponse をご覧ください。
Pull API
この API は、リクエスト / レスポンス モデルに基づく従来の単項 RPC です。1 つの pull レスポンスが 1 つの pull リクエストに対応します。ワークフローは次のとおりです。
クライアントはサーバーにメッセージのリクエストを送信します。 スループットの割り当てを超えると、サーバーはリソース不足エラーを返します。
エラーがない場合、またはスループットの割り当てが再び利用可能になった場合、サーバーは 0 個以上のメッセージと確認応答 ID を返します。
単項 Pull API を使用する場合、メッセージがないレスポンスまたはエラーがあるレスポンスは、受信可能なメッセージが存在しないことを示すとは限りません。
Pull API を使用しても、メッセージの低レイテンシと高スループットは保証されません。Pull API で高スループットと低レイテンシを実現するには、未処理のリクエストが同時に複数存在する必要があります。古いリクエストがレスポンスを受信すると、新しいリクエストが作成されます。このようなソリューションの設計はエラーが発生しやすく、保守が困難です。このようなユースケースでは StreamingPull API を使用することをおすすめします。
次のものを厳密に制御する必要がある場合にのみ、StreamingPull API の代わりに Pull API を使用します。
- サブスクライバー クライアントが処理できるメッセージ数
- クライアントのメモリとリソース
サブスクライバーが Pub/Sub とより pull 指向で動作する別のサービスの間のプロキシである場合にも、この API を使用できます。
Pull REST メソッドの詳細については、メソッド: projects.subscriptions.pull をご覧ください。
Pull RPC メソッドの詳細については、PullRequest と PullResponse をご覧ください。
メッセージ処理モードの種類
サブスクライバー クライアントに対して、次のいずれかの pull モードを選択します。
非同期 pull モード
非同期 pull モードでは、メッセージの受信とサブスクライバー クライアントでのメッセージ処理が切り離されます。このモードは、ほとんどのサブスクライバー クライアントでデフォルトです。非同期 pull モードでは、StreamingPull API または単項 Pull API を使用できます。非同期 pull では、高レベルのクライアント ライブラリまたは低レベルの自動生成クライアント ライブラリも使用できます。
クライアント ライブラリについては、このドキュメントで後ほど詳しく説明します。
同期 pull モード
同期 pull モードでは、メッセージの受信と処理が順番に行われ、互いに分離されていません。そのため、非同期処理では StreamingPull と単項 pull API と同様に、同期処理よりもレイテンシが短く、スループットが大きくなります。
同期 pull モードは、他の要件と比較して低レイテンシと高スループットが最も重要な要素ではないアプリケーションにのみ使用します。たとえば、アプリケーションが同期プログラミング モデルのみを使用するように制限されている場合があります。また、リソースの制約があるアプリケーションでは、メモリ、ネットワーク、CPU をより正確に制御する必要がある場合があります。このような場合は、単項 Pull API と同期モードを使用します。
Pub/Sub クライアント ライブラリ
Pub/Sub では、高レベルと低レベルの自動生成クライアント ライブラリが提供されます。
高レベルの Pub/Sub クライアント ライブラリ
高レベルのクライアント ライブラリには、リース管理を使用して確認応答期限を制御するオプションが用意されています。これらのオプションは、コンソールや CLI を使用してサブスクリプション レベルで確認応答期限を構成するよりも細かく設定できます。高レベル クライアント ライブラリには、順序指定配信、1 回限りの配信、フロー制御などの機能のサポートも実装されています。
非同期 pull と StreamingPull API を、高レベルのクライアント ライブラリで使用することをおすすめします。Google Cloud でサポートされているすべての言語が、高レベル クライアント ライブラリの Pull API もサポートしているわけではありません。
高レベル クライアント ライブラリを使用するには、Pub/Sub クライアント ライブラリをご覧ください。
低レベルの自動生成 Pub/Sub クライアント ライブラリ
低レベルのクライアント ライブラリは、pull API を直接使用しなければならない場合に使用できます。低レベルの自動生成クライアント ライブラリでは、同期処理または非同期処理を使用できます。低レベルの自動生成クライアント ライブラリを使用する場合は、順序指定配信、1 回限りの配信、フロー制御、リース管理などの機能を手動でコーディングする必要があります。
サポートされているすべての言語で、低レベルの自動生成クライアント ライブラリを使用する場合は、同期処理モデルを使用できます。Pull API を直接使用することが適切な場合は、低レベルの自動生成クライアント ライブラリと同期 pull を使用できます。たとえば、このモデルに依存する既存のアプリケーション ロジックがある可能性があります。
低レベルの自動生成クライアント ライブラリを直接使用するには、Pub/Sub API の概要をご覧ください。
クライアント ライブラリのコードサンプル
StreamingPull と高レベルのクライアント ライブラリ コードサンプル
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
高レベル クライアント ライブラリを使用してカスタム属性を取得する
次のサンプルは、メッセージを非同期で pull し、メタデータからカスタム属性を取得する方法を示しています。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
高レベル クライアント ライブラリを使用してエラーを処理する
次のサンプルは、メッセージの登録中に発生するエラーを処理する方法を示しています。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
単項 pull のコードサンプル
次に、固定数のメッセージを pull して確認応答するサンプルコードを記載します。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
PHP
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
プロトコル
リクエスト:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}
レスポンス:
200 OK
{
"receivedMessages": [{
"ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
"message": {
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "19917247034"
}
}]
}
リクエスト:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Pub/Sub によりメッセージの一覧が配信されます。 一覧に複数のメッセージがある場合、Pub/Sub は同じ順序指定キーを使用してメッセージを順序づけます。重要な注意点は次のとおりです。
リクエストに
max_messages
の値を設定しても、バックログに多数のメッセージがあっても、max_messages
が返されるとは限りません。Pub/Sub Pull API は、配信可能なメッセージの配信レイテンシを短縮するために、max_messages
より少ないメッセージを返すことがあります。メッセージが 0 である pull レスポンスを、バックログにメッセージがないことを示す指標として使用しないでください。メッセージが 0 のレスポンスを受け取って、その後のリクエストでメッセージを返すこともできます。
単項 pull モードでメッセージ配信のレイテンシを短くするには、未処理の pull リクエストが同時に多数存在していることが不可欠です。トピックのスループットが増加するにつれ、より多くの pull リクエストが必要になります。一般的に、StreamingPull モードはレイテンシの影響を受けやすいアプリケーションに適しています。
割り当てと上限
pull 接続と StreamingPull 接続はどちらも割り当てと上限の対象です。詳細については、Pub/Sub の割り当てと上限をご覧ください。
次のステップ
トピックに pull サブスクリプションを作成します。
gcloud CLI を使用してサブスクリプションを作成または変更する。
REST API を使用してサブスクリプションを作成または変更する。
REST API を使用してサブスクリプションを作成または変更する。