このページでは、Lite サブスクリプションからメッセージを受信する方法について説明します。メッセージは、Java 用の Pub/Sub Lite クライアント ライブラリを使用して受信できます。
Lite サブスクリプションでは、Lite トピックがサブスクライバー アプリケーションに接続され、サブスクライバーは、Lite サブスクリプションからメッセージを受信します。サブスクライバーは、パブリッシャー アプリケーションが Lite トピックに送信するメッセージをすべて受信します。これには Lite サブスクリプションを作成する前にパブリッシャーが送信したメッセージも含まれます。
Lite サブスクリプションからメッセージを受信する前に、Lite トピックを作成し、その Lite トピックに Lite サブスクリプションを作成して、メッセージをパブリッシュします。
メッセージの受信
Lite サブスクリプションからメッセージを受信するには、Lite サブスクリプションからのメッセージをリクエストします。クライアント ライブラリは、Lite サブスクリプションに関連付けられた Lite トピックのパーティションに自動的に接続します。 複数のサブスクライバー クライアントがインスタンス化されると、メッセージはすべてのクライアントに分散されます。トピック内のパーティションの数によって、サブスクリプションに同時接続できるサブスクライバー クライアントの最大数が決まります。
サブスクライバーが初期化してメッセージの受信を開始するまでに最大で 1 分程度かかることがあります。初期化後は、最小限のレイテンシでメッセージを受信します。
次の例は、Lite サブスクリプションからメッセージを受信する方法を示しています。
gcloud
このコマンドを使用するには、Python 3.6 以降と grpcio Python パッケージのインストールが必要です。MacOS、Linux、Cloud Shell のユーザーの場合、以下を実行します。
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
メッセージを受信するには、gcloud pubsub lite-subscriptions subscribe コマンドを使用します。
gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
--location=LITE_LOCATION \
--auto-ack
以下を置き換えます。
- SUBSCRIPTION_ID: Lite サブスクリプションの ID
- LITE_LOCATION: Lite サブスクリプションのロケーション
Go
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Go の設定手順を実施してください。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
クライアント ライブラリは、Lite トピックの各パーティションへの双方向ストリーミング接続を確立します。
サブスクライバーはパーティションへの接続をリクエストします。
Pub/Sub Lite サービスは、サブスクライバーにメッセージを配信します。
サブスクライバーはメッセージを処理した後、メッセージに対する確認応答を行う必要があります。クライアント ライブラリは、コールバック内で非同期にメッセージを処理して確認応答を行います。サブスクライバーがメモリに格納できる確認応答されていないメッセージの数を制限するには、フロー制御の設定を構成します。
複数のサブスクライバーが同じ Lite サブスクリプションからメッセージを受信した場合、Pub/Sub Lite サービスは、各サブスクライバーを同じ比率でパーティションに接続します。たとえば、2 つのサブスクライバーが同じ Lite サブスクリプションを使用し、2 つのパーティションを持つ Lite トピックに接続した場合、各サブスクライバーがいずれかのパーティションからメッセージを受信します。
メッセージの確認応答
メッセージを確認応答するには、Lite サブスクリプションに確認応答を送信します。
Go
確認応答を送信するには、Message.Ack()
メソッドを使用します。
Java
確認応答を送信するには、AckReplyConsumer.ack()
メソッドを使用します。
Python
確認応答を送信するには、Message.ack()
メソッドを使用します。
サブスクライバーは、各メッセージの確認応答を行う必要があります。サブスクライバーは、最も古い未確認のメッセージを最初に受け取り、その後に後続の各メッセージを受け取ります。サブスクライバーが 1 つのメッセージをスキップして後続のメッセージを確認応答してから再接続すると、サブスクライバーは未確認メッセージと、それぞれの後続の確認応答メッセージを受信します。
Lite サブスクリプションには確認応答期限がなく、Pub/Sub Lite サービスは未確認のメッセージをオープン ストリーミング接続で再配信しません。
フロー制御の使用
Pub/Sub Lite サービスがサブスクライバーにメッセージを配信した後、サブスクライバーは確認応答されていないメッセージをメモリに保存します。フロー制御設定を使用して、サブスクライバーがメモリに格納できる未処理のメッセージの数を制限できます。フロー制御設定は、サブスクライバーがメッセージを受信する各パーティションに適用されます。
次のフロー制御設定を構成できます。
- 未処理のメッセージ サイズ。未処理のメッセージの最大サイズ(バイト単位)。この最大サイズは、いちばん大きなメッセージのサイズより大きい必要があります。
- メッセージの数。未処理のメッセージの最大数です。
メッセージのサイズは size_bytes
フィールドに設定されてます。フロー制御設定はクライアント ライブラリで構成できます。
Go
フロー制御設定を構成するには、pscompat.NewSubscriberClientWithSettings
を呼び出すときに ReceiveSettings
を渡します。ReceiveSettings
では、次のパラメータを設定できます。
MaxOutstandingMessages
MaxOutstandingBytes
例については、このフロー制御サンプルをご覧ください。
Java
フロー制御設定を構成するには、FlowControlRequest.Builder
クラスの次のメソッドを使用します。
Python
フロー制御設定を構成するには、FlowControlSettings
クラスに次のパラメータを設定します。
bytes_outstanding
messages_outstanding
たとえば、メッセージの最大数が 100 件で、サブスクライバーが 10 個のパーティションに接続する場合、サブスクライバーは 10 個のパーティションから 100 件を超えるメッセージを受信できません。未処理のメッセージの合計数が 100 件を超えることがありますが、サブスクライバーは各パーティションから 100 件を超えるメッセージを保存できません。