Pull Subscriber Guide

Google Cloud Pub/Sub supports both push and pull message delivery. For an overview and comparison of pull and push subscriptions, see the Subscriber Overview. This document describes pull delivery. For a discussion of push delivery, see the Push Subscriber Guide.

Receiving Messages

Here is some sample code to pull and acknowledge a message one at a time:

Protocol

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": "false",
  "maxMessages": "1"
}

Response:

200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Response:

200 OK

C#

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
PullResponse response = subscriber.Pull(subscriptionName,
    returnImmediately: true, maxMessages: 10);
subscriber.Acknowledge(subscriptionName,
    response.ReceivedMessages.Select(m => m.AckId));

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(name)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	mu.Lock()
	defer mu.Unlock()
	received++
	if received >= 10 {
		cancel()
		msg.Nack()
		return
	}
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver = new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

function pullMessages (subscriptionName) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Pulls messages. Set returnImmediately to false to block until messages are
  // received.
  return subscription.pull()
    .then((results) => {
      const messages = results[0];

      console.log(`Received ${messages.length} messages.`);

      messages.forEach((message) => {
        console.log(`* %d %j %j`, message.id, message.data, message.attributes);
      });

      // Acknowledges received messages. If you do not acknowledge, Pub/Sub will
      // redeliver the message.
      return subscription.ack(messages.map((message) => message.ackId));
    });
}

PHP

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

def receive_message(topic_name, subscription_name):
    """Receives a message from a pull subscription."""
    pubsub_client = pubsub.Client()
    topic = pubsub_client.topic(topic_name)
    subscription = topic.subscription(subscription_name)

    # Change return_immediately=False to block until messages are
    # received.
    results = subscription.pull(return_immediately=True)

    print('Received {} messages.'.format(len(results)))

    for ack_id, message in results:
        print('* {}: {}, {}'.format(
            message.message_id, message.data, message.attributes))

    # Acknowledge received messages. If you do not acknowledge, Pub/Sub will
    # redeliver the message.
    if results:
        subscription.acknowledge([ack_id for ack_id, message in results])

Ruby

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

pubsub       = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
subscription = pubsub.subscription "my-subscription"

puts "Messages pulled:"
subscription.pull.each do |message|
  puts message.data
  message.acknowledge!
end

Listening for Errors

This sample shows how to handle errors that arise when subscribing to messages.

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

subscriber.addListener(new Subscriber.Listener() {
  public void failed(Subscriber.State from, Throwable failure) {
    // Handle error.
  }
}, MoreExecutors.directExecutor());

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err := client.Subscription(name).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Advanced Configuration

You can control the rate at which a subscriber retrieves messages by using the flow control features of the subscriber, as illustrated in this sample:

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

int maxMessageCount = 10;
// Configure max number of messages to be pulled
FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
    .setMaxOutstandingElementCount(maxMessageCount)
    .build();
Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
    .setFlowControlSettings(flowControlSettings)
    .build();

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

sub := client.Subscription(name)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

For languages that natively support threading, you have options to control this as well. The following sample illustrates how to create a single thread subscriber.

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// provide a separate executor service for polling
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
    .setExecutorThreadCount(1).build();

Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
    .setExecutorProvider(executorProvider)
    .build();

Scaling

You may need to implement a scaling mechanism for your subscriber application to keep up with message volume. How to do this depends on your environment, but it will generally be based on backlog metrics offered through the Stackdriver Monitoring service. For details on how to do this for Google Compute Engine, see Scaling based on Cloud Monitoring Metrics.

Search for "PubSub" on the Supported Metrics page of the Cloud Monitoring API to learn which metrics can be monitored programmatically.

Finally, as with all distributed services, expect to occasionally retry every request.

Send feedback about...

Cloud Pub/Sub