Receiving messages from Lite subscriptions

This page explains how to receive messages from Lite subscriptions. You can receive messages with the Pub/Sub Lite client library for Java.

Lite subscriptions connect Lite topics to subscriber applications, and subscribers receive messages through them. A subscriber receives every message that publisher applications send to the Lite topic, including messages that were published before you created the Lite subscription.

Before receiving messages from a Lite subscription, create a Lite topic, create a Lite subscription to the Lite topic, and publish messages to the Lite topic.

Receiving messages

To receive messages from a Lite subscription, request messages from the Lite subscription. The client library automatically connects to the partitions in the Lite topic attached to the Lite subscription.

The following sample shows you how to receive messages from Lite subscriptions:

Java

Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId)
      throws ApiException {

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + message.getMessageId());
          System.out.println("Data : " + message.getData().toStringUtf8());
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

The client library establishes bidirectional streaming connections to each of the partitions in the Lite topic.

  1. The subscriber requests connections to the partitions.

  2. The Pub/Sub Lite service delivers the messages to the subscriber.

After the subscriber processes a message, it must acknowledge the message. The client library processes and acknowledges messages asynchronously in a callback. To limit the number of unacknowledged messages that the subscriber can store in memory, configure the flow control settings.

If multiple subscribers receive messages from the same Lite subscription, the Pub/Sub Lite service connects each subscriber to an equal proportion of partitions. For example, if two subscribers use the same Lite subscription and the Lite subscription is attached to a Lite topic with two partitions, each subscriber receives messages from one of the partitions.
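The equal split described above can be pictured with a small model. This is a minimal sketch, assuming a simple round-robin split; the Pub/Sub Lite service performs the real assignment, and its algorithm is not described on this page. The class `PartitionAssignmentSketch` and the method `assign` are hypothetical names and are not part of the client library.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: the Pub/Sub Lite service assigns partitions itself.
class PartitionAssignmentSketch {

  // Returns, for each subscriber index, the partition numbers it would read
  // under a hypothetical round-robin split.
  static List<List<Integer>> assign(int partitionCount, int subscriberCount) {
    if (partitionCount < 0 || subscriberCount <= 0) {
      throw new IllegalArgumentException(
          "partitionCount must be >= 0 and subscriberCount must be > 0");
    }
    List<List<Integer>> assignment = new ArrayList<>();
    for (int s = 0; s < subscriberCount; s++) {
      assignment.add(new ArrayList<>());
    }
    for (int p = 0; p < partitionCount; p++) {
      assignment.get(p % subscriberCount).add(p);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // Two subscribers on a two-partition Lite topic: each reads one partition.
    System.out.println(assign(2, 2)); // prints [[0], [1]]
  }
}
```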

Acknowledging messages

To acknowledge a message, send an acknowledgment to the Lite subscription.

Java

To send an acknowledgment, use the AckReplyConsumer.ack() method.

Acknowledge every message. Subscribers receive the oldest unacknowledged message first, followed by each subsequent message. If a subscriber skips one message, acknowledges the subsequent messages, and then reconnects, the subscriber receives the skipped message again, followed by every subsequent message, including the messages that it already acknowledged.

Lite subscriptions don't have an acknowledgment deadline and the Pub/Sub Lite service doesn't redeliver unacknowledged messages over an open streaming connection.
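The reconnect behavior described above can be modeled as finding the oldest message that was never acknowledged. The following is a minimal sketch, assuming that message positions are consecutive numbers; `ResumePointSketch` and `resumeOffset` are hypothetical names used for illustration and are not part of the client library.

```java
import java.util.Set;

// Illustrative model only; not part of the Pub/Sub Lite client library.
class ResumePointSketch {

  // Returns the position from which delivery would resume after a reconnect:
  // the oldest position in [firstOffset, nextOffset) that was never acknowledged,
  // or nextOffset if every delivered message was acknowledged.
  static long resumeOffset(long firstOffset, long nextOffset, Set<Long> acked) {
    for (long offset = firstOffset; offset < nextOffset; offset++) {
      if (!acked.contains(offset)) {
        return offset;
      }
    }
    return nextOffset;
  }

  public static void main(String[] args) {
    // Messages 0 through 4 were delivered; the subscriber skipped message 1.
    Set<Long> acked = Set.of(0L, 2L, 3L, 4L);
    // After a reconnect, delivery restarts at 1, so 2, 3, and 4 arrive again.
    System.out.println(resumeOffset(0, 5, acked)); // prints 1
  }
}
```

In this model, a single skipped message holds back the resume point, which is why the page recommends acknowledging every message.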

Using flow control

After the Pub/Sub Lite service delivers messages to subscribers, the subscribers store unacknowledged messages in memory. You can limit the number of outstanding messages that subscribers can store in memory using flow control settings. The flow control settings apply to each partition that a subscriber receives messages from.

You can configure the following flow control settings:

  • Outstanding message size. The maximum size, in bytes, of the outstanding messages. The maximum size must be greater than the size of the largest message.
  • Number of messages. The maximum number of outstanding messages.

The size of a message is in the size_bytes field. You can configure flow control settings with the client library.
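To see how the two limits interact, consider a simplified budget for one partition. This is a sketch under stated assumptions, not the client library's implementation: it assumes that a message is delivered only while both budgets can cover it, and that acknowledging a message returns its share of the budget. `FlowControlSketch`, `tryDeliver`, and `onAcked` are hypothetical names.

```java
// Illustrative model only; the real limits are enforced by the client library.
class FlowControlSketch {
  private long bytesAvailable;
  private long messagesAvailable;

  FlowControlSketch(long maxBytesOutstanding, long maxMessagesOutstanding) {
    if (maxBytesOutstanding <= 0 || maxMessagesOutstanding <= 0) {
      throw new IllegalArgumentException("limits must be positive");
    }
    this.bytesAvailable = maxBytesOutstanding;
    this.messagesAvailable = maxMessagesOutstanding;
  }

  // Tries to deliver a message of the given size. Returns false, leaving the
  // budget unchanged, when either limit would be exceeded.
  boolean tryDeliver(long sizeBytes) {
    if (messagesAvailable < 1 || bytesAvailable < sizeBytes) {
      return false;
    }
    messagesAvailable--;
    bytesAvailable -= sizeBytes;
    return true;
  }

  // Returns a message's share of the budget once it has been acknowledged.
  void onAcked(long sizeBytes) {
    messagesAvailable++;
    bytesAvailable += sizeBytes;
  }

  public static void main(String[] args) {
    // Hypothetical limits: 100 bytes or 2 messages, whichever is reached first.
    FlowControlSketch partition = new FlowControlSketch(100, 2);
    System.out.println(partition.tryDeliver(60)); // true
    System.out.println(partition.tryDeliver(60)); // false: byte budget too small
    System.out.println(partition.tryDeliver(30)); // true
    System.out.println(partition.tryDeliver(1));  // false: message budget is used up
    partition.onAcked(60);
    System.out.println(partition.tryDeliver(1));  // true: the acknowledgment freed budget
  }
}
```

The model also suggests why the byte limit must exceed the size of the largest message: a message larger than the whole byte budget could never be delivered.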

Java

To configure flow control settings, use the setBytesOutstanding() and setMessagesOutstanding() methods of the FlowControlSettings builder, as shown in the preceding sample.

For example, if the maximum number of messages is 100 and the subscriber connects to 10 partitions, the subscriber cannot hold more than 100 outstanding messages from any one of the 10 partitions. The total number of outstanding messages across all partitions might be greater than 100 (up to 100 × 10 = 1,000), but the subscriber cannot store more than 100 messages from each partition.
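Because the limits apply to each partition, the worst-case totals for one subscriber scale with the number of partitions it reads. The following sketch works through that arithmetic for both limits, using the 10 MiB byte limit from the sample on this page; `PerPartitionBudgetSketch` and `worstCaseTotal` are hypothetical names, not client library APIs.

```java
// Illustrative arithmetic only; not part of the Pub/Sub Lite client library.
class PerPartitionBudgetSketch {

  // Upper bound on what one subscriber may hold across all of its partitions,
  // given a limit that applies to each partition separately.
  static long worstCaseTotal(long perPartitionLimit, int partitionCount) {
    if (perPartitionLimit <= 0 || partitionCount <= 0) {
      throw new IllegalArgumentException("both arguments must be positive");
    }
    // multiplyExact throws ArithmeticException instead of silently overflowing.
    return Math.multiplyExact(perPartitionLimit, (long) partitionCount);
  }

  public static void main(String[] args) {
    long bytesPerPartition = 10 * 1024 * 1024L; // 10 MiB, as in the sample

    // 100 messages per partition across 10 partitions.
    System.out.println(worstCaseTotal(100, 10)); // prints 1000
    // 10 MiB per partition across 10 partitions.
    System.out.println(worstCaseTotal(bytesPerPartition, 10)); // prints 104857600
  }
}
```

An estimate like this can help when sizing subscriber memory: a subscriber that reads more partitions may need proportionally more headroom for the same per-partition settings.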