Receive messages of Proto schema type

Stay organized with collections. Save and categorize content based on your preferences.

Receive a message of protocol buffer schema type, convert the message data to an object of a generated Proto class, and acknowledge the message.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample


Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Sample that pulls messages published with a protocol buffer schema,
/// decodes each payload into a generated <c>Utilities.State</c> object and
/// acks or nacks it.
/// </summary>
public class PullProtoMessagesAsyncSample
{
    /// <summary>
    /// Listens on the subscription for five seconds and decodes every message received.
    /// </summary>
    /// <param name="projectId">ID of the project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to pull from.</param>
    /// <param name="acknowledge">
    /// <c>true</c> to ack each handled message, <c>false</c> to nack it.
    /// </param>
    /// <returns>The number of messages passed to the handler.</returns>
    public async Task<int> PullProtoMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();

        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // TryGetValue instead of the indexer: the indexer throws when the
            // attribute is absent, which would make the default branch below
            // unreachable for messages that carry no encoding.
            message.Attributes.TryGetValue("googclient_schemaencoding", out string encoding);

            Utilities.State state = null;
            switch (encoding)
            {
                case "BINARY":
                    state = Utilities.State.Parser.ParseFrom(message.Data.ToByteArray());
                    break;
                case "JSON":
                    state = Utilities.State.Parser.ParseJson(message.Data.ToStringUtf8());
                    break;
                default:
                    Console.WriteLine("Encoding not provided in message.");
                    break;
            }

            Console.WriteLine($"Message {message.MessageId}: {state}");
            // The handler may run concurrently, so the counter is updated atomically.
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });

        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}


Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
// NOTE(review): this is an excerpt; the function that contains this `return`
// and invokes the lambda is not shown here.
return [](pubsub::Subscriber subscriber) {
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        google::cloud::pubsub::samples::State state;
        // Decode the payload into the generated proto class. Without this the
        // printed state would always be empty.
        if (!state.ParseFromString(std::string{m.data()})) {
          std::cerr << "Could not parse message payload as State\n";
          // Reject the message so it can be redelivered.
          std::move(h).nack();
          return;
        }
        std::cout << "Message contents: " << state.DebugString() << "\n";
        std::move(h).ack();
      });
  return session;
}


Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

import (

	statepb ""

func subscribeWithProtoSchema(w io.Writer, projectID, subID, protoFile string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)

	// Create an instance of the message to be decoded (a single U.S. state).
	state := &statepb.State{}

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var mu sync.Mutex
	sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		defer mu.Unlock()
		encoding := msg.Attributes["googclient_schemaencoding"]

		if encoding == "BINARY" {
			if err := proto.Unmarshal(msg.Data, state); err != nil {
				fmt.Fprintf(w, "proto.Unmarshal err: %v", err)
			fmt.Printf("Received a binary-encoded message:\n%#v", state)
		} else if encoding == "JSON" {
			if err := protojson.Unmarshal(msg.Data, state); err != nil {
				fmt.Fprintf(w, "proto.Unmarshal err: %v", err)
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v", state)
		} else {
			fmt.Fprintf(w, "invalid encoding: %s", encoding)
	return nil


Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import utilities.StateProto.State;

public class SubscribeWithProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithProtoSchemaExample(projectId, subscriptionId);

  public static void subscribeWithProtoSchemaExample(String projectId, String subscriptionId) {

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          ByteString data = message.getData();

          // Get the schema encoding type.
          String encoding = message.getAttributesMap().get("googclient_schemaencoding");

          try {
            switch (encoding) {
              case "BINARY":
                // Obtain an object of the generated proto class.
                State state = State.parseFrom(data);
                System.out.println("Received a BINARY-formatted message: " + state);

              case "JSON":
                State.Builder stateBuilder = State.newBuilder();
                JsonFormat.parser().merge(data.toStringUtf8(), stateBuilder);
                System.out.println("Received a JSON-formatted message:" +;

                break block;
          } catch (InvalidProtocolBufferException e) {

          System.out.println("Ack'ed the message");

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

    try {
      System.out.printf("Listening for messages on %s:\n", subscriptionName);
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {


Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');

// And the protobufjs library
const protobuf = require('protobufjs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Listens on a subscription for `timeout` seconds, decoding each message
 * according to its schema encoding and logging the result.
 *
 * @param {string} subscriptionNameOrId Name or ID of an existing subscription.
 * @param {number} timeout Number of seconds to keep listening.
 */
async function listenForProtobufMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Make a decoder using the protobufjs library.
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = protobuf.loadSync('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let result;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = Province.decode(message.data);
        break;
      case Encodings.Json:
        // This doesn't require decoding with the protobuf library,
        // since it's plain JSON. But you can still validate it against
        // your schema.
        result = JSON.parse(message.data.toString());
        console.log(`Validation of JSON: ${Province.verify(result)}`);
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}


Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages using a protocol buffer schema.
 *
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 * package utilities;
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_proto_messages($projectId, $subscriptionId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // Stays empty when the encoding attribute is neither BINARY nor JSON.
        $decodedMessageData = '';
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                // Load the binary payload into the generated proto class, then
                // render it as JSON so both encodings print the same way.
                $protobufMessage = new \Utilities\StateProto();
                $protobufMessage->mergeFromString($message->data());

                $decodedMessageData = $protobufMessage->serializeToJsonString();
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        // %s, not %d: the encoding is a string such as "BINARY" or "JSON".
        printf('Received a %s-encoded message %s', $encoding, $decodedMessageData);
    }
}


Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1
from google.cloud.pubsub import SubscriberClient
from google.protobuf.json_format import Parse

from utilities import us_states_pb2

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber listens for messages
# timeout = 5.0

# Create the subscriber client and build the subscription path from the
# project and subscription IDs.
subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Instantiate a protoc-generated class defined in `us-states.proto`.
# This module-level instance is the `state` object that `callback` refers to.
state = us_states_pb2.StateProto()

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Decode a received message into the module-level `state` and ack it.

    The "googclient_schemaencoding" attribute selects how the payload is
    deserialized. Messages without a recognised encoding are printed as-is.
    """
    # Get the message serialization type.
    encoding = message.attributes.get("googclient_schemaencoding")
    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        state.ParseFromString(message.data)
        print(f"Received a binary-encoded message:\n{state}")
    elif encoding == "JSON":
        Parse(message.data, state)
        print(f"Received a JSON-encoded message:\n{state}")
    else:
        print(f"Received a message with no encoding:\n{message}")

    message.ack()


# Start the streaming pull; `callback` is invoked for each received message.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.


Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# subscription_id = "your-subscription-id"
require "google/cloud/pubsub"
require_relative "utilities/us-states_pb"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id

# Decode each received message according to its schema encoding, print it,
# and acknowledge it.
subscriber = subscription.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    state = Utilities::StateProto.decode received_message.data
    puts "Received a binary-encoded message:\n#{state}"
  when "JSON"
    require "json"
    state = Utilities::StateProto.decode_json received_message.data
    puts "Received a JSON-encoded message:\n#{state}"
  else
    # Print the notice; a bare string literal here would be silently discarded.
    puts "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
# Stop the listener and wait for in-flight callbacks to finish.
subscriber.stop.wait!

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.