接收可能针对不同架构修订版本的消息

接收可能针对不同架构修订版本的消息

深入探索

如需查看包含此代码示例的详细文档,请参阅以下内容:

代码示例

C++

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
    pubsub::Subscription(project_id, subscription_id)));

// Create a schema client.
auto schema_client =
    pubsub::SchemaServiceClient(pubsub::MakeSchemaServiceConnection());

// Read the reader schema. This is the schema you want the messages to be
// evaluated using.
std::ifstream ifs(avro_file);
avro::ValidSchema reader_schema;
avro::compileJsonSchema(ifs, reader_schema);

std::unordered_map<std::string, avro::ValidSchema> revisions_to_schemas;
auto session = subscriber.Subscribe(
    [&](pubsub::Message const& message, pubsub::AckHandler h) {
      // Get the reader schema revision for the message.
      auto schema_name = message.attributes()["googclient_schemaname"];
      auto schema_revision_id =
          message.attributes()["googclient_schemarevisionid"];
      // If we haven't received a message with this schema, look it up.
      if (revisions_to_schemas.find(schema_revision_id) ==
          revisions_to_schemas.end()) {
        auto schema_path = schema_name + "@" + schema_revision_id;
        // Use the schema client to get the path.
        auto schema = schema_client.GetSchema(schema_path);
        if (!schema) {
          std::cout << "Schema not found:" << schema_path << "\n";
          return;
        }
        avro::ValidSchema writer_schema;
        std::stringstream in;
        in << schema.value().definition();
        avro::compileJsonSchema(in, writer_schema);
        revisions_to_schemas[schema_revision_id] = writer_schema;
      }
      auto writer_schema = revisions_to_schemas[schema_revision_id];

      auto encoding = message.attributes()["googclient_schemaencoding"];
      if (encoding == "JSON") {
        std::stringstream in;
        in << message.data();
        auto avro_in = avro::istreamInputStream(in);
        avro::DecoderPtr decoder = avro::resolvingDecoder(
            writer_schema, reader_schema, avro::jsonDecoder(writer_schema));
        decoder->init(*avro_in);

        v2::State state;
        avro::decode(*decoder, state);
        std::cout << "Name: " << state.name << "\n";
        std::cout << "Postal Abbreviation: " << state.post_abbr << "\n";
        std::cout << "Population: " << state.population << "\n";
      } else {
        std::cout << "Unable to decode. Received message using encoding"
                  << encoding << "\n";
      }
      std::move(h).ack();
    });

C#

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C# API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

using Avro.Generic;
using Avro.IO;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class SubscribeAvroRecordsWithRevisionsSample
{
    public async Task<(int, int)> SubscribeAvroRecordsWithRevisions(string projectId, string subscriptionId)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();

        var schemaCache = new ConcurrentDictionary<(string, string), Avro.Schema>();

        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Get the schema name, revision ID and encoding type from the message.
            string encoding = message.Attributes["googclient_schemaencoding"];
            string schemaName = message.Attributes["googclient_schemaname"];
            string revision = message.Attributes["googclient_schemarevisionid"];

            // Fetch the schema if we don't already have it.
            var avroSchema = schemaCache.GetOrAdd((schemaName, revision), key =>
            {
                var pubSubSchema = schemaService.GetSchema($"{schemaName}@{revision}");
                return Avro.Schema.Parse(pubSubSchema.Definition);
            });

            // Read the message.
            if (encoding == "BINARY")
            {
                using var ms = new MemoryStream(message.Data.ToByteArray());
                var decoder = new BinaryDecoder(ms);
                var reader = new DefaultReader(avroSchema, avroSchema);
                var record = reader.Read<GenericRecord>(null, decoder);
                Console.WriteLine($"Message {message.MessageId}: {record.GetValue(0)}");
                Interlocked.Increment(ref messageCount);
            }
            else
            {
                Console.WriteLine("Expected only binary messages in this sample");
            }
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });
        // Run for 10 seconds.
        await Task.Delay(10_000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return (messageCount, schemaCache.Count);
    }
}

