Quickstart: Using Client Libraries

The Google Cloud Pub/Sub service allows applications to exchange messages reliably, quickly, and asynchronously. To accomplish this, a producer of data publishes a messages to a Cloud Pub/Sub topic. A subscriber client then creates a subscription to that topic and consumes messages from the subscription. Cloud Pub/Sub persists messages that could not be delivered reliably for up to seven days. This page shows you how to get started publishing messages with Cloud Pub/Sub using client libraries.

Before you begin

  1. Sign in to your Google account.

    If you don't already have one, sign up for a new account.

  2. Set up a Cloud Platform Console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Cloud Pub/Sub API for that project.

    You can view and manage these resources at any time in the Cloud Platform Console.

  3. Install and initialize the Cloud SDK.
  4. Update and install gcloud components:
    gcloud components update &&
    gcloud components install beta

Authentication

Set up authentication with a service account.

Installation

Install the client libraries in your choice of programming language:

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get -u cloud.google.com/go/pubsub

Java

If you are using Maven, add this to your pom.xml file:
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>0.25.0-beta</version>
</dependency>
If you are using Gradle, add this to your dependencies:
compile 'com.google.cloud:google-cloud-pubsub:0.25.0-beta'
If you are using SBT, add this to your dependences:
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "0.25.0-beta"

Node.js

npm install --save @google-cloud/pubsub

PHP

composer require google/apiclient

Python

pip install --upgrade google-cloud-pubsub

Ruby

gem install google-cloud-pubsub

Create a topic and a subscription

You must first create a topic before you can subscribe or publish to it. You can create a topic using the gcloud command-line tool (included in the Cloud SDK you installed in Before you begin) as follows:

gcloud beta pubsub topics create my-topic

Now use gcloud to create a subscription to the topic. Only messages published to the topic after the subscription is created are available to subscriber applications.

gcloud beta pubsub subscriptions create my-sub --topic my-topic

Publish messages

You are now ready to publish messages to the topic:

C#

TopicName topicName = new TopicName(_projectId, topicId);
PubsubMessage message = new PubsubMessage
{
    // The data is any arbitrary ByteString. Here, we're using text.
    Data = ByteString.CopyFromUtf8("Hello Cloud Pub/Sub!"),
    // The attributes provide metadata in a string-to-string 
    // dictionary.
    Attributes =
    {
        { "description", "Simple text message" }
    }
};
publisher.Publish(topicName, new[] { message });

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.ArrayList;
import java.util.List;

public class PublisherExample {

  static final int MESSAGE_COUNT = 5;

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  //schedule a message to be published, messages are automatically batched
  private static ApiFuture<String> publishMessage(Publisher publisher, String message)
      throws Exception {
    // convert message to bytes
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
    return publisher.publish(pubsubMessage);
  }

  /** Publish messages to a topic. */
  public static void main(String... args) throws Exception {
    // topic id, eg. "my-topic"
    String topicId = args[0];
    TopicName topicName = TopicName.create(PROJECT_ID, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> apiFutures = new ArrayList<>();
    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.defaultBuilder(topicName).build();
      for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = "message-" + i;
        ApiFuture<String> messageId = publishMessage(publisher, message);
        apiFutures.add(messageId);
      }
    } finally {
      // Once published, returns server-assigned message ids (unique within the topic)
      List<String> messageIds = ApiFutures.allAsList(apiFutures).get();
      for (String messageId : messageIds) {
        System.out.println(messageId);
      }
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
      }
    }
  }
}

Node.js

function publishMessage (topicName, data) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing topic, e.g. "my-topic"
  const topic = pubsub.topic(topicName);

  // Create a publisher for the topic (which can include additional batching configuration)
  const publisher = topic.publisher();

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  return publisher.publish(dataBuffer)
    .then((results) => {
      const messageId = results[0];

      console.log(`Message ${messageId} published.`);

      return messageId;
    });
}

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

def publish_messages(project, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)

    for n in range(1, 10):
        data = u'Message number {}'.format(n)
        # Data must be a bytestring
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)

    print('Published messages.')

Ruby

pubsub = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
topic  = pubsub.topic "my-topic"

topic.publish "A Message"

Receive messages

Now set up a subscriber to pull the messages you just published. Every subscriber must acknowledge each message within a configurable time window. Unacknowledged messages are redelivered. Note that Cloud Pub/Sub occasionally delivers a message more than once to ensure that all messages make it to a subscriber at least once. Here is an example of how you might receive and acknowledge messages:

C#

SubscriptionName subscriptionName = new SubscriptionName(_projectId,
    subscriptionId);
PullResponse response = subscriber.Pull(subscriptionName,
    returnImmediately: true, maxMessages: 10);
subscriber.Acknowledge(subscriptionName,
    response.ReceivedMessages.Select(m => m.AckId));

Go

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(name)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	mu.Lock()
	defer mu.Unlock()
	received++
	if received >= 10 {
		cancel()
		msg.Nack()
		return
	}
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class SubscriberExample {

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

  static class MessageReceiverExample implements MessageReceiver {

    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      messages.offer(message);
      consumer.ack();
    }
  }

  /** Receive messages over a subscription. */
  public static void main(String... args) throws Exception {
    // set subscriber id, eg. my-sub
    String subscriptionId = args[0];
    SubscriptionName subscriptionName = SubscriptionName.create(PROJECT_ID, subscriptionId);
    Subscriber subscriber = null;
    try {
      // create a subscriber bound to the asynchronous message receiver
      subscriber =
          Subscriber.defaultBuilder(subscriptionName, new MessageReceiverExample()).build();
      subscriber.startAsync().awaitRunning();
      // Continue to listen to messages
      while (true) {
        PubsubMessage message = messages.take();
        System.out.println("Message Id: " + message.getMessageId());
        System.out.println("Data: " + message.getData().toStringUtf8());
      }
    } finally {
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
  }
}

Node.js

function listenForMessages (subscriptionName, timeout) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on(`message`, messageHandler);
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

Ruby

pubsub       = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
subscription = pubsub.subscription "my-subscription"

puts "Messages pulled:"
subscription.pull.each do |message|
  puts message.data
  message.acknowledge!
end

Clean up (Optional)

To avoid incurring charges to your Google Cloud Platform account for the resources used in this guide, you can delete the topic and subscription.
  gcloud beta pubsub subscriptions delete mySubscription
  gcloud beta pubsub topics delete myTopic

What's next

For complete examples that you can build and run, see the Cloud Pub/Sub tutorials.

To get details about the client library for your language of choice, see Client Libraries.

For an overview of Cloud Pub/Sub, see What is Google Cloud Pub/Sub?.

See the Publisher and Subscriber guides to learn more about the concepts discussed in this page.

Monitor your resources on the go

Get the Google Cloud Console app to help you manage your projects.

Send feedback about...

Cloud Pub/Sub Documentation