Pull Subscriber Guide

Google Cloud Pub/Sub supports both push and pull message delivery. For an overview and comparison of pull and push subscriptions, see, see the Subscriber Overview. This document describes pull delivery. For a discussion of push delivery, see the Push Subscriber Guide.

Receiving Messages

For pull subscriptions, delivery is accomplished in two steps:

  1. Call the pull method to retrieve some number of unacknowledged messages. You can specify the max number of messages returned by one call by setting the maxMessages field. By default, the server keeps the connection open until at least one message is received. You can optionally set the returnImmediately field to true to prevent the subscriber from waiting if the queue is currently empty.
  2. Call the acknowledge method to indicate that the subscriber has finished processing the message and that the Pub/Sub server can delete it from the subscription queue. You can also optionally modify the ack deadline on a message to delay the acknowledgment.

Different client libraries abstract these steps in different ways. Consult the reference documentation for your client library of choice for details.

Here is some sample code to pull and acknowledge a message one at a time:

Protocol

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": "false",
  "maxMessages": "1"
}

Response:

200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Response:

200 OK

C#

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

SubscriptionName subscriptionName = new SubscriptionName(_projectId,
    subscriptionId);
PullResponse response = subscriber.Pull(subscriptionName,
    returnImmediately: true, maxMessages: 10);
subscriber.Acknowledge(subscriptionName,
    response.ReceivedMessages.Select(m => m.AckId));

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(name)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	mu.Lock()
	defer mu.Unlock()
	if received >= 10 {
		cancel()
		msg.Nack()
		return
	}
	fmt.Printf("Got message: %q\n", string(msg.Data))
	received++
	msg.Ack()
})
if err != nil {
	return err
}

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver = new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

function pullMessages (subscriptionName) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Pulls messages. Set returnImmediately to false to block until messages are
  // received.
  return subscription.pull()
    .then((results) => {
      const messages = results[0];

      console.log(`Received ${messages.length} messages.`);

      messages.forEach((message) => {
        console.log(`* %d %j %j`, message.id, message.data, message.attributes);
      });

      // Acknowledges received messages. If you do not acknowledge, Pub/Sub will
      // redeliver the message.
      return subscription.ack(messages.map((message) => message.ackId));
    });
}

PHP

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
    }
}

Python

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

def receive_message(topic_name, subscription_name):
    """Receives a message from a pull subscription."""
    pubsub_client = pubsub.Client()
    topic = pubsub_client.topic(topic_name)
    subscription = topic.subscription(subscription_name)

    # Change return_immediately=False to block until messages are
    # received.
    results = subscription.pull(return_immediately=True)

    print('Received {} messages.'.format(len(results)))

    for ack_id, message in results:
        print('* {}: {}, {}'.format(
            message.message_id, message.data, message.attributes))

    # Acknowledge received messages. If you do not acknowledge, Pub/Sub will
    # redeliver the message.
    if results:
        subscription.acknowledge([ack_id for ack_id, message in results])

Ruby

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

pubsub       = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
subscription = pubsub.subscription "my-subscription"

puts "Messages pulled:"
subscription.pull.each do |message|
  puts message.data
  message.acknowledge!
end

Performance and Scaling

Pulling and acknowledging one message at a time can be inefficient. Typically, multiple messages are returned in a single pull response, and multiple ack IDs are sent in a single acknowledge request. Further, to maximize throughput, many pull calls are executed asynchronously and in parallel. You can achieve the right balance of throughput, efficiency, and latency by adjusting the number of messages or ack IDs per request and by using simultaneous pull requests. Different client libraries offer different means of configuring these parameters.

If you must use synchronous pull calls, you have to decide at what frequency to poll, and whether to wait for messages to arrive before returning an empty response. Higher polling frequency with the returnImmediately option minimizes message latency at the expense of requests that return no messages. Waiting for messages to arrive blocks the thread executing the message. This is particularly problematic on App Engine 1.9, where we recommend push subscribers. Allowing for a high number of messages to be returned maximizes the efficiency of processing.

You may need to implement a scaling mechanism for your subscriber application to keep up with message volume. How to do this depends on your environment, but it will generally be based on backlog metrics offered through the Stackdriver Monitoring service. For details on how to do this for Google Compute Engine, see Scaling based on Cloud Monitoring Metrics.

Search for "PubSub" on the Supported Metrics page of the Cloud Monitoring API to learn which metrics can be monitored programmatically.

Finally, as with all distributed services, expect to occasionally retry every request.

Send feedback about...

Cloud Pub/Sub