Pull Subscriber Guide

Google Cloud Pub/Sub supports both push and pull message delivery. For an overview and comparison of pull and push subscriptions, see the Subscriber Overview. This document describes pull delivery. For a discussion of push delivery, see the Push Subscriber Guide.

Asynchronous Pull

Using asynchronous pulling provides higher throughput in your application, by not requiring your application to block for new messages. Messages can be received in your application using a long running message listener, and acknowledged one message at a time, as shown in the example below.

Not all client libraries support asynchronously pulling messages. To learn about synchronously pulling messages, see Synchronous Pull.

For more information, see the API Reference documentation in your programming language.

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(name)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	mu.Lock()
	defer mu.Unlock()
	received++
	if received >= 10 {
		cancel()
		msg.Nack()
		return
	}
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Python

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

Node.js

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

function listenForMessages (subscriptionName, timeout) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on(`message`, messageHandler);
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Listening for Errors

This sample shows how to handle errors that arise when subscribing to messages:

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err := client.Subscription(name).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Node.js

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

function listenForErrors (subscriptionName, timeout) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Create an event handler to handle messages
  const messageHandler = function (message) {
    // Do something with the message
    console.log(`Message: ${message}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Create an event handler to handle errors
  const errorHandler = function (error) {
    // Do something with the error
    console.error(`ERROR: ${error}`);
  };

  // Listen for new messages/errors until timeout is hit
  return new Promise((resolve) => {
    subscription.on(`message`, messageHandler);
    subscription.on(`error`, errorHandler);
    setTimeout(() => {
      subscription.removeListener(`message`, messageHandler);
      subscription.removeListener(`error`, errorHandler);
      resolve();
    }, timeout * 1000);
  });
}

Message Flow Control

You can control the rate at which a subscriber retrieves messages by using the flow control features of the subscriber, as illustrated in this sample:

C#

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriberClient = SubscriberClient.Create();
SimpleSubscriber subscriber = SimpleSubscriber.Create(
    subscriptionName, new[] { subscriberClient },
    new SimpleSubscriber.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        StreamAckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandardByteCount: 10240)
    });
// SimpleSubscriber runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SimpleSubscriber.Reply.Ack
            : SimpleSubscriber.Reply.Nack;
    });
// Run for 3 seconds.
Thread.Sleep(3000);
subscriber.StopAsync(CancellationToken.None).Wait();

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

long maxMessageCount = 10L;
// Configure max number of messages to be pulled
FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder().setMaxOutstandingElementCount(maxMessageCount).build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

sub := client.Subscription(name)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Python

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

def receive_messages_with_flow_control(project, subscription_name):
    """Receives messages from a pull subscription with flow control."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    # Limit the subscriber to only have ten outstanding messages at a time.
    flow_control = pubsub_v1.types.FlowControl(max_messages=10)
    subscriber.subscribe(
        subscription_path, callback=callback, flow_control=flow_control)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

Node.js

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

function createFlowControlledSubscription (topicName, subscriptionName, maxInProgress, maxBytes) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing topic, e.g. "my-topic"
  const topic = pubsub.topic(topicName);

  // Creates a new subscription, e.g. "my-new-subscription"
  // Note that flow control configurations are not persistent
  return topic.createSubscription(subscriptionName, {
    flowControl: {
      maxBytes: maxBytes,
      maxMessages: maxInProgress
    }
  })
    .then((results) => {
      const subscription = results[0];

      console.log(`Subscription ${subscription.name} created with a maximum of ${maxInProgress} unprocessed messages.`);

      return subscription;
    });
}

Concurrency Control

Support for concurrency depends on your programming language. Refer to the API Reference documentation for more information.

The following sample illustrates how to control concurrency in a subscriber:

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Synchronous Pull

If you require constant polling of new messages, the asynchronous pull (if available in your choice of programming language) might be a better fit to achieve better throughput. Synchronous pull is better suited to workloads that do not require handling of messages as soon as they are published. A synchronous subscriber does not need to keep a long-running connection alive and can choose to pull and handle a fixed number of messages. The subscriber times out if no messages are currently available to be handled. Not all client libraries support synchronously pulling a fixed number of messages. See the API Reference documentation for your chosen programming language for details.

Here is some sample code to pull and acknowledge a fixed number of messages:

Protocol

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": "false",
  "maxMessages": "1"
}

Response:

200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Response:

200 OK

C#

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriberClient = SubscriberClient.Create();
SimpleSubscriber subscriber = SimpleSubscriber.Create(
    subscriptionName, new[] { subscriberClient });
// SimpleSubscriber runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SimpleSubscriber.Reply.Ack
            : SimpleSubscriber.Reply.Nack;
    });
// Run for 3 seconds.
Thread.Sleep(3000);
subscriber.StopAsync(CancellationToken.None).Wait();

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

SubscriptionAdminSettings subscriptionAdminSettings =
    SubscriptionAdminSettings.newBuilder().build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriptionAdminSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = SubscriptionName.create(projectId, subscriptionId).toString();
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // return immediately if messages are not available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

PHP

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Ruby

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

pubsub       = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
subscription = pubsub.subscription "my-subscription"

puts "Messages pulled:"
subscription.pull.each do |message|
  puts message.data
  message.acknowledge!
end

Scaling

You may need to implement a scaling mechanism for your subscriber application to keep up with message volume. How to do this depends on your environment, but it will generally be based on backlog metrics offered through the Stackdriver Monitoring service. For details on how to do this for Google Compute Engine, see Scaling based on Cloud Monitoring Metrics.

Search for "PubSub" on the Supported Metrics page of the Cloud Monitoring API to learn which metrics can be monitored programmatically.

Finally, as with all distributed services, expect to occasionally retry every request.

Monitor your resources on the go

Get the Google Cloud Console app to help you manage your projects.

Send feedback about...

Cloud Pub/Sub