Receive messages of Avro schema type

Receive a message of Avro schema type, convert the message data to an object of a generated Avro class, and acknowledge the message.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
return [](pubsub::Subscriber subscriber) {
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Message contents: " << m.data() << "\n";
        std::move(h).ack();
      });
  return session;
}

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


using Avro.IO;
using Avro.Specific;
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class PullAvroMessagesAsyncSample
{
    public async Task<int> PullAvroMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string encoding = message.Attributes["googclient_schemaencoding"];
            AvroUtilities.State state = new AvroUtilities.State();
            switch (encoding)
            {
                case "BINARY":
                    using (var ms = new MemoryStream(message.Data.ToByteArray()))
                    {
                        var decoder = new BinaryDecoder(ms);
                        var reader = new SpecificDefaultReader(state.Schema, state.Schema);
                        reader.Read<AvroUtilities.State>(state, decoder);
                    }
                    break;
                case "JSON":
                    state = JsonConvert.DeserializeObject<AvroUtilities.State>(message.Data.ToStringUtf8());
                    break;
                default:
                    Console.WriteLine($"Encoding not provided in message.");
                    break;
            }
            Console.WriteLine($"Message {message.MessageId}: {state}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"
	"os"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

func subscribeWithAvroSchema(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	avroSchema, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("ioutil.ReadFile err: %w", err)
	}
	codec, err := goavro.NewCodec(string(avroSchema))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %w", err)
	}

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var mu sync.Mutex
	sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		encoding := msg.Attributes["googclient_schemaencoding"]

		var state map[string]interface{}
		if encoding == "BINARY" {
			data, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else if encoding == "JSON" {
			data, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else {
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;

public class SubscribeWithAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithAvroSchemaExample(projectId, subscriptionId);
  }

  public static void subscribeWithAvroSchemaExample(String projectId, String subscriptionId) {

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Prepare a reader for the encoded Avro records.
    SpecificDatumReader<State> reader = new SpecificDatumReader<>(State.getClassSchema());

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          ByteString data = message.getData();

          // Get the schema encoding type.
          String encoding = message.getAttributesMap().get("googclient_schemaencoding");

          // Send the message data to a byte[] input stream.
          InputStream inputStream = new ByteArrayInputStream(data.toByteArray());

          Decoder decoder = null;

          // Prepare an appropriate decoder for the message data in the input stream
          // based on the schema encoding type.
          block:
          try {
            switch (encoding) {
              case "BINARY":
                decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /*reuse=*/ null);
                System.out.println("Receiving a binary-encoded message:");
                break;
              case "JSON":
                decoder = DecoderFactory.get().jsonDecoder(State.getClassSchema(), inputStream);
                System.out.println("Receiving a JSON-encoded message:");
                break;
              default:
                break block;
            }

            // Obtain an object of the generated Avro class using the decoder.
            State state = reader.read(null, decoder);
            System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());

          } catch (IOException e) {
            System.err.println(e);
          }

          // Ack the message.
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      subscriber.stopAsync();
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');

// Node FS library, to load definitions
const fs = require('fs');

// And the Apache Avro library
const avro = require('avro-js');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForAvroRecords(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let result;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = type.fromBuffer(message.data);
        break;
      case Encodings.Json:
        result = type.fromString(message.data.toString());
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub';

// Node FS library, to load definitions
import * as fs from 'fs';

// And the Apache Avro library
import * as avro from 'avro-js';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForAvroRecords(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let result: object | undefined;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = type.fromBuffer(message.data);
        break;
      case Encodings.Json:
        result = type.fromString(message.data.toString());
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages using an AVRO schema.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_avro_records($projectId, $subscriptionId, $definitionFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $definition = file_get_contents($definitionFile);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        $decodedMessageData = '';
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                $io = new \AvroStringIO($message->data());
                $schema = \AvroSchema::parse($definition);
                $reader = new \AvroIODatumReader($schema);
                $decoder = new \AvroIOBinaryDecoder($io);
                $decodedMessageData = json_encode($reader->read($decoder));
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        printf('Received a %d-encoded message %s', $encoding, $decodedMessageData);
    }
}

Python

Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from concurrent.futures import TimeoutError
import io
import json
from google.cloud.pubsub import SubscriberClient

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with open(avsc_file, "rb") as file:
    avro_schema = schema.parse(file.read())

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Get the message serialization type.
    encoding = message.attributes.get("googclient_schemaencoding")
    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        bout = io.BytesIO(message.data)
        decoder = BinaryDecoder(bout)
        reader = DatumReader(avro_schema)
        message_data = reader.read(decoder)
        print(f"Received a binary-encoded message:\n{message_data}")
    elif encoding == "JSON":
        message_data = json.loads(message.data)
        print(f"Received a JSON-encoded message:\n{message_data}")
    else:
        print(f"Received a message with no encoding:\n{message}")

    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id

subscriber = subscription.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    require "avro"
    avro_schema = Avro::Schema.parse File.read(avsc_file)
    buffer = StringIO.new received_message.data
    decoder = Avro::IO::BinaryDecoder.new buffer
    reader = Avro::IO::DatumReader.new avro_schema
    message_data = reader.read decoder
    puts "Received a binary-encoded message:\n#{message_data}"
  when "JSON"
    require "json"
    message_data = JSON.parse received_message.data
    puts "Received a JSON-encoded message:\n#{message_data}"
  else
    "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.