Quickstart: Using Client Libraries

The Google Cloud Pub/Sub service allows applications to exchange messages reliably, quickly, and asynchronously. To accomplish this, a producer of data publishes a messages to a Cloud Pub/Sub topic. A subscriber client then creates a subscription to that topic and consumes messages from the subscription. Cloud Pub/Sub persists messages that could not be delivered reliably for up to seven days. This page shows you how to get started publishing messages with Cloud Pub/Sub using client libraries.

Before you begin

  1. Sign in to your Google account.

    If you don't already have one, sign up for a new account.

  2. Select or create a Cloud Platform project.

    Go to the Projects page

  3. Enable billing for your project.

    Enable billing

  4. Enable the Cloud Pub/Sub API.

    Enable the API

  5. Install and initialize the Cloud SDK.

Authentication

Set up authentication with a service account.

Installation

Install the client libraries in your choice of programming language:

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get -u cloud.google.com/go/pubsub

Java

If you are using Maven, add this to your pom.xml file:

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
    <version>0.20.1-beta</version>
</dependency>

If you are using Gradle, add this to your dependencies:

compile group: 'com.google.cloud', name: 'google-cloud-pubsub', version: '0.20.1-beta'

Node.js

npm install --save @google-cloud/pubsub

PHP

composer require google/apiclient

Python

pip install --upgrade google-cloud-pubsub

Ruby

gem install google-cloud-pubsub

Create a topic and a subscription

You must first create a topic before you can subscribe or publish to it. You can create a topic using the gcloud command-line tool (included in the Cloud SDK you installed in Before you begin) as follows:

gcloud beta pubsub topics create my-topic

Now use gcloud to create a subscription to the topic. Only messages published to the topic after the subscription is created are available to subscriber applications.

gcloud beta pubsub subscriptions create my-sub --topic my-topic

Publish messages

You are now ready to publish messages to the topic:

C#

TopicName topicName = new TopicName(_projectId, topicId);
PubsubMessage message = new PubsubMessage
{
    // The data is any arbitrary ByteString. Here, we're using text.
    Data = ByteString.CopyFromUtf8("Hello Cloud Pub/Sub!"),
    // The attributes provide metadata in a string-to-string 
    // dictionary.
    Attributes =
    {
        { "description", "Simple text message" }
    }
};
publisher.Publish(topicName, new[] { message });

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.ArrayList;
import java.util.List;

public class PublisherExample {

  static final int MESSAGE_COUNT = 5;

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  //schedule a message to be published, messages are automatically batched
  private static ApiFuture<String> publishMessage(Publisher publisher, String message)
      throws Exception {
    // convert message to bytes
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
    return publisher.publish(pubsubMessage);
  }

  /** Publish messages to a topic. */
  public static void main(String... args) throws Exception {
    // topic id, eg. "my-topic"
    String topicId = args[0];
    TopicName topicName = TopicName.create(PROJECT_ID, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> apiFutures = new ArrayList<>();
    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.defaultBuilder(topicName).build();
      for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = "message-" + i;
        ApiFuture<String> messageId = publishMessage(publisher, message);
        apiFutures.add(messageId);
      }
    } finally {
      // Once published, returns server-assigned message ids (unique within the topic)
      List<String> messageIds = ApiFutures.allAsList(apiFutures).get();
      for (String messageId : messageIds) {
        System.out.println(messageId);
      }
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
      }
    }
  }
}

Node.js

function publishMessage (topicName, data) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing topic, e.g. "my-topic"
  const topic = pubsub.topic(topicName);

  // Publishes the message, e.g. "Hello, world!" or { amount: 599.00, status: 'pending' }
  return topic.publish(data)
    .then((results) => {
      const messageIds = results[0];

      console.log(`Message ${messageIds[0]} published.`);

      return messageIds;
    });
}

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

def publish_message(topic_name, data):
    """Publishes a message to a Pub/Sub topic with the given data."""
    pubsub_client = pubsub.Client()
    topic = pubsub_client.topic(topic_name)

    # Data must be a bytestring
    data = data.encode('utf-8')

    message_id = topic.publish(data)

    print('Message {} published.'.format(message_id))

