Publish messages of protobuf schema type

Stay organized with collections Save and categorize content based on your preferences.

Publish messages that conform to a protocol buffer schema to a topic with a protocol buffer schema attached.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample


Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishProtoMessagesAsyncSample
    public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                    case Encoding.Binary:
                        var binaryMessage = state.ToByteString();
                        messageId = await publisher.PublishAsync(binaryMessage);
                    case Encoding.Json:
                        var jsonMessage = JsonFormatter.Default.Format(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            catch (Exception exception)
                Console.WriteLine($"An error ocurred when publishing message {state}: {exception.Message}");
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;


Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<std::pair<std::string, std::string>> states{
      {"New York", "NY"},
      {"Pennsylvania", "PA"},
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::runtime_error(id.status().message());
  for (auto const& data : states) {
    google::cloud::pubsub::samples::State state;
  // Block until all messages are published.
  for (auto& d : done) d.get();


Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

import (

	statepb ""

func publishProtoMessages(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)

	state := &statepb.State{
		Name:     "Alaska",
		PostAbbr: "AK",

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %v", err)
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = proto.Marshal(state)
		if err != nil {
			return fmt.Errorf("proto.Marshal err: %v", err)
	case pubsub.EncodingJSON:
		msg, err = protojson.Marshal(state)
		if err != nil {
			return fmt.Errorf("protojson.Marshal err: %v", err)
		return fmt.Errorf("invalid encoding: %v", encoding)

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %v", err)
	fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
	return nil


Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.StateProto.State;

public class PublishProtobufMessagesExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with a proto schema.
    String topicId = "your-topic-id";

    publishProtobufMessagesExample(projectId, topicId);

  public static void publishProtobufMessagesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();

    Publisher publisher = null;

    // Instantiate a protoc-generated class defined in `us-states.proto`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    try {
      publisher = Publisher.newBuilder(topicName).build();

      PubsubMessage.Builder message = PubsubMessage.newBuilder();

      // Prepare an appropriately formatted message based on topic encoding.
      switch (encoding) {
        case BINARY:
          System.out.println("Publishing a BINARY-formatted message:\n" + message);

        case JSON:
          String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
          System.out.println("Publishing a JSON-formatted message:\n" + message);

          break block;

      // Publish the message.
      ApiFuture<String> future = publisher.publish(;
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.awaitTermination(1, TimeUnit.MINUTES);


Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

 * TODO(developer): Uncomment this variable before running the sample.
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the protobufjs library
const protobuf = require('protobufjs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishProtobufMessages(topicNameOrId) {
  // Get the topic metadata to learn about its schema.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province = {
    name: 'Ontario',
    postAbbr: 'ON',

  // Make an encoder using the protobufjs library.
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);

  const messageId = await topic.publish(dataBuffer);
  console.log(`Protobuf message ${messageId} published.`);


Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use Utilities\StateProto;

 * Publish a message using a protocol buffer schema.
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 * package utilities;
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 * @param string $projectId
 * @param string $topicId
 * @return void
function publish_proto_messages($projectId, $topicId)
    $pubsub = new PubSubClient([
        'projectId' => $projectId,

    $messageData = new StateProto([
        'name' => 'Alaska',
        'post_abbr' => 'AK',

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as protobuf binary.
        $encodedMessageData = $messageData->serializeToString();
    } else {
        // encode as JSON.
        $encodedMessageData = $messageData->serializeToJsonString();

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);


Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

from google.api_core.exceptions import NotFound
from import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding

from utilities import us_states_pb2  # type: ignore

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Instantiate a protoc-generated class defined in `us-states.proto`.
    state = us_states_pb2.StateProto() = "Alaska"
    state.post_abbr = "AK"

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        data = state.SerializeToString()
        print(f"Preparing a binary-encoded message:\n{data}")
    elif encoding == Encoding.JSON:
        json_object = MessageToJson(state)
        data = str(json_object).encode("utf-8")
        print(f"Preparing a JSON-encoded message:\n{data}")
        print(f"No encoding specified in {topic_path}. Abort.")

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")


Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# topic_id = "your-topic-id"
require "google/cloud/pubsub"
require_relative "utilities/us-states_pb"

pubsub =

topic = pubsub.topic topic_id

state = name: "Alaska", post_abbr: "AK"

if topic.message_encoding_binary?
  topic.publish Utilities::StateProto.encode(state)
  puts "Published binary-encoded protobuf message."
elsif topic.message_encoding_json?
  topic.publish Utilities::StateProto.encode_json(state)
  puts "Published JSON-encoded protobuf message."
  raise "No encoding specified in #{}."

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.