Receiving messages using Pull

Cloud Pub/Sub supports both push and pull message delivery. For an overview and comparison of pull and push subscriptions, see the Subscriber Overview. This document describes pull delivery. For a discussion of push delivery, see the Push Subscriber Guide.

Asynchronous Pull

Asynchronous pull provides higher throughput in your application because it does not require your application to block while waiting for new messages. Messages are received by a long-running message listener and acknowledged one at a time, as shown in the examples below. The Java, Python, .NET, Go, and Ruby clients use the StreamingPull service API to implement the asynchronous client API efficiently.

Not all client libraries support asynchronously pulling messages. To learn about synchronously pulling messages, see Synchronous Pull.

For more information, see the API Reference documentation in your programming language.

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub C# API reference documentation.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName);
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Go API reference documentation.

import (
	"context"
	"fmt"
	"io"
	"sync"

	"cloud.google.com/go/pubsub"
)

func pullMsgs(w io.Writer, projectID, subName string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subName := projectID + "-example-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// Publish 10 messages on the topic.
	var results []*pubsub.PublishResult
	for i := 0; i < 10; i++ {
		res := topic.Publish(ctx, &pubsub.Message{
			Data: []byte(fmt.Sprintf("hello world #%d", i)),
		})
		results = append(results, res)
	}

	// Check that all messages were published.
	for _, r := range results {
		_, err := r.Get(ctx)
		if err != nil {
			return fmt.Errorf("Get: %v", err)
		}
	}
	// Consume 10 messages.
	var mu sync.Mutex
	received := 0
	sub := client.Subscription(subName)
	cctx, cancel := context.WithCancel(ctx)
	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		msg.Ack()
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		mu.Lock()
		defer mu.Unlock()
		received++
		if received == 10 {
			cancel()
		}
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName =
    ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync().awaitRunning();
  // Allow the subscriber to run indefinitely unless an unrecoverable error occurs
  subscriber.awaitTerminated();
} finally {
  // Stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Node.js API reference documentation.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Processing Custom Attributes

This sample shows how to pull messages asynchronously and retrieve the custom attributes from metadata:

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the listener thread
# does not quit
sleep 60
subscriber.stop.wait!

Listening for Errors

This sample shows how to handle errors that arise when subscribing to messages:

Go

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Go API reference documentation.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsError(w io.Writer, projectID, subName string) error {
	// projectID := "my-project-id"
	// subName := projectID + "-example-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	err = client.Subscription(subName).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Node.js API reference documentation.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);

setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

from google.cloud import pubsub_v1

# TODO project_id        = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pubsub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
try:
    # When timeout is unspecified, the result method waits indefinitely.
    future.result(timeout=30)
except Exception as e:
    print(
        'Listening for messages on {} threw an Exception: {}.'.format(
            subscription_name, e))

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate exceptions from child threads to the main thread as soon as they
# are raised. Exceptions raised in the callback thread are collected in the
# callback thread pool and do not otherwise propagate to the main thread.
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the listener thread
  # does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => ex
  puts "Exception #{ex.inspect}: #{ex.message}"
  raise "Stopped listening for messages."
end

Message Flow Control

Your subscriber client might process and acknowledge messages more slowly than Cloud Pub/Sub sends them to the client. In this case:

  • It's possible that one client could have a backlog of messages because it doesn't have the capacity to process the volume of incoming messages, but another client on the network does have that capacity. The second client could reduce the subscription's backlog, but it doesn't get the chance to do so because the first client maintains a lease on the messages that it receives. This reduces the overall rate of processing because messages get stuck on the first client.

  • Because the client library repeatedly extends the acknowledgement deadline for backlogged messages, those messages continue to consume memory, CPU, and bandwidth resources. As such, the subscriber client might run out of resources (such as memory). This can adversely impact the throughput and latency of processing messages.

To mitigate the issues above, use the flow control features of the subscriber to control the rate at which the subscriber receives messages. These flow control features are illustrated in the following samples:

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub C# API reference documentation.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName,
    settings: new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        AckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Go API reference documentation.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsSettings(w io.Writer, projectID, subName string) error {
	// projectID := "my-project-id"
	// subName := projectID + "-example-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	sub := client.Subscription(subName)
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.MaxOutstandingMessages = 10
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(10_000L)
        .setMaxOutstandingRequestBytes(1_000_000_000L)
        .build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Node.js API reference documentation.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const maxInProgress = 5;
// const timeout = 10;

const subscriberOptions = {
  flowControl: {
    maxMessages: maxInProgress,
  },
};

// References an existing subscription.
// Note that flow control settings are not persistent across subscribers.
const subscription = pubsub.subscription(subscriptionName, subscriberOptions);

console.log(
  `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
);

const messageHandler = message => {
  console.log(`Received message: ${message.id}`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

subscription.on('message', messageHandler);

setTimeout(() => {
  subscription.close();
}, timeout * 1000);

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the listener thread
# does not quit
sleep 60
subscriber.stop.wait!

More generally, the need for flow control indicates that messages are being published at a higher rate than they are being consumed. If this is a persistent state, rather than a transient spike in message volume, consider increasing the number of subscriber client instances.

Concurrency Control

Support for concurrency depends on your programming language. For language implementations that support parallel threads, such as Java and Go, the client libraries make a default choice for the number of threads. This choice may not be optimal for your application. For example, if you find that your subscriber application is not keeping up with the incoming message volume but is not CPU-bound, you should increase the thread count. For CPU-intensive message processing operations, reducing the number of threads might be appropriate.

The following sample illustrates how to control concurrency in a subscriber:

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the listener thread
# does not quit
sleep 60
subscriber.stop.wait!

Support for concurrency depends on your programming language. Refer to the API Reference documentation for more information.
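
In Python, for example, the size of the callback thread pool can be tuned by supplying your own executor. The following is a minimal, hedged sketch rather than an official sample; it assumes a version of the Python client library whose subscribe method accepts a scheduler argument, and the project and subscription names are placeholders:

from concurrent.futures import ThreadPoolExecutor

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Placeholder identifiers for illustration only.
project_id = 'my-project-id'
subscription_name = 'my-subscription-id'

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    message.ack()

# Run message callbacks on a pool of 4 worker threads instead of the
# library default.
scheduler = ThreadScheduler(executor=ThreadPoolExecutor(max_workers=4))
subscriber.subscribe(
    subscription_path, callback=callback, scheduler=scheduler)

The Java and Ruby samples above achieve the same thing with an executor provider and a threads option, respectively.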

StreamingPull

The Cloud Pub/Sub service has two APIs for retrieving messages:

  • The pull method
  • The StreamingPull service API

Where possible, the Cloud Client libraries use StreamingPull for maximum throughput and lowest latency. Although you may never use the StreamingPull API directly, it is important to understand some crucial properties of StreamingPull and how it differs from the more traditional pull method.

The Pull method relies on a request/response model:

  1. The application sends a request for messages.
  2. The server replies with zero or more messages and closes the connection.

The StreamingPull service API relies on a persistent bidirectional connection to receive multiple messages as they become available, send acknowledgements, and modify acknowledgement deadlines:

  1. The client sends a request to the service to establish a connection.
  2. The client uses that connection to exchange message data.
  3. The request (that is, the bidirectional connection) is terminated either by the client or the server.
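
The client libraries manage this connection lifecycle for you. As a rough Python sketch of what that looks like at the application level (project and subscription names are placeholders):

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    'my-project-id', 'my-subscription-id')

def callback(message):
    # Messages arriving over the stream are dispatched to this callback.
    message.ack()

# subscribe() opens the StreamingPull connection in the background and
# returns a future that represents the stream.
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback)

# Cancelling the future terminates the bidirectional connection from the
# client side; the service may also terminate it at any time.
streaming_pull_future.cancel()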

StreamingPull has a 100% error rate (this is to be expected)

StreamingPull streams are always terminated with an error code. Note that unlike in regular RPCs, the error here is simply an indication that a stream has been broken, not that requests are failing. Therefore, while the StreamingPull API may have a seemingly surprising 100% error rate, this is by design.

Diagnosing StreamingPull errors

Because StreamingPull streams always terminate with an error, it isn't helpful to examine StreamingPull request metrics while diagnosing errors. Rather, focus on StreamingPull message operation metrics. Look for these errors:

  • FAILED_PRECONDITION errors can occur in these cases:
    • Cloud Pub/Sub attempts to decrypt a message with a disabled Cloud KMS key.
    • The stream shuts down due to a suspended subscription. Subscriptions can be temporarily suspended if there are messages in the subscription backlog that are protected by a disabled Cloud KMS key.
  • UNAVAILABLE errors indicate that the service was temporarily unable to process the request. These errors are transient, and the client libraries retry the affected operations automatically.

StreamingPull: Dealing with large backlogs of small messages

The gRPC StreamingPull stack is optimized for high throughput and therefore buffers messages. This can have some consequences if you are attempting to process large backlogs of small messages (rather than a steady stream of new messages). Under these conditions, you may see messages delivered multiple times and they may not be load balanced effectively across clients.

The buffer between the Cloud Pub/Sub service and the client library user space is roughly 10MB. To understand the impact of this buffer on client library behavior, consider this example:

  • There is a backlog of 10,000 1KB messages on a subscription.
  • Each message takes 1 second to process sequentially, by a single-threaded client instance.
  • The first client instance to establish a StreamingPull connection to the service for that subscription has its buffer filled with the entire 10,000-message backlog.
  • It takes 10,000 seconds (almost 3 hours) to process the buffer.
  • In that time, some of the buffered messages exceed their acknowledgement deadline and are re-sent to the same client, resulting in duplicates.
  • When multiple client instances are running, the messages stuck in the first instance's buffer are not available to any other instance.

This situation will not occur if messages arrive at a steady rate rather than as a single large batch. The service never holds the entire 10MB of messages at once and so can effectively load balance messages across multiple subscribers.

To address this situation, either use a push subscription or the synchronous pull API, which is currently available in some of the Cloud Client Libraries (see the Synchronous Pull section below) and in all API Client Libraries. To learn more, see the Client Libraries documentation.

Synchronous Pull

There are cases where asynchronous pull is not a perfect fit for your application. For example, the application logic might rely on a polling pattern to retrieve messages or require a precise cap on the number of messages retrieved by the client at any given time. To support such applications, the service and most client libraries support a synchronous pull method.

Here is some sample code to pull and acknowledge a fixed number of messages:

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub C# API reference documentation.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient =
    SubscriberServiceApiClient.Create();
// Pull messages from server,
// allowing an immediate response if there are no messages.
PullResponse response = subscriberClient.Pull(
    subscriptionName, returnImmediately: true, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
    string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
    Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
}
// If acknowledgement required, send to server.
if (acknowledge)
{
    subscriberClient.Acknowledge(subscriptionName,
        response.ReceivedMessages.Select(msg => msg.AckId));
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Java API reference documentation.

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder()
        .setTransportChannelProvider(
            SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                .setMaxInboundMessageSize(20 << 20) // 20MB
                .build())
        .build();

try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // when false, wait (bounded by the server) for messages if none are immediately available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Node.js API reference documentation.

// Imports the Google Cloud client library
const pubsub = require('@google-cloud/pubsub');

const client = new pubsub.v1.SubscriberClient();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectName = 'your-project';
// const subscriptionName = 'your-subscription';

const formattedSubscription = client.subscriptionPath(
  projectName,
  subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
  subscription: formattedSubscription,
  maxMessages: maxMessages,
};

let isProcessed = false;

// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
  console.log(`Processing "${message.message.data}"...`);

  setTimeout(() => {
    console.log(`Finished processing "${message.message.data}".`);
    isProcessed = true;
  }, 30000);
}

// The subscriber pulls a specified number of messages.
const [response] = await client.pull(request);
// Obtain the first message.
const message = response.receivedMessages[0];
// Send the message to the worker function.
worker(message);

let waiting = true;
while (waiting) {
  await new Promise(r => setTimeout(r, 10000));
  // If the message has been processed..
  if (isProcessed) {
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
    };

    //..acknowledges the message.
    await client.acknowledge(ackRequest);
    console.log(`Acknowledged: "${message.message.data}".`);
    // Exit after the message is acknowledged.
    waiting = false;
    console.log(`Done.`);
  } else {
    // If the message is not yet processed..
    const modifyAckRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
      ackDeadlineSeconds: newAckDeadlineSeconds,
    };

    //..reset its ack deadline.
    await client.modifyAckDeadline(modifyAckRequest);

    console.log(
      `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
    );
  }
}

PHP

Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub PHP API reference documentation.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Protocol

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
  "returnImmediately": "false",
  "maxMessages": "1"
}

Response:

200 OK

{
  "receivedMessages": [{
    "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"
    }
  }]
}

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Python API reference documentation.

import logging
import multiprocessing
import random
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

NUM_MESSAGES = 2
ACK_DEADLINE = 30
SLEEP_TIME = 10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1, 60)
    logger.info('{}: Running {} for {}s'.format(
        time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
    time.sleep(RUN_TIME)

# `processes` maps each process to its message's ack ID and data.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is still running, reset its ack deadline to
        # ACK_DEADLINE seconds, repeating every SLEEP_TIME seconds.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 and 600.
            subscriber.modify_ack_deadline(
                subscription_path,
                [ack_id],
                ack_deadline_seconds=ACK_DEADLINE)
            logger.info('{}: Reset ack deadline for {} for {}s'.format(
                time.strftime("%X", time.gmtime()),
                msg_data, ACK_DEADLINE))

        # If the process has finished, acknowledge the message using its `ack_id`.
        else:
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info("{}: Acknowledged {}".format(
                time.strftime("%X", time.gmtime()), msg_data))
            processes.pop(process)

    # If there are still processes running, sleep before checking again.
    if processes:
        time.sleep(SLEEP_TIME)

print('Received and acknowledged {} messages. Done.'.format(
    len(response.received_messages)))

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Cloud Pub/Sub Ruby API reference documentation.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

Note that to achieve low message delivery latency with synchronous pull, it is important to have many simultaneously outstanding pull requests. As the throughput of the topic increases, more pull requests are necessary. In general, asynchronous pull is preferable for latency-sensitive applications.
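
A minimal sketch of one way to keep several pull requests outstanding at once from a single process, reusing the synchronous pull call shown in the Python sample above (project and subscription names are placeholders):

from concurrent.futures import ThreadPoolExecutor

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    'my-project-id', 'my-subscription-id')

def pull_and_ack(_):
    # Each call here is one outstanding pull request against the subscription.
    response = subscriber.pull(subscription_path, max_messages=10)
    ack_ids = [msg.ack_id for msg in response.received_messages]
    if ack_ids:
        subscriber.acknowledge(subscription_path, ack_ids)
    return len(ack_ids)

# Keep five pull requests in flight at the same time.
with ThreadPoolExecutor(max_workers=5) as executor:
    counts = list(executor.map(pull_and_ack, range(5)))

print('Pulled and acknowledged {} messages.'.format(sum(counts)))

In practice you would issue these pulls in a loop, and spread them across multiple client instances as throughput grows.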

Not all client libraries support synchronously pulling a fixed number of messages. For details, see the API Reference documentation.

Scaling

You may need to implement a scaling mechanism for your subscriber application to keep up with message volume. How to do this depends on your environment, but it will generally be based on backlog metrics offered through the Stackdriver monitoring service. For details on how to do this for Compute Engine, see Scaling based on Cloud Monitoring Metrics.

Go to the Cloud Pub/Sub section of the GCP Metrics List page to learn which metrics can be monitored programmatically.
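
For example, the per-subscription backlog metric pubsub.googleapis.com/subscription/num_undelivered_messages can be read with the Stackdriver Monitoring client library. This is a minimal sketch; the project ID is a placeholder, and the exact client surface may differ between library versions:

import time

from google.cloud import monitoring_v3

project_id = 'my-project-id'  # placeholder

client = monitoring_v3.MetricServiceClient()
project_name = client.project_path(project_id)

# Look at the last five minutes of data points.
interval = monitoring_v3.types.TimeInterval()
now = time.time()
interval.end_time.seconds = int(now)
interval.start_time.seconds = int(now - 300)

# One time series per subscription in the project.
results = client.list_time_series(
    project_name,
    'metric.type = "pubsub.googleapis.com/subscription/num_undelivered_messages"',
    interval,
    monitoring_v3.enums.ListTimeSeriesRequest.TimeSeriesView.FULL)

for series in results:
    subscription_id = series.resource.labels['subscription_id']
    latest_backlog = series.points[0].value.int64_value
    print('{}: {} undelivered messages'.format(subscription_id, latest_backlog))

An autoscaler can then act on this value, for example by scaling a Compute Engine managed instance group as described in the guide linked above.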

Finally, as with all distributed services, expect that any request may occasionally need to be retried.

Dealing with duplicates and forcing retries

When you do not acknowledge a message before its acknowledgement deadline has expired, Cloud Pub/Sub resends the message. As a result, Cloud Pub/Sub can send duplicate messages. Use Stackdriver to monitor acknowledge operations with the expired response code to detect this condition. To get this data, select the Acknowledge message operations metric, then group or filter it by the response_code label. Note that response_code is a system label on the metric; it is not a metric itself.

Use Stackdriver to search for expired message acknowledgement deadlines

To reduce the duplication rate, extend the message deadline.

  • Client libraries handle deadline extension automatically, but note that there are default limits on the maximum extension deadline, and these limits can be configured.
  • If you are building your own client library, use the modifyAckDeadline method to extend the acknowledgement deadline.

Alternatively, to force Cloud Pub/Sub to retry a message immediately, set modifyAckDeadline to 0.
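
As a sketch with the Python synchronous client used earlier (identifiers are placeholders), forcing immediate redelivery of a pulled message looks like this:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    'my-project-id', 'my-subscription-id')

response = subscriber.pull(subscription_path, max_messages=1)

for received_message in response.received_messages:
    # Setting the ack deadline to 0 makes the message immediately available
    # for redelivery instead of waiting for its current deadline to expire.
    subscriber.modify_ack_deadline(
        subscription_path,
        [received_message.ack_id],
        ack_deadline_seconds=0)

In the asynchronous client libraries, nacking a message (for example, Reply.Nack in the C# sample or msg.Nack() in Go) has the same effect.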
