Receive messages that use an Avro schema

Receive a message whose data is encoded with an Avro schema, decode the message data (for example, into an object of a generated Avro class), and acknowledge the message.

Documentation pages that include this code sample

To view the code sample used in context, see the following documentation:

Code sample


Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

using Avro.IO;
using Avro.Specific;
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Sample that receives messages whose data is encoded with an Avro schema and decodes each
/// one into an <c>AvroUtilities.State</c> instance.
/// </summary>
public class PullAvroMessagesAsyncSample
{
    /// <summary>
    /// Runs a streaming subscriber on the given subscription for five seconds and decodes every
    /// received message according to its <c>googclient_schemaencoding</c> attribute.
    /// </summary>
    /// <param name="projectId">ID of the Google Cloud project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to receive from.</param>
    /// <param name="acknowledge">
    /// <c>true</c> to reply <c>Ack</c> for every handled message; <c>false</c> to reply <c>Nack</c>.
    /// </param>
    /// <returns>The number of messages the handler processed before the subscriber was stopped.</returns>
    public async Task<int> PullAvroMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName,
            settings: new SubscriberClient.Settings()
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            });
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput, which is why the counter uses Interlocked.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Use TryGetValue rather than the indexer. A message without this attribute then
            // reaches the default branch below instead of throwing KeyNotFoundException.
            message.Attributes.TryGetValue("googclient_schemaencoding", out string encoding);
            AvroUtilities.State state = new AvroUtilities.State();
            switch (encoding)
            {
                case "BINARY":
                    using (var ms = new MemoryStream(message.Data.ToByteArray()))
                    {
                        var decoder = new BinaryDecoder(ms);
                        var reader = new SpecificDefaultReader(state.Schema, state.Schema);
                        // Keep the returned record: it is the decoded value whether or not the
                        // reader populated the `state` instance passed in for reuse.
                        state = reader.Read<AvroUtilities.State>(state, decoder);
                    }
                    break;
                case "JSON":
                    state = JsonConvert.DeserializeObject<AvroUtilities.State>(message.Data.ToStringUtf8());
                    break;
                default:
                    Console.WriteLine($"Encoding not provided in message.");
                    break;
            }
            Console.WriteLine($"Message {message.MessageId}: {state}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}


Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
// Returns a callable that starts a streaming-pull session on `subscriber`.
// Each received message has its raw payload printed and is then acknowledged.
// The callable returns the session handle produced by Subscribe().
return [](pubsub::Subscriber subscriber) {
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Message contents: " << m.data() << "\n";
        // AckHandler is move-only; ack() consumes it.
        std::move(h).ack();
      });
  return session;
};


Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

