Subscribe with custom attributes

Uses asynchronous pull to receive messages with custom attributes.

Documentation pages that include this code sample

To view the code sample used in context, see the following documentation:

Code Sample


Before trying this sample, follow the C# setup instructions in the Pub/Sub Quickstart Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.

using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesWithCustomAttributesAsyncSample
{
    /// <summary>
    /// Listens on a subscription for about seven seconds, printing each message's payload and
    /// custom attributes to the console, and returns the messages that were received.
    /// </summary>
    /// <param name="projectId">ID of the Google Cloud project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to pull from.</param>
    /// <param name="acknowledge">
    /// When true each message is acked; when false it is nacked so that it can be redelivered.
    /// </param>
    /// <returns>The messages received while the subscriber was running.</returns>
    public async Task<List<PubsubMessage>> PullMessagesWithCustomAttributesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        var messages = new List<PubsubMessage>();
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // The handler may run on several threads at once and List<T> is not
            // thread-safe, so guard the shared list while recording the message.
            lock (messages)
            {
                messages.Add(message);
            }

            string text = Encoding.UTF8.GetString(message.Data.ToArray());
            Console.WriteLine($"Message {message.MessageId}: {text}");
            if (message.Attributes != null)
            {
                foreach (var attribute in message.Attributes)
                {
                    Console.WriteLine($"{attribute.Key} = {attribute.Value}");
                }
            }
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messages;
    }
}


Before trying this sample, follow the C++ setup instructions in the Pub/Sub Quickstart Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
// Subscribes, prints the attributes of every message received, and shuts the
// subscription down once at least one message has been handled.
[](pubsub::Subscriber subscriber) {
  std::mutex mu;
  std::condition_variable cv;
  int message_count = 0;  // guarded by `mu`
  // The callback captures locals by reference. That is safe here because this
  // function does not return until `session.get()` below has completed.
  auto session = subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message with attributes:\n";
        for (auto const& kv : m.attributes()) {
          std::cout << "  " << kv.first << ": " << kv.second << "\n";
        }
        std::unique_lock<std::mutex> lk(mu);
        ++message_count;
        // Release the lock before notifying so the waiter can proceed at once.
        lk.unlock();
        cv.notify_one();
        std::move(h).ack();
      });
  // Most applications would just release the `session` object at this point,
  // but we want to gracefully close down this example.
  std::unique_lock<std::mutex> lk(mu);
  cv.wait(lk, [&message_count] { return message_count > 0; });
  // Unlock before cancelling: callbacks still in flight need `mu` to finish.
  lk.unlock();
  session.cancel();
  auto status = session.get();
  std::cout << "Message count: " << message_count << ", status: " << status
            << "\n";
}


Before trying this sample, follow the Go setup instructions in the Pub/Sub Quickstart Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.

import (


func pullMsgsCustomAttributes(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Create a channel to handle messages to as they come in.
	cm := make(chan *pubsub.Message)
	defer close(cm)

	// Handle individual messages in a goroutine.
	go func() {
		for msg := range cm {
			fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
			fmt.Fprintln(w, "Attributes:")
			for key, value := range msg.Attributes {
				fmt.Fprintf(w, "%s = %s", key, value)

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		cm <- msg
	if err != nil {
		return fmt.Errorf("Receive: %v", err)

	return nil


Before trying this sample, follow the Java setup instructions in the Pub/Sub Quickstart Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithCustomAttributesExample(projectId, subscriptionId);

  public static void subscribeWithCustomAttributesExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          // Print message attributes.
              .forEach((key, value) -> System.out.println(key + " = " + value));

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.


Before trying this sample, follow the Node.js setup instructions in the Pub/Sub Quickstart Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Listens on `subscriptionName` for `timeout` seconds, logging the id, data
 * and custom attributes of each message and acknowledging it.
 */
async function listenWithCustomAttributes() {
  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubSubClient.subscription(subscriptionName);

  // Create an event handler to handle messages
  const messageHandler = message => {
    console.log(
      `Received message: id ${message.id}, data ${
        message.data
      }, attributes: ${JSON.stringify(message.attributes)}`
    );

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
  }, timeout * 1000);
}



Before trying this sample, follow the Python setup instructions in the Pub/Sub Quickstart Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    """Print the payload and custom attributes of a message, then ack it."""
    print(f"Received {message.data}.")
    if message.attributes:
        for key in message.attributes:
            value = message.attributes.get(key)
            print(f"{key}: {value}")
    # Acknowledge the message so that it is not redelivered.
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub Quickstart Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_name
# Print each message's payload and custom attributes, then acknowledge it.
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

# The listener does nothing until it is started.
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
# Stop listening and wait for in-flight callbacks to finish.
subscriber.stop.wait!