Lite サブスクリプションからメッセージを受信する

このページでは、Lite サブスクリプションからメッセージを受信する方法について説明します。メッセージは、Java 用の Pub/Sub Lite クライアント ライブラリを使用して受信できます。

Lite サブスクリプションでは、Lite トピックがサブスクライバー アプリケーションに接続され、サブスクライバーは、Lite サブスクリプションからメッセージを受信します。サブスクライバーは、パブリッシャー アプリケーションが Lite トピックに送信するメッセージをすべて受信します。これには Lite サブスクリプションを作成する前にパブリッシャーが送信したメッセージも含まれます。

Lite サブスクリプションからメッセージを受信する前に、Lite トピックを作成し、その Lite トピックに Lite サブスクリプションを作成して、メッセージをパブリッシュします。

メッセージの受信

Lite サブスクリプションからメッセージを受信するには、Lite サブスクリプションからのメッセージをリクエストします。クライアント ライブラリは、Lite サブスクリプションに関連付けられた Lite トピックのパーティションに自動的に接続します。

次の例は、Lite サブスクリプションからメッセージを受信する方法を示しています。

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId)
      throws StatusException {

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + message.getMessageId());
          System.out.println("Data : " + message.getData().toStringUtf8());
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

クライアント ライブラリは、Lite トピックの各パーティションへの双方向ストリーミング接続を確立します。

  1. サブスクライバーはパーティションへの接続をリクエストします。

  2. Pub/Sub Lite サービスは、サブスクライバーにメッセージを配信します。

サブスクライバーはメッセージを処理した後、メッセージに対する確認応答を行う必要があります。クライアント ライブラリは、コールバック内で非同期にメッセージを処理して確認応答を行います。サブスクライバーがメモリに格納できる確認応答されていないメッセージの数を制限するには、フロー制御の設定を構成します

複数のサブスクライバーが同じ Lite サブスクリプションからメッセージを受信した場合、Pub/Sub Lite サービスは、各サブスクライバーを同じ比率でパーティションに接続します。たとえば、2 つのサブスクライバーが同じ Lite サブスクリプションを使用し、2 つのパーティションを持つ Lite トピックに接続した場合、各サブスクライバーがいずれかのパーティションからメッセージを受信します。

メッセージの確認応答

メッセージを確認応答するには、Lite サブスクリプションに確認応答を送信します。

Java

確認応答を送信するには、AckReplyConsumer.ack() メソッドを使用します。

すべてのメッセージを確認応答します。サブスクライバーは、最も古い未確認のメッセージを最初に受け取り、その後に後続の各メッセージを受け取ります。サブスクライバーが 1 つのメッセージをスキップして後続のメッセージを確認応答してから再接続すると、サブスクライバーは未確認メッセージと、それぞれの後続の確認応答メッセージを受信します。

Lite サブスクリプションには確認応答期限がなく、Pub/Sub Lite サービスは未確認のメッセージをオープン ストリーミング接続で再配信しません。

フロー制御の使用

Pub/Sub Lite サービスがサブスクライバーにメッセージを配信した後、サブスクライバーは確認応答されていないメッセージをメモリに保存します。フロー制御設定を使用して、サブスクライバーがメモリに格納できる未処理のメッセージの数を制限できます。フロー制御設定は、サブスクライバーがメッセージを受信する各パーティションに適用されます。

次のフロー制御設定を構成できます。

  • 未処理のメッセージ サイズ。未処理のメッセージの最大サイズ(バイト単位)。この最大サイズは、いちばん大きなメッセージのサイズより大きい必要があります。
  • メッセージの数。未処理のメッセージの最大数です。

メッセージのサイズは size_bytes フィールドに設定されてます。フロー制御設定はクライアント ライブラリで構成できます。

Java

フロー制御設定を構成するには、FlowControlRequest.Builder クラスの次のメソッドを使用します。

たとえば、メッセージの最大数が 100 件で、サブスクライバーが 10 個のパーティションに接続する場合、サブスクライバーは 10 個のパーティションから 100 件を超えるメッセージを受信できません。未処理のメッセージの合計数が 100 件を超えることがありますが、サブスクライバーは各パーティションから 100 件を超えるメッセージを保存できません。