Go

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	schemaClient, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}

	// Create the cache for the codecs for different revision IDs.
	revisionCodecs := make(map[string]*goavro.Codec)

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var mu sync.Mutex
	sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		name := msg.Attributes["googclient_schemaname"]
		revision := msg.Attributes["googclient_schemarevisionid"]

		codec, ok := revisionCodecs[revision]
		// If the codec doesn't exist in the map, this is the first time we
		// are seeing this revision. We need to fetch the schema and cache the
		// codec. It would be more typical to do this asynchronously, but is
		// shown here in a synchronous way to ease readability.
		if !ok {
			// Extract just the schema resource name
			path := strings.Split(name, "/")
			name = path[len(path)-1]
			schema, err := schemaClient.Schema(ctx, fmt.Sprintf("%s@%s", name, revision), pubsub.SchemaViewFull)
			if err != nil {
				fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err)
				msg.Nack()
				return
			}
			codec, err = goavro.NewCodec(schema.Definition)
			if err != nil {
				msg.Nack()
				fmt.Fprintf(w, "goavro.NewCodec err: %v\n", err)
			}
			revisionCodecs[revision] = codec
		}

		encoding := msg.Attributes["googclient_schemaencoding"]

		var state map[string]interface{}
		if encoding == "BINARY" {
			data, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else if encoding == "JSON" {
			data, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else {
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	return nil
}

Java

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;

public class SubscribeWithAvroSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithAvroSchemaRevisionsExample(projectId, subscriptionId);
  }

  static SchemaServiceClient getSchemaServiceClient() {
    try {
      return SchemaServiceClient.create();
    } catch (IOException e) {
      System.out.println("Could not get schema client: " + e);
      return null;
    }
  }

  public static void subscribeWithAvroSchemaRevisionsExample(
      String projectId, String subscriptionId) {
    // Used to get the schemas for revsions.
    final SchemaServiceClient schemaServiceClient = getSchemaServiceClient();
    if (schemaServiceClient == null) {
      return;
    }

    // Cache for the readers for different revision IDs.
    Map<String, SpecificDatumReader<State>> revisionReaders =
        new HashMap<String, SpecificDatumReader<State>>();

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Get the schema encoding type.
          String name = message.getAttributesMap().get("googclient_schemaname");
          String revision = message.getAttributesMap().get("googclient_schemarevisionid");

          SpecificDatumReader<State> reader = null;
          synchronized (revisionReaders) {
            reader = revisionReaders.get(revision);
          }
          if (reader == null) {
            // This is the first time we are seeing this revision. We need to
            // fetch the schema and cache its decoder. It would be more typical
            // to do this asynchronously, but is shown here in a synchronous
            // way to ease readability.
            try {
              Schema schema = schemaServiceClient.getSchema(name + "@" + revision);
              org.apache.avro.Schema avroSchema =
                  new org.apache.avro.Schema.Parser().parse(schema.getDefinition());
              reader = new SpecificDatumReader<State>(avroSchema, State.getClassSchema());
              synchronized (revisionReaders) {
                revisionReaders.put(revision, reader);
              }
            } catch (Exception e) {
              System.out.println("Could not get schema: " + e);
              // Without the schema, we cannot read the message, so nack it.
              consumer.nack();
              return;
            }
          }

          ByteString data = message.getData();
          // Send the message data to a byte[] input stream.
          InputStream inputStream = new ByteArrayInputStream(data.toByteArray());

          String encoding = message.getAttributesMap().get("googclient_schemaencoding");

          Decoder decoder = null;

          // Prepare an appropriate decoder for the message data in the input stream
          // based on the schema encoding type.
          try {
            switch (encoding) {
              case "BINARY":
                decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /*reuse=*/ null);
                System.out.println("Receiving a binary-encoded message:");
                break;
              case "JSON":
                decoder = DecoderFactory.get().jsonDecoder(State.getClassSchema(), inputStream);
                System.out.println("Receiving a JSON-encoded message:");
                break;
              default:
                System.out.println("Unknown message type; nacking.");
                consumer.nack();
                break;
            }

            // Obtain an object of the generated Avro class using the decoder.
            State state = reader.read(null, decoder);
            System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());

            // Ack the message.
            consumer.ack();
          } catch (IOException e) {
            System.err.println(e);
            // If we failed to process the message, nack it.
            consumer.nack();
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      subscriber.stopAsync();
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');

// And the Apache Avro library; this lacks typings, so for
// TypeScript, a few synthetic types were created.
const avro = require('avro-js');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForAvroRecordsWithRevisions(
  subscriptionNameOrId,
  timeout
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Cache decoders for various schema revisions.
  const revisionReaders = new Map();

  // We need a schema admin service client to retrieve revisions.
  const schemaClient = await pubSubClient.getSchemaClient();

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let reader;
    try {
      // Do we already have a decoder for this revision?
      const revision = schemaMetadata.revision;
      if (revisionReaders.has(revision)) {
        reader = revisionReaders.get(revision);
      } else {
        // This is the first time we are seeing this revision. We need to
        // fetch the schema and cache its decoder.
        const [schema] = await schemaClient.getSchema({
          name: `${schemaMetadata.name}@${schemaMetadata.revision}`,
        });
        reader = avro.parse(schema.definition);
        revisionReaders.set(revision, reader);
      }
    } catch (err) {
      console.log('Could not get schema', err);
      message.nack();
      return;
    }

    let result;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = reader.fromBuffer(message.data);
        break;
      case Encodings.Json:
        result = reader.fromString(message.data.toString());
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        message.nack();
        return;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    console.log(`\tAttributes: ${message.attributes}`);
    console.log(
      `\tProvince ${result.name} is abbreviated as ${result.post_abbr}`
    );
    messageCount += 1;

    // Ack the message.
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub';

// And the Apache Avro library; this lacks typings, so for
// TypeScript, a few synthetic types were created.
import * as avro from 'avro-js';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

interface ProvinceObject {
  name: string;
  post_abbr: string;
}

async function listenForAvroRecordsWithRevisions(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Cache decoders for various schema revisions.
  const revisionReaders = new Map<string, avro.Parser>();

  // We need a schema admin service client to retrieve revisions.
  const schemaClient = await pubSubClient.getSchemaClient();

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let reader: avro.Parser;
    try {
      // Do we already have a decoder for this revision?
      const revision = schemaMetadata.revision!;
      if (revisionReaders.has(revision)) {
        reader = revisionReaders.get(revision)!;
      } else {
        // This is the first time we are seeing this revision. We need to
        // fetch the schema and cache its decoder.
        const [schema] = await schemaClient.getSchema({
          name: `${schemaMetadata.name}@${schemaMetadata.revision}`,
        });
        reader = avro.parse(schema.definition!);
        revisionReaders.set(revision, reader);
      }
    } catch (err: unknown) {
      console.log('Could not get schema', err);
      message.nack();
      return;
    }

    let result: ProvinceObject | undefined;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = reader.fromBuffer(message.data);
        break;
      case Encodings.Json:
        result = reader.fromString(message.data.toString());
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        message.nack();
        return;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    console.log(`\tAttributes: ${message.attributes}`);
    console.log(
      `\tProvince ${result?.name} is abbreviated as ${result?.post_abbr}`
    );
    messageCount += 1;

    // Ack the message.
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

import avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from concurrent.futures import TimeoutError
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient, SubscriberClient

schema_client = SchemaServiceClient()

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with open(avsc_file, "rb") as file:
    reader_avro_schema = schema.parse(file.read())
# Dict to keep readers for different schema revisions.
revisions_to_readers = {}

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Get the message serialization type.
    schema_name = message.attributes.get("googclient_schemaname")
    schema_revision_id = message.attributes.get("googclient_schemarevisionid")
    encoding = message.attributes.get("googclient_schemaencoding")

    if schema_revision_id not in revisions_to_readers:
        schema_path = schema_name + "@" + schema_revision_id
        try:
            received_avro_schema = schema_client.get_schema(
                request={"name": schema_path}
            )
        except NotFound:
            print(f"{schema_path} not found.")
            message.nack()
            return
        writer_avro_schema = schema.parse(received_avro_schema.definition)
        revisions_to_readers[schema_revision_id] = DatumReader(
            writer_avro_schema, reader_avro_schema
        )
    reader = revisions_to_readers[schema_revision_id]

    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        bout = io.BytesIO(message.data)
        decoder = BinaryDecoder(bout)
        message_data = reader.read(decoder)
        print(f"Received a binary-encoded message:\n{message_data}")
    elif encoding == "JSON":
        message_data = json.loads(message.data)
        print(f"Received a JSON-encoded message:\n{message_data}")
    else:
        print(f"Received a message with no encoding:\n{message}")
        message.nack()

    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器