Quickstart: Using Client Libraries

The Cloud Pub/Sub service allows applications to exchange messages reliably, quickly, and asynchronously. To accomplish this, a producer of data publishes a messages to a Cloud Pub/Sub topic. A subscriber client then creates a subscription to that topic and consumes messages from the subscription. Cloud Pub/Sub persists messages that could not be delivered reliably for up to seven days. This page shows you how to get started publishing messages with Cloud Pub/Sub using client libraries.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Set up a GCP Console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Cloud Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the GCP Console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the file path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.

  4. Install and initialize the Cloud SDK.

Installing the client libraries

If you are using the SDK (rather than the Cloud Shell), install the client libraries in your programming language of choice:


For more on setting up your Python development environment, refer to Python Development Environment Setup Guide.

# ensure that you are using virtualenv
# as described in the python dev setup guide

pip install --upgrade google-cloud-pubsub


Install-Package Google.Cloud.PubSub.V1 -Pre


go get -u cloud.google.com/go/pubsub


If you are using Maven, add the following to your pom.xml file:
If you are using Gradle, add the following to your dependencies:
compile 'com.google.cloud:google-cloud-pubsub:1.87.0'
If you are using SBT, add the following to your dependencies:
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.87.0"

If you're using IntelliJ or Eclipse, you can add client libraries to your project using the following IDE plugins:

The plugins provide additional functionality, such as key management for service accounts. Refer to each plugin's documentation for details.


npm install --save @google-cloud/pubsub


composer require google/cloud-pubsub


gem install google-cloud-pubsub

Creating a topic and a subscription

Once you create a topic, you can subscribe or publish to it.

Use the gcloud pubsub topics create command to create a topic:

gcloud pubsub topics create my-topic

Use the gcloud pubsub subscriptions create command to create a subscription. Only messages published to the topic after the subscription is created are available to subscriber applications.

gcloud pubsub subscriptions create my-sub --topic my-topic

For more information about naming your topics and subscriptions, see Resource names.

Publishing messages


from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)

print('Published messages.')


// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(text => publisher.PublishAsync(text));
foreach (Task<string> task in publishTasks)
    string message = await task;
    await Console.Out.WriteLineAsync($"Published message {message}");


t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
fmt.Printf("Published a message; msg ID: %v\n", id)


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PublisherExample {

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  /** Publish messages to a topic.
   * @param args topic name, number of messages
  public static void main(String... args) throws Exception {
    // topic id, eg. "my-topic"
    String topicId = args[0];
    int messageCount = Integer.parseInt(args[1]);
    ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // convert message to bytes
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()

        // Schedule a message to be published. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
    } finally {
      // Wait on any pending requests
      List<String> messageIds = ApiFutures.allAsList(futures).get();

      for (String messageId : messageIds) {

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.


// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

 * TODO(developer): Uncomment the following lines to run the sample.
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);


use Google\Cloud\PubSub\PubSubClient;

 * Publishes a message for a Pub/Sub topic.
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
function publish_message($projectId, $topicName, $message)
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);


# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish "This is a test message."

puts "Message published."

Receiving messages

Set up a subscriber to pull the messages you just published. Every subscriber must acknowledge each message within a configurable time window. Unacknowledged messages are redelivered. Note that Cloud Pub/Sub occasionally delivers a message more than once to ensure that all messages make it to a subscriber at least once. Here is an example of how you might receive and acknowledge messages:


import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:


// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
    async (PubsubMessage message, CancellationToken cancel) =>
        string text =
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);


// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	defer mu.Unlock()
	if received == 10 {
if err != nil {
	return err


import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class SubscriberExample {
  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  static class MessageReceiverExample implements MessageReceiver {

    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
          "Message Id: " + message.getMessageId() + " Data: " + message.getData().toStringUtf8());
      // Ack only after all work for the message is complete.

  /** Receive messages over a subscription. */
  public static void main(String... args) throws Exception {
    // set subscriber id, eg. my-sub
    String subscriptionId = args[0];
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
        PROJECT_ID, subscriptionId);
    Subscriber subscriber = null;
    try {
      // create a subscriber bound to the asynchronous message receiver
      subscriber =
          Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
      // Allow the subscriber to run indefinitely unless an unrecoverable error occurs.
    } catch (IllegalStateException e) {
      System.out.println("Subscriber unexpectedly stopped: " + e);


// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

 * TODO(developer): Uncomment the following lines to run the sample.
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);


use Google\Cloud\PubSub\PubSubClient;

 * Pulls all Pub/Sub messages for a subscription.
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
function pull_messages($projectId, $subscriptionName)
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.


# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"

# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60

How did it go?

Clean up (optional)

To avoid incurring charges to your Google Cloud Platform account for the resources used in this guide, you can use the command line to delete the topic and subscription.

  gcloud pubsub subscriptions delete my-sub
  gcloud pubsub topics delete my-topic

What's next

هل كانت هذه الصفحة مفيدة؟ يرجى تقييم أدائنا:

إرسال تعليقات حول...

Cloud Pub/Sub Documentation