import (


func subscribeWithAvroSchema(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)

	avroSchema, err := ioutil.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("ioutil.ReadFile err: %v", err)
	codec, err := goavro.NewCodec(string(avroSchema))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %v", err)

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var mu sync.Mutex
	sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		defer mu.Unlock()
		encoding := msg.Attributes["googclient_schemaencoding"]

		if encoding == "BINARY" {
			data, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v", err)
			fmt.Printf("Received a binary-encoded message:\n%#v", data)
		} else if encoding == "JSON" {
			data, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v", err)
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v", data)
		} else {
			fmt.Fprintf(w, "invalid encoding: %s", encoding)
	return nil


Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;

public class SubscribeWithAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithAvroSchemaExample(projectId, subscriptionId);

  public static void subscribeWithAvroSchemaExample(String projectId, String subscriptionId) {

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Prepare a reader for the encoded Avro records.
    SpecificDatumReader<State> reader = new SpecificDatumReader<>(State.getClassSchema());

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          ByteString data = message.getData();

          // Get the schema encoding type.
          String encoding = message.getAttributesMap().get("googclient_schemaencoding");

          // Send the message data to a byte[] input stream.
          InputStream inputStream = new ByteArrayInputStream(data.toByteArray());

          Decoder decoder = null;

          // Prepare an appropriate decoder for the message data in the input stream
          // based on the schema encoding type.
          try {
            switch (encoding) {
              case "BINARY":
                decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /*reuse=*/ null);
                System.out.println("Receiving a binary-encoded message:");
              case "JSON":
                decoder = DecoderFactory.get().jsonDecoder(State.getClassSchema(), inputStream);
                System.out.println("Receiving a JSON-encoded message:");
                break block;

            // Obtain an object of the generated Avro class using the decoder.
            State state =, decoder);
            System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());

          } catch (IOException e) {

          // Ack the message.

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {


Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');

// Node FS library, to load definitions
const fs = require('fs');

// And the Apache Avro library
const avro = require('avro-js');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Listens on a subscription for Avro-encoded messages and logs each decoded record.
 *
 * @param {string} subscriptionName Name of an existing subscription.
 * @param {number} timeout Number of seconds to listen before detaching the handler.
 * @param {string} [avscFile] Path to the Avro schema (.avsc) used to decode messages.
 *   NOTE(review): confirm this default path; point it at your own schema file.
 */
function listenForAvroRecords(
  subscriptionName,
  timeout,
  avscFile = 'system-test/fixtures/provinces.avsc'
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionName);

  // Make an encoder using the official avro-js library.
  const definition = fs.readFileSync(avscFile).toString();
  const type = avro.parse(definition);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let result;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = type.fromBuffer(message.data);
        break;
      case Encodings.Json:
        result = type.fromString(message.data.toString());
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    // Stringify so the attributes print as JSON instead of "[object Object]".
    console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}


Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages using an AVRO schema.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_avro_records($projectId, $subscriptionId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        $decodedMessageData = '';
        // May be null when the message carries no schema-encoding attribute.
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                $ioReader = new \AvroStringIO(base64_decode($message->data()));
                $dataReader = new \AvroDataIOReader($ioReader, new \AvroIODatumReader());

                $decodedMessageData = json_encode($dataReader->data());
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        // %s, not %d: the encoding is a string such as "BINARY" or "JSON".
        printf('Received a %s-encoded message %s' . PHP_EOL, $encoding, $decodedMessageData);

        // Acknowledge so the pulled message is not redelivered after its ack deadline.
        $subscription->acknowledge($message);
    }
}


Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

import avro.schema
from avro.io import BinaryDecoder, DatumReader
from concurrent.futures import TimeoutError
import io
import json
from google.cloud import pubsub_v1
from google.cloud.pubsub import SubscriberClient

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Read the schema inside a `with` block so the file handle is closed promptly.
with open(avsc_file, "rb") as avsc:
    avro_schema = avro.schema.parse(avsc.read())


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Decode one received message according to its schema encoding, print it, and ack it."""
    # Get the message serialization type.
    encoding = message.attributes.get("googclient_schemaencoding")
    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        bout = io.BytesIO(message.data)
        decoder = BinaryDecoder(bout)
        reader = DatumReader(avro_schema)
        message_data = reader.read(decoder)
        print(f"Received a binary-encoded message:\n{message_data}")
    elif encoding == "JSON":
        message_data = json.loads(message.data)
        print(f"Received a JSON-encoded message:\n{message_data}")
    else:
        print(f"Received a message with no encoding:\n{message}")

    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.


Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
require "avro"
require "json"
require "stringio"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id

# Parse the schema once up front instead of once per received message.
avro_schema = Avro::Schema.parse File.read(avsc_file)

subscriber = subscription.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    buffer = StringIO.new received_message.data
    decoder = Avro::IO::BinaryDecoder.new buffer
    reader = Avro::IO::DatumReader.new avro_schema
    message_data = reader.read decoder
    puts "Received a binary-encoded message:\n#{message_data}"
  when "JSON"
    message_data = JSON.parse received_message.data
    puts "Received a JSON-encoded message:\n#{message_data}"
  else
    # `puts` was missing: the bare string expression was built and discarded.
    puts "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.