Pull Subscriber Guide

Cloud Pub/Sub supports both push and pull message delivery. For an overview and comparison of pull and push subscriptions, see the Subscriber Overview. This document describes pull delivery. For a discussion of push delivery, see the Push Subscriber Guide.

Asynchronous Pull

Asynchronous pulling provides higher throughput because your application does not have to block while waiting for new messages. Your application receives messages through a long-running message listener and acknowledges them one at a time, as shown in the examples below.

Not all client libraries support asynchronously pulling messages. To learn about synchronously pulling messages, see Synchronous Pull.

For more information, see the API Reference documentation in your programming language.

Go

Before trying this sample, follow the Go setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Go API reference documentation.

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	msg.Ack()
	fmt.Printf("Got message: %q\n", string(msg.Data))
	mu.Lock()
	defer mu.Unlock()
	received++
	if received == 10 {
		cancel()
	}
})
if err != nil {
	return err
}

Java

Before trying this sample, follow the Java setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Node.js API reference documentation.

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'your-subscription';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);
setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

Python

Before trying this sample, follow the Python setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Processing Custom Attributes

This sample shows how to pull messages asynchronously and retrieve the custom attributes from metadata:

Python

Before trying this sample, follow the Python setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

Before trying this sample, follow the Ruby setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  if !received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Listening for Errors

This sample shows how to handle errors that arise when subscribing to messages:

Go

Before trying this sample, follow the Go setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Go API reference documentation.

// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err := client.Subscription(subName).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Before trying this sample, follow the Java setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Node.js

Before trying this sample, follow the Node.js setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Node.js API reference documentation.

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'your-subscription';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);
setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

Python

Before trying this sample, follow the Python setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
try:
    # When timeout is unspecified, the result method waits indefinitely.
    future.result(timeout=30)
except Exception as e:
    print(
        'Listening for messages on {} threw an Exception: {}.'.format(
            subscription_name, e))

Ruby

Before trying this sample, follow the Ruby setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate exceptions from child threads to the main thread as soon as they
# are raised. Exceptions raised in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread.
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => ex
  puts "Exception #{ex.inspect}: #{ex.message}"
  raise "Stopped listening for messages."
end

Message Flow Control

Your subscriber client might process and acknowledge messages slower than Cloud Pub/Sub sends them to the client. In this case:

  • It's possible that one client could have a backlog of messages because it doesn't have the capacity to process the volume of incoming messages, but another client on the network does have that capacity. The second client could reduce the overall backlog, but it doesn't get the chance to because the first client cannot send its messages to the second client quickly enough. This reduces the overall rate of processing because messages get stuck on the first client.
  • Because the client library repeatedly extends the acknowledgement deadline for backlogged messages, those messages continue to consume memory, CPU, and bandwidth resources. As such, you might experience out-of-memory issues.

To maximize throughput and minimize latency in this situation, use the flow control features of the subscriber to control the rate at which the subscriber retrieves messages. These flow control features are illustrated in the following samples:

C#

Before trying this sample, follow the C# setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub C# API reference documentation.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
SubscriberClient subscriber = SubscriberClient.Create(
    subscriptionName, new[] { subscriberClient },
    new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        StreamAckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SubscriberClient runs your message handler function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
Thread.Sleep(3000);
subscriber.StopAsync(CancellationToken.None).Wait();

Go

Before trying this sample, follow the Go setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Go API reference documentation.

sub := client.Subscription(subName)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Before trying this sample, follow the Java setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(10_000L)
        .setMaxOutstandingRequestBytes(1_000_000_000L)
        .build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Node.js

Before trying this sample, follow the Node.js setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Node.js API reference documentation.

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const subscriptionName = 'your-subscription';
// const maxInProgress = 5;
// const maxBytes = 10000;

const topic = pubsub.topic(topicName);

const options = {
  flowControl: {
    maxBytes: maxBytes,
    maxMessages: maxInProgress,
  },
};

const subscription = topic.subscription(subscriptionName, options);

// Creates a new subscription
// Note that flow control configurations are not persistent
subscription
  .get({
    autoCreate: true,
  })
  .then(results => {
    const subscription = results[0];

    console.log(
      `Subscription ${
        subscription.name
      } created with a maximum of ${maxInProgress} unprocessed messages.`
    );
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

Python

Before trying this sample, follow the Python setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

Before trying this sample, follow the Ruby setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

More generally, the need for flow control indicates that messages are being published at a higher rate than they are being consumed. If this is a persistent state, rather than a spike in message volume, consider increasing the number of subscriber client instances and machines.
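
The limits used in the samples above (a maximum number of outstanding messages and, in some libraries, a maximum number of outstanding bytes) can be pictured as a simple admission check. The following is a minimal sketch of that idea in plain Python. It is not the client library's implementation, and the `FlowController` class and its method names are hypothetical.

```python
import threading


class FlowController:
    """Hypothetical sketch: caps the messages and bytes held by a subscriber."""

    def __init__(self, max_messages, max_bytes):
        self.max_messages = max_messages
        self.max_bytes = max_bytes
        self._messages = 0
        self._bytes = 0
        self._lock = threading.Lock()

    def try_acquire(self, size):
        """Admit a message of `size` bytes only if both limits allow it."""
        with self._lock:
            if (self._messages + 1 > self.max_messages
                    or self._bytes + size > self.max_bytes):
                return False  # Caller should stop pulling until capacity frees up.
            self._messages += 1
            self._bytes += size
            return True

    def release(self, size):
        """Free capacity once a message has been acked or nacked."""
        with self._lock:
            self._messages -= 1
            self._bytes -= size


controller = FlowController(max_messages=2, max_bytes=1024)
admitted = [controller.try_acquire(400) for _ in range(3)]
controller.release(400)  # One message is acknowledged.
admitted_after_ack = controller.try_acquire(400)
print(admitted, admitted_after_ack)
```

In this sketch the third message is refused because two messages are already outstanding, and capacity returns only after one of them is released. This mirrors why slow acknowledgement reduces the rate at which a flow-controlled subscriber retrieves messages.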

Concurrency Control

Support for concurrency depends on your programming language. For language implementations that support parallel threads, such as Java and Go, the client libraries make a default choice of the number of threads. This choice may not be optimal for your application. For example, if you find that your subscriber application is not keeping up with the incoming message volume but is not CPU-bound, you should increase the thread count. For CPU-intensive message processing operations, reducing the number of threads might be appropriate.

The following sample illustrates how to control concurrency in a subscriber:

Java

Before trying this sample, follow the Java setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Ruby

Before trying this sample, follow the Ruby setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  :callback => 4,
  :push => 2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Refer to the API Reference documentation for your programming language for more information about its concurrency options.
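
To see why a higher thread count helps when message handling is not CPU-bound, consider the following minimal sketch. It uses only the Python standard library rather than a Pub/Sub client, and the `handle` and `process_all` functions are hypothetical stand-ins for a message callback and a subscriber's callback pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def handle(message):
    """Stand-in for an I/O-bound handler (for example, a database write)."""
    time.sleep(0.05)
    return message


def process_all(messages, thread_count):
    """Process messages on `thread_count` threads; return the seconds taken."""
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=thread_count) as pool:
        list(pool.map(handle, messages))
    return time.monotonic() - start


messages = list(range(8))
one_thread = process_all(messages, thread_count=1)
eight_threads = process_all(messages, thread_count=8)
print("1 thread: {:.2f}s, 8 threads: {:.2f}s".format(one_thread, eight_threads))
```

Because each handler spends its time waiting rather than computing, eight threads finish the batch in roughly the time one handler takes. A CPU-intensive handler would not see the same gain, which is why reducing the thread count can be appropriate in that case.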

Synchronous Pull

If your application needs to poll continuously for new messages, asynchronous pull (where the client library for your programming language supports it) is usually the better choice for throughput. Synchronous pull is better suited to workloads that do not need to handle messages as soon as they are published. A synchronous subscriber does not need to keep a long-running connection alive and can pull and handle a fixed number of messages. The subscriber times out if no messages are currently available to be handled.

If low latency is desired when using synchronous pull, then it is important to have many simultaneously outstanding pull requests with returnImmediately set to false. As the throughput of the topic increases, more pull requests are necessary. It is appropriate to have anywhere between 10 and 100 requests outstanding at a time. However, to minimize latency, it is best to use asynchronous pull.
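
One way to keep several pull requests outstanding is to issue them from a pool of worker threads. The sketch below shows the pattern with the Python standard library only. The in-memory `backlog` queue and the `pull_once` function are hypothetical stand-ins for the service and for a blocking pull request with returnImmediately set to false; a real subscriber would call its client library's pull method instead.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for the subscription's backlog.
backlog = queue.Queue()
for i in range(20):
    backlog.put("message-{}".format(i))


def pull_once(timeout=0.1):
    """Stand-in for one blocking pull request.

    Waits up to `timeout` seconds for a message and returns an empty list
    if none arrives in that time.
    """
    try:
        return [backlog.get(timeout=timeout)]
    except queue.Empty:
        return []


def pull_worker():
    """Issue pull requests back to back until one returns no messages."""
    received = []
    while True:
        batch = pull_once()
        if not batch:
            return received
        received.extend(batch)


OUTSTANDING_REQUESTS = 10  # Lower end of the 10 to 100 range mentioned above.
with ThreadPoolExecutor(max_workers=OUTSTANDING_REQUESTS) as pool:
    futures = [pool.submit(pull_worker) for _ in range(OUTSTANDING_REQUESTS)]
    results = [message for f in futures for message in f.result()]

print("Pulled {} messages".format(len(results)))
```

The workers stop on the first empty response only so that the example terminates. A long-lived subscriber would normally issue a new request instead.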

Not all client libraries support synchronously pulling a fixed number of messages. See the API Reference documentation for your chosen programming language for details. Here is some sample code to pull and acknowledge a fixed number of messages:

Protocol

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": false,
  "maxMessages": 1
}

Response:

200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Response:

200 OK
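
In the JSON representation, the message `data` field is base64-encoded, so a client that calls the protocol directly has to decode it before use. The following sketch parses the pull response shown above, decodes the payload, and builds the body for the acknowledge request. It uses only the Python standard library and does not send any HTTP requests; the variable names are illustrative.

```python
import base64
import json

# The pull response body from the example (ackId left truncated as shown).
response_body = """
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}
"""

response = json.loads(response_body)

decoded = []
ack_ids = []
# Default to an empty list defensively in case no messages were returned.
for received in response.get("receivedMessages", []):
    payload = base64.b64decode(received["message"]["data"])
    decoded.append(payload.decode("utf-8"))
    ack_ids.append(received["ackId"])

# Body for the follow-up acknowledge request.
acknowledge_body = json.dumps({"ackIds": ack_ids})
print(decoded[0])
```

For the sample response, the decoded payload is the text "Hello Cloud Pub/Sub! Here is my message!".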

C#

Before trying this sample, follow the C# setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub C# API reference documentation.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
SubscriberClient subscriber = SubscriberClient.Create(
    subscriptionName, new[] { subscriberClient });

Java

Before trying this sample, follow the Java setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder().build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // wait (for a bounded time) until a message is available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

PHP

Before trying this sample, follow the PHP setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub PHP API reference documentation.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

Before trying this sample, follow the Python setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

# Builds a pull request with a specific number of messages to return.
# `return_immediately` is set to False so that the system waits (for a
# bounded amount of time) until at least one message is available.
response = subscriber.pull(
    subscription_path,
    max_messages=3,
    return_immediately=False)

ack_ids = []
for received_message in response.received_messages:
    print("Received: {}".format(received_message.message.data))
    ack_ids.append(received_message.ack_id)

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)

Ruby

Before trying this sample, follow the Ruby setup instructions in the Cloud Pub/Sub Quickstart Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

Scaling

You may need to implement a scaling mechanism for your subscriber application to keep up with message volume. How to do this depends on your environment, but it will generally be based on backlog metrics offered through the Stackdriver Monitoring service. For details on how to do this for Compute Engine, see Scaling based on Cloud Monitoring Metrics.

Search for "pubsub" on the Cloud Monitoring Supported Metrics page to learn which metrics can be monitored programmatically.

Finally, as with all distributed services, expect to occasionally retry every request.
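
A common way to retry is exponential backoff with jitter. The following is a generic sketch of that technique using only the Python standard library. It is not the retry policy of the Cloud Client Libraries, and `TransientError`, `call_with_retry`, and `flaky_request` are hypothetical names introduced for illustration.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical marker for errors that are safe to retry."""


def call_with_retry(request, max_attempts=5, base_delay=0.01, max_delay=1.0):
    """Call `request()` and retry on TransientError with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return request()
        except TransientError:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the error to the caller.
            # Double the delay cap on each attempt, then sleep for a random
            # time below it (jitter) so clients do not retry in lockstep.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(random.uniform(0, delay))


attempts = []


def flaky_request():
    """Fails twice, then succeeds, to exercise the retry loop."""
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientError("temporarily unavailable")
    return "ok"


result = call_with_retry(flaky_request)
print(result, len(attempts))
```

Only errors known to be transient should be retried this way. Errors such as a missing subscription or denied permission will not succeed on a later attempt.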

StreamingPull

The Cloud Pub/Sub service has two APIs for retrieving messages:

  • Pull
  • StreamingPull

Where possible, the Cloud Client libraries use StreamingPull, a bidirectional streaming RPC for maximum throughput and lowest latency. Although you may never use the StreamingPull API directly, it is important to understand some crucial properties of StreamingPull and how it differs from the more traditional pull method.

The Pull method relies on a request/response model:

  1. The application sends a request for messages.
  2. The server replies with zero or more messages and closes the connection.

The StreamingPull service API relies on a persistent bidirectional connection to receive multiple messages as they become available, send acknowledgements, and modify acknowledgement deadlines:

  1. The client sends a request to the service to establish a connection.
  2. The client uses that connection to exchange message data.
  3. The request (that is, the bidirectional connection) is terminated either by the client or the server.

StreamingPull has a 100% error rate (this is to be expected)

StreamingPull streams are always terminated with a retryable error code. Unlike in regular RPCs, the error here simply indicates that a stream has been broken, not that requests are failing. Therefore, while the StreamingPull API may have a seemingly surprising 100% error rate, this is by design. To diagnose StreamingPull errors, we recommend focusing on StreamingPull message operation metrics, rather than StreamingPull request metrics.

Dealing with large backlogs of small messages

The gRPC StreamingPull stack is optimized for high throughput and therefore buffers messages. This can have some consequences if you are attempting to process large backlogs of small messages (rather than a steady stream of new messages). Under these conditions, you may see messages delivered multiple times and they may not be load balanced effectively across clients.

The buffer between the Cloud Pub/Sub service and the client library user space is roughly 10 MB. To understand the impact of this buffer on client library behavior, consider this example:

  • There is a backlog of 10,000 1 KB messages on a subscription.
  • Each message takes 1 second to process sequentially, by a single-threaded client instance.
  • The first client instance to establish a StreamingPull connection to the service for that subscription will get a buffer of the entire 10,000 messages.
  • It takes 10,000 seconds (almost 3 hours) to process the buffer.
  • In that time, some of the messages exceed their acknowledgement deadline and are re-sent to the same client, resulting in duplicates.
  • When multiple client instances are running, the messages stuck in the buffer will not be available to any instances other than the first.
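
The arithmetic behind this example can be checked with a few lines of Python. The backlog, message size, buffer size, and processing time come from the example above. The 600-second limit on how long a message can wait before it is redelivered is an assumed figure for illustration only and is not taken from this guide.

```python
BACKLOG_MESSAGES = 10000
MESSAGE_SIZE_BYTES = 1024          # 1 KB per message
BUFFER_BYTES = 10 * 1024 * 1024    # roughly 10 MB
SECONDS_PER_MESSAGE = 1

# The whole backlog fits in the buffer of the first client to connect.
backlog_bytes = BACKLOG_MESSAGES * MESSAGE_SIZE_BYTES
fits_in_buffer = backlog_bytes <= BUFFER_BYTES

# A single-threaded client drains the buffer one message at a time.
drain_seconds = BACKLOG_MESSAGES * SECONDS_PER_MESSAGE
drain_hours = drain_seconds / 3600

# ASSUMPTION (illustrative only): a buffered message is redelivered if it
# has not been handled within this many seconds.
ASSUMED_DEADLINE_SECONDS = 600
on_time_messages = min(BACKLOG_MESSAGES,
                       ASSUMED_DEADLINE_SECONDS // SECONDS_PER_MESSAGE)
late_messages = BACKLOG_MESSAGES - on_time_messages

print("Backlog fits in buffer:", fits_in_buffer)
print("Time to drain: {} s ({:.2f} hours)".format(drain_seconds, drain_hours))
print("Messages at risk of redelivery:", late_messages)
```

Under that assumed limit, most of the buffered messages would still be waiting when their deadline passes, which is how the duplicates described above arise.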

This situation will not occur if the messages are arriving at a steady rate, rather than as a single large batch. The service never has the entire 10 MB of messages at a time and so is able to effectively load balance messages across multiple subscribers.

To address this situation, use either a push subscription or the pull API, which is currently available in some of the Cloud Client Libraries (see the Synchronous Pull section) and all API Client Libraries. To learn more, see the Client Libraries documentation.
