Publisher Guide

A publisher application creates and sends messages to a topic. Pub/Sub offers at-least-once message delivery and best-effort ordering to existing subscribers, as explained in the Subscriber Overview. The general flow for a publisher application is as follows:

  1. Create a message containing your data.
  2. Send a request to the Pub/Sub Server to publish the message to the desired topic.

To learn about creating and managing topics, see Managing Topics and Subscriptions.

Setup

See the Client Libraries Getting Started Guide to set up your environment in the programming language of your choice.

Publish messages to a topic

When using JSON over REST, message data must be base64-encoded. The entire request including one or more messages must be smaller than 10MB, after decoding. Note that the message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.

Client libraries, depending on your choice of programming language, can publish messages synchronously or asynchronously. Asynchronous publishing allows for batching and higher thoroughput in your application.

All client libraries support publishing messages asynchronously. See the API Reference documentation for your chosen programming language to see if its client library also supports publishing messages synchronously, if that is your preferred option. A server-generated ID (unique within the topic) is returned on successfully publishing a message.

Protocol

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
{
  "messages": [
    {
      "attributes": {
        "key": "iana.org/language_tag",
        "value": "en"
      },
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
    }
  ]
}

Response:

200 OK
{
  "messageIds": [
    "19916711285"
  ]
}

C#

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

PublisherClient publisherClient = PublisherClient.Create();
SimplePublisher publisher = SimplePublisher.Create(
    new TopicName(projectId, topicId), new[] { publisherClient });
var publishTasks = new List<Task<string>>();
// SimplePublisher collects messages into appropriately sized
// batches.
foreach (string text in messageTexts)
{
    publishTasks.Add(publisher.PublishAsync(text));
}
foreach (var task in publishTasks)
{
    Console.WriteLine("Published message {0}", task.Result);
}

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

TopicName topicName = TopicName.create("my-project-id", "my-topic-id");
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.defaultBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  // schedule publishing one message at a time : messages get automatically batched
  for (String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
    messageIdFutures.add(messageIdFuture);
  }
} finally {
  // wait on any pending publish requests.
  List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

  for (String messageId : messageIds) {
    System.out.println("published with message ID: " + messageId);
  }

  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
  }
}

Node.js

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

function publishMessage (topicName, data) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing topic, e.g. "my-topic"
  const topic = pubsub.topic(topicName);

  // Create a publisher for the topic (which can include additional batching configuration)
  const publisher = topic.publisher();

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  return publisher.publish(dataBuffer)
    .then((results) => {
      const messageId = results[0];

      console.log(`Message ${messageId} published.`);

      return messageId;
    });
}

PHP

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

def publish_messages(project, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)

    for n in range(1, 10):
        data = u'Message number {}'.format(n)
        # Data must be a bytestring
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)

    print('Published messages.')

Ruby

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

pubsub = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
topic  = pubsub.topic "my-topic"

topic.publish "A Message"

Batching Configuration

The Pub/Sub client libraries automate how messages are batched to different degrees depending on the language. You can fine-tune these as needed for your application to balance message latency and throughput.

Messages can be batched based on request size (in bytes), number of messages, and time. You can override the default settings as shown in this sample:

C#

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

PublisherClient publisherClient = PublisherClient.Create();
SimplePublisher publisher = SimplePublisher.Create(
    new TopicName(projectId, topicId), new[] { publisherClient },
    new SimplePublisher.Settings()
    {
        BatchingSettings = new Google.Api.Gax.BatchingSettings(
            elementCountThreshold: 100,
            byteCountThreshold: 10240,
            delayThreshold: TimeSpan.FromSeconds(3))
    });
var publishTasks = new List<Task<string>>();
// SimplePublisher collects messages into appropriately sized
// batches.
foreach (string text in messageTexts)
{
    publishTasks.Add(publisher.PublishAsync(text));
}
foreach (var task in publishTasks)
{
    Console.WriteLine("Published message {0}", task.Result);
}

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1kb
long messageCountBatchSize = 10L; // default : 100

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings = BatchingSettings.newBuilder()
    .setElementCountThreshold(messageCountBatchSize)
    .setRequestByteThreshold(requestBytesThreshold)
    .setDelayThreshold(publishDelayThreshold)
    .build();

Publisher publisher = Publisher.defaultBuilder(topicName)
    .setBatchingSettings(batchingSettings).build();

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	ByteThreshold:  5000,
	CountThreshold: 10,
	DelayThreshold: 100 * time.Millisecond,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Python

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

def publish_messages_with_batch_settings(project, topic_name):
    """Publishes multiple messages to a Pub/Sub topic with batch settings."""
    # Configure the batch to publish once there is one kilobyte of data or
    # 1 second has passed.
    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)
    topic_path = publisher.topic_path(project, topic_name)

    for n in range(1, 10):
        data = u'Message number {}'.format(n)
        # Data must be a bytestring
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)

    print('Published messages.')

Retrying Requests

Publishing failures are automatically retried, except for errors that do not warrant retries. This sample code demonstrates creating a publisher with custom retry settings (note that not all client libraries support custom retry settings; see the API Reference documentation for your chosen language):

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(100); // default : 1 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures
Duration maxRetryDelay = Duration.ofSeconds(5); // default : 10 seconds

RetrySettings retrySettings = RetrySettings.newBuilder()
    .setInitialRetryDelay(retryDelay)
    .setRetryDelayMultiplier(retryDelayMultiplier)
    .setMaxRetryDelay(maxRetryDelay)
    .build();

Publisher publisher = Publisher.defaultBuilder(topicName)
    .setRetrySettings(retrySettings).build();

Concurrency Control

Support for concurrency depends on your programming language. Refer to the API Reference documentation for more information.

The following sample illustrates how to control concurrency in a publisher:

Java

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

// create a publisher with a single threaded executor
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
    .setExecutorThreadCount(1).build();
Publisher publisher = Publisher.defaultBuilder(topicName)
    .setExecutorProvider(executorProvider).build();

Go

For more on installing and creating a Cloud Pub/Sub client, refer to Cloud Pub/Sub Client Libraries.

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	NumGoroutines: 1,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Monitor your resources on the go

Get the Google Cloud Console app to help you manage your projects.

Send feedback about...

Cloud Pub/Sub