接收精简版订阅的消息

本页面介绍了如何接收精简版订阅的消息。您可以使用 Java 版 Pub/Sub Lite 客户端库接收消息。

精简版订阅将精简版主题连接到订阅者应用;订阅者从精简版订阅接收消息。订阅者会收到发布商应用向精简版主题发送的所有消息,包括发布者在创建精简版订阅之前发送的消息。

在接收精简版订阅的消息之前,请创建精简版主题,针对精简版主题创建精简版订阅,并发布消息

接收消息

要接收精简版订阅的消息,请向精简版订阅请求消息。客户端库会自动连接到附加到精简版订阅的精简版主题中的分区

以下示例展示了如何接收精简版订阅的消息:

Java

在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId)
      throws StatusException {

    SubscriptionPath subscriptionPath =
        SubscriptionPaths.newBuilder()
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setSubscriptionName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + message.getMessageId());
          System.out.println("Data : " + message.getData().toStringUtf8());
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.value() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

客户端库会建立与精简版主题中每个分区的双向流式传输连接。

  1. 订阅者请求连接到分区。

  2. Pub/Sub 精简版服务将消息传递给订阅者。

订阅者处理消息后,订阅者必须确认消息。客户端库对回调中的消息进行异步处理和确认。要限制订阅者可以存储在内存中的未确认消息数量,请配置流控制设置

如果多个订阅者从同一精简版订阅接收消息,Pub/Sub 精简版服务会将每个订阅者连接到相同比例的分区。例如,如果两个订阅者使用一个相同的精简版订阅,并且该精简版订阅附加到具有两个分区的精简版主题,则每个订阅者会收到来自其中一个分区的消息。

确认消息

要确认消息,请向精简版订阅发送确认消息。

Java

如需发送确认,请使用 AckReplyConsumer.ack() 方法。

确认每一条消息。订阅者会先收到最早的未确认消息,然后接收每个后续消息。如果订阅者跳过一条消息,确认后续消息,然后重新连接,则订阅者会收到未确认的消息以及每条已确认的后续消息。

精简版订阅没有确认时限,并且 Pub/Sub Lite 服务不会通过开放的串流连接重新确认未确认的消息。

使用流控制

Pub/Sub Lite 服务向订阅者传送消息后,订阅者会将未确认的消息存储在内存中。您可以使用流控制设置限制订阅者可在内存中存储的未完成消息数量。流控制设置适用于订阅者接收消息的每个分区。

您可以配置以下流控制设置:

  • 待处理的消息大小。待处理的消息的大小上限(以字节为单位)。最大大小必须大于最大消息的大小。
  • 消息数量待处理消息的最大数量。

消息的大小位于 size_bytes 字段中。您可以使用客户端库配置流控制设置。

Java

如需配置流控制设置,请在 FlowControlRequest.Builder 类中使用以下方法:

例如,如果消息数量上限为 100,并且订阅者连接到 10 个分区,则订阅者无法从这 10 个分区收到超过 100 条消息。未完成消息的总数可能超过 100,但订阅者无法从每个分区存储超过 100 条消息。