Ruby

pubsub = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
topic  = pubsub.topic "my-topic"

topic.publish "A Message"

Receive messages

Now set up a subscriber to pull the messages you just published. Every subscriber must acknowledge each message within a configurable time window. Unacknowledged messages are redelivered. Note that Cloud Pub/Sub occasionally delivers a message more than once to ensure that all messages make it to a subscriber at least once. Here is an example of how you might receive and acknowledge messages:

C#

SubscriptionName subscriptionName = new SubscriptionName(_projectId,
    subscriptionId);
PullResponse response = subscriber.Pull(subscriptionName,
    returnImmediately: true, maxMessages: 10);
subscriber.Acknowledge(subscriptionName,
    response.ReceivedMessages.Select(m => m.AckId));

Go

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(name)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	mu.Lock()
	defer mu.Unlock()
	received++
	if received >= 10 {
		cancel()
		msg.Nack()
		return
	}
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class SubscriberExample {

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

  static class MessageReceiverExample implements MessageReceiver {

    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      messages.offer(message);
      consumer.ack();
    }
  }

  /** Receive messages over a subscription. */
  public static void main(String... args) throws Exception {
    // set subscriber id, eg. my-sub
    String subscriptionId = args[0];
    SubscriptionName subscriptionName = SubscriptionName.create(PROJECT_ID, subscriptionId);
    Subscriber subscriber = null;
    try {
      // create a subscriber bound to the asynchronous message receiver
      subscriber =
          Subscriber.defaultBuilder(subscriptionName, new MessageReceiverExample()).build();
      subscriber.startAsync().awaitRunning();
      // Continue to listen to messages
      while (true) {
        PubsubMessage message = messages.take();
        System.out.println("Message Id: " + message.getMessageId());
        System.out.println("Data: " + message.getData().toStringUtf8());
      }
    } finally {
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
  }
}

Node.js

function pullMessages (subscriptionName) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Pulls messages. Set returnImmediately to false to block until messages are
  // received.
  return subscription.pull()
    .then((results) => {
      const messages = results[0];

      console.log(`Received ${messages.length} messages.`);

      messages.forEach((message) => {
        console.log(`* %d %j %j`, message.id, message.data, message.attributes);
      });

      // Acknowledges received messages. If you do not acknowledge, Pub/Sub will
      // redeliver the message.
      return subscription.ack(messages.map((message) => message.ackId));
    });
}

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

def receive_message(topic_name, subscription_name):
    """Receives a message from a pull subscription."""
    pubsub_client = pubsub.Client()
    topic = pubsub_client.topic(topic_name)
    subscription = topic.subscription(subscription_name)

    # Change return_immediately=False to block until messages are
    # received.
    results = subscription.pull(return_immediately=True)

    print('Received {} messages.'.format(len(results)))

    for ack_id, message in results:
        print('* {}: {}, {}'.format(
            message.message_id, message.data, message.attributes))

    # Acknowledge received messages. If you do not acknowledge, Pub/Sub will
    # redeliver the message.
    if results:
        subscription.acknowledge([ack_id for ack_id, message in results])

Ruby

pubsub       = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
subscription = pubsub.subscription "my-subscription"

puts "Messages pulled:"
subscription.pull.each do |message|
  puts message.data
  message.acknowledge!
end

Clean up (Optional)

To avoid incurring charges to your Google Cloud Platform account for the resources used in this guide, you can delete the topic and subscription.
  gcloud beta pubsub subscriptions delete mySubscription
  gcloud beta pubsub topics delete myTopic

What's next

For complete examples that you can build and run, see the Cloud Pub/Sub tutorials.

To get details about the client library for your language of choice, see Client Libraries.

For an overview of Cloud Pub/Sub, see What is Google Cloud Pub/Sub?.

See the Publisher and Subscriber guides to learn more about the concepts discussed in this page.

Send feedback about...

Cloud Pub/Sub Documentation