Publicar mensajes del tipo de esquema de Avro

Publica mensajes que se ajusten a un esquema de Avro en un tema con un esquema de Avro adjunto.

Explora más

Para obtener documentación en la que se incluye esta muestra de código, consulta lo siguiente:

Muestra de código

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub C++.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto constexpr kNewYork =
      R"js({ "name": "New York", "post_abbr": "NY" })js";
  auto constexpr kPennsylvania =
      R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
  };
  for (auto const* data : {kNewYork, kPennsylvania}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
            .then(handler));
  }
  // Block until all messages are published.
  for (auto& d : done) d.get();
}

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub C#.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishAvroMessagesAsyncSample
{
    public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {

            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        using (var ms = new MemoryStream())
                        {
                            var encoder = new BinaryEncoder(ms);
                            var writer = new SpecificDefaultWriter(state.Schema);
                            writer.Write(state, encoder);
                            messageId = await publisher.PublishAsync(ms.ToArray());
                        }
                        break;
                    case Encoding.Json:
                        var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                }
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub Go.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	avroSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("ioutil.ReadFile err: %w", err)
	}
	codec, err := goavro.NewCodec(string(avroSource))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %w", err)
	}
	record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %w", err)
	}
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = codec.BinaryFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.BinaryFromNative err: %w", err)
		}
	case pubsub.EncodingJSON:
		msg, err = codec.TextualFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.TextualFromNative err: %w", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub Java.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import utilities.State;

public class PublishAvroRecordsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with an Avro schema.
    String topicId = "your-topic-id";

    publishAvroRecordsExample(projectId, topicId);
  }

  public static void publishAvroRecordsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    // Instantiate an avro-tools-generated class defined in `us-states.avsc`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    Publisher publisher = null;

    block:
    try {
      publisher = Publisher.newBuilder(topicName).build();

      // Prepare to serialize the object to the output stream.
      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();

      Encoder encoder = null;

      // Prepare an appropriate encoder for publishing to the topic.
      switch (encoding) {
        case BINARY:
          System.out.println("Preparing a BINARY encoder...");
          encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /*reuse=*/ null);
          break;

        case JSON:
          System.out.println("Preparing a JSON encoder...");
          encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
          break;

        default:
          break block;
      }

      // Encode the object and write it to the output stream.
      state.customEncode(encoder);
      encoder.flush();

      // Publish the encoded object as a Pub/Sub message.
      ByteString data = ByteString.copyFrom(byteStream.toByteArray());
      PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
      System.out.println("Publishing message: " + message);

      ApiFuture<String> future = publisher.publish(message);
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishAvroRecords(topicNameOrId) {
  // Get the topic metadata to learn about its schema encoding.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publish(dataBuffer);
  console.log(`Avro record ${messageId} published.`);
}

Node.js

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';

// And the Apache Avro library
import * as avro from 'avro-js';
import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

interface ProvinceObject {
  name: string;
  post_abbr: string;
}

async function publishAvroRecords(topicNameOrId: string) {
  // Get the topic metadata to learn about its schema encoding.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province: ProvinceObject = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer: Buffer | undefined;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publish(dataBuffer);
  console.log(`Avro record ${messageId} published.`);
}

PHP

Antes de probar esta muestra, sigue las instrucciones de configuración de PHP que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub PHP.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroIOBinaryEncoder;

/**
 * Publish a message using an AVRO schema.
 *
 * This sample uses `wikimedia/avro` for AVRO encoding.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $definitionFile
 */
function publish_avro_records($projectId, $topicId, $definitionFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($definitionFile);

    $messageData = [
        'name' => 'Alaska',
        'post_abbr' => 'AK',
    ];

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];
    }

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as AVRO binary.
        $io = new AvroStringIO();
        $schema = AvroSchema::parse($definition);
        $writer = new AvroIODatumWriter($schema);
        $encoder = new AvroIOBinaryEncoder($io);
        $writer->write($messageData, $encoder);

        $encodedMessageData = $io->string();
    } else {
        // encode as JSON.
        $encodedMessageData = json_encode($messageData);
    }

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub Python.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from avro.io import BinaryEncoder, DatumWriter
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

# Prepare to write Avro records to the binary output stream.
with open(avsc_file, "rb") as file:
    avro_schema = schema.parse(file.read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()

# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        encoder = BinaryEncoder(bout)
        writer.write(record, encoder)
        data = bout.getvalue()
        print(f"Preparing a binary-encoded message:\n{data.decode()}")
    elif encoding == Encoding.JSON:
        data_str = json.dumps(record)
        print(f"Preparing a JSON-encoded message:\n{data_str}")
        data = data_str.encode("utf-8")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub Ruby.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

record = { "name" => "Alaska", "post_abbr" => "AK" }

if topic.message_encoding_binary?
  require "avro"
  avro_schema = Avro::Schema.parse File.read(avsc_file)
  writer = Avro::IO::DatumWriter.new avro_schema
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new buffer
  writer.write record, encoder
  topic.publish buffer
  puts "Published binary-encoded AVRO message."
elsif topic.message_encoding_json?
  require "json"
  topic.publish record.to_json
  puts "Published JSON-encoded AVRO message."
else
  raise "No encoding specified in #{topic.name}."
end

¿Qué sigue?

Para buscar y filtrar muestras de código para otros productos de Google Cloud, consulta el navegador de muestra de Google Cloud.