Subscribe with custom attributes

Stay organized with collections. Save and categorize content based on your preferences.

Uses asynchronous pull to receive messages with custom attributes.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample


Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Sample showing how to receive Pub/Sub messages with asynchronous pull and
/// print each message's custom attributes.
/// </summary>
public class PullMessagesWithCustomAttributesAsyncSample
{
    /// <summary>
    /// Listens on a subscription for roughly seven seconds, printing every received
    /// message together with its attributes.
    /// </summary>
    /// <param name="projectId">ID of the project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to pull from.</param>
    /// <param name="acknowledge">
    /// When <c>true</c> each message is acked; otherwise it is nacked.
    /// </param>
    /// <returns>The messages received while the subscriber was running.</returns>
    public async Task<List<PubsubMessage>> PullMessagesWithCustomAttributesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        var messages = new List<PubsubMessage>();
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // The handler may be invoked concurrently for different messages,
            // so guard the shared list while recording the message.
            lock (messages)
            {
                messages.Add(message);
            }

            string text = Encoding.UTF8.GetString(message.Data.ToArray());
            Console.WriteLine($"Message {message.MessageId}: {text}");
            if (message.Attributes != null)
            {
                foreach (var attribute in message.Attributes)
                {
                    Console.WriteLine($"{attribute.Key} = {attribute.Value}");
                }
            }

            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });

        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messages;
    }
}


Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message with attributes:\n";
        for (auto const& kv : m.attributes()) {
          std::cout << "  " << kv.first << ": " << kv.second << "\n";


Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

import (


func pullMsgsCustomAttributes(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
		fmt.Fprintln(w, "Attributes:")
		for key, value := range msg.Attributes {
			fmt.Fprintf(w, "%s = %s\n", key, value)
	if err != nil {
		return fmt.Errorf("sub.Receive: %v", err)

	return nil


Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithCustomAttributesExample(projectId, subscriptionId);

  public static void subscribeWithCustomAttributesExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          // Print message attributes.
              .forEach((key, value) -> System.out.println(key + " = " + value));

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.


Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Listens on the subscription named by `subscriptionNameOrId`, logging the
 * id, data and attributes of every message and acknowledging it. The
 * listener is removed after `timeout` seconds.
 */
async function listenWithCustomAttributes() {
  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
    console.log(
      `Received message: id ${message.id}, data ${
        message.data
      }, attributes: ${JSON.stringify(message.attributes)}`
    );

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
  }, timeout * 1000);
}



Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Print the message payload and each custom attribute, then ack the message."""
    print(f"Received {message.data!r}.")
    if message.attributes:
        for key in message.attributes:
            value = message.attributes.get(key)
            print(f"{key}: {value}")
    # Acknowledge after processing so the message is not redelivered.
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.


Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# subscription_id = "your-subscription-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

# Listen on the subscription, printing each message's data and custom
# attributes before acknowledging it.
subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.