Publish messages of Avro schema type

Publish messages that conform to an Avro schema to a topic with an Avro schema attached.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample


Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishAvroMessagesAsyncSample
    public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>

                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                    case Encoding.Binary:
                        using (var ms = new MemoryStream())
                            var encoder = new BinaryEncoder(ms);
                            var writer = new SpecificDefaultWriter(state.Schema);
                            writer.Write(state, encoder);
                            messageId = await publisher.PublishAsync(ms.ToArray());
                    case Encoding.Json:
                        var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            catch (Exception exception)
                Console.WriteLine($"An error ocurred when publishing message {state}: {exception.Message}");
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;


Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto constexpr kNewYork =
      R"js({ "name": "New York", "post_abbr": "NY" })js";
  auto constexpr kPennsylvania =
      R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::runtime_error(id.status().message());
  for (auto const* data : {kNewYork, kPennsylvania}) {
  // Block until all messages are published.
  for (auto& d : done) d.get();


Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

import (


func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)

	avroSource, err := ioutil.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("ioutil.ReadFile err: %v", err)
	codec, err := goavro.NewCodec(string(avroSource))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %v", err)
	record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %v", err)
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = codec.BinaryFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.BinaryFromNative err: %v", err)
	case pubsub.EncodingJSON:
		msg, err = codec.TextualFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.TextualFromNative err: %v", err)
		return fmt.Errorf("invalid encoding: %v", encoding)

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %v", err)
	fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
	return nil


Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.State;

public class PublishAvroRecordsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with an Avro schema.
    String topicId = "your-topic-id";

    publishAvroRecordsExample(projectId, topicId);

  public static void publishAvroRecordsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();

    // Instantiate an avro-tools-generated class defined in `us-states.avsc`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    Publisher publisher = null;

    try {
      publisher = Publisher.newBuilder(topicName).build();

      // Prepare to serialize the object to the output stream.
      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();

      Encoder encoder = null;

      // Prepare an appropriate encoder for publishing to the topic.
      switch (encoding) {
        case BINARY:
          System.out.println("Preparing a BINARY encoder...");
          encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /*reuse=*/ null);

        case JSON:
          System.out.println("Preparing a JSON encoder...");
          encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);

          break block;

      // Encode the object and write it to the output stream.

      // Publish the encoded object as a Pub/Sub message.
      ByteString data = ByteString.copyFrom(byteStream.toByteArray());
      PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
      System.out.println("Publishing message: " + message);

      ApiFuture<String> future = publisher.publish(message);
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.awaitTermination(1, TimeUnit.MINUTES);


Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

 * TODO(developer): Uncomment this variable before running the sample.
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishAvroRecords(topicNameOrId) {
  // Get the topic metadata to learn about its schema encoding.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
  const type = avro.parse(definition);

  // Encode the message.
  const province = {
    name: 'Ontario',
    post_abbr: 'ON',
  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);

  const messageId = await topic.publish(dataBuffer);
  console.log(`Avro record ${messageId} published.`);


Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroDataIOWriter;

 * Publish a message using an AVRO schema.
 * This sample uses `wikimedia/avro` for AVRO encoding.
 * @param string $projectId
 * @param string $topicId
 * @param string $definitionFile
 * @return void
function publish_avro_records($projectId, $topicId, $definitionFile)
    $pubsub = new PubSubClient([
        'projectId' => $projectId,

    $definition = file_get_contents($definitionFile);

    $messageData = [
        'name' => 'Alaska',
        'post_abbr' => 'AK',

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as AVRO binary.
        $io = new AvroStringIO();
        $schema = AvroSchema::parse($definition);
        $writer = new AvroIODatumWriter($schema);
        $dataWriter = new AvroDataIOWriter($io, $writer, $schema);



        // AVRO binary data must be base64-encoded.
        $encodedMessageData = base64_encode($io->string());
    } else {
        // encode as JSON.
        $encodedMessageData = json_encode($messageData);

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);


Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

from import BinaryEncoder, DatumWriter
import avro
import io
import json
from google.api_core.exceptions import NotFound
from import PublisherClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

# Prepare to write Avro records to the binary output stream.
avro_schema = avro.schema.parse(open(avsc_file, "rb").read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()

# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}

    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        encoder = BinaryEncoder(bout)
        writer.write(record, encoder)
        data = bout.getvalue()
        print(f"Preparing a binary-encoded message:\n{data.decode()}")
    elif encoding == Encoding.JSON:
        data_str = json.dumps(record)
        print(f"Preparing a JSON-encoded message:\n{data_str}")
        data = data_str.encode("utf-8")
        print(f"No encoding specified in {topic_path}. Abort.")

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")


Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
require "google/cloud/pubsub"

pubsub =

topic = pubsub.topic topic_id

record = { "name" => "Alaska", "post_abbr" => "AK" }

if topic.message_encoding_binary?
  require "avro"
  avro_schema = Avro::Schema.parse
  writer = avro_schema
  buffer =
  encoder = buffer
  writer.write record, encoder
  topic.publish buffer
  puts "Published binary-encoded AVRO message."
elsif topic.message_encoding_json?
  require "json"
  topic.publish record.to_json
  puts "Published JSON-encoded AVRO message."
  raise "No encoding specified in #{}."

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.