接收可能适用于不同架构修订版的消息
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C++
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
// Subscribes to `subscription_id` and prints the fields of every JSON-encoded
// Avro message it receives. Each message names the schema revision it was
// written with; that writer schema is fetched (and cached) and resolved
// against the reader schema loaded from `avro_file`.
auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
    pubsub::Subscription(project_id, subscription_id)));

// Create a schema client.
auto schema_client =
    pubsub::SchemaServiceClient(pubsub::MakeSchemaServiceConnection());

// Read the reader schema. This is the schema you want the messages to be
// evaluated using.
std::ifstream ifs(avro_file);
avro::ValidSchema reader_schema;
avro::compileJsonSchema(ifs, reader_schema);

// Writer schemas seen so far, keyed by schema revision id.
// NOTE(review): this map is captured by reference and is not synchronized;
// if the subscriber can invoke the callback concurrently it needs a mutex —
// confirm against the configured concurrency.
std::unordered_map<std::string, avro::ValidSchema> revisions_to_schemas;

auto session = subscriber.Subscribe(
    [&](pubsub::Message const& message, pubsub::AckHandler h) {
      // Fetch the attributes once instead of once per lookup.
      auto attributes = message.attributes();

      // Get the writer schema name and revision for the message.
      auto const schema_name = attributes["googclient_schemaname"];
      auto const schema_revision_id = attributes["googclient_schemarevisionid"];

      // If we haven't received a message with this schema, look it up.
      if (revisions_to_schemas.find(schema_revision_id) ==
          revisions_to_schemas.end()) {
        auto schema_path = schema_name + "@" + schema_revision_id;
        // Use the schema client to fetch the schema revision.
        auto schema = schema_client.GetSchema(schema_path);
        if (!schema) {
          std::cout << "Schema not found: " << schema_path << "\n";
          // The message cannot be decoded without its schema; reject it
          // explicitly rather than leaving the handler unanswered.
          std::move(h).nack();
          return;
        }
        avro::ValidSchema writer_schema;
        std::stringstream in;
        in << schema.value().definition();
        avro::compileJsonSchema(in, writer_schema);
        revisions_to_schemas[schema_revision_id] = writer_schema;
      }
      auto writer_schema = revisions_to_schemas[schema_revision_id];

      auto const encoding = attributes["googclient_schemaencoding"];
      if (encoding == "JSON") {
        std::stringstream in;
        in << message.data();
        auto avro_in = avro::istreamInputStream(in);
        // Decode with the writer schema, then resolve to the reader schema.
        avro::DecoderPtr decoder = avro::resolvingDecoder(
            writer_schema, reader_schema, avro::jsonDecoder(writer_schema));
        decoder->init(*avro_in);
        v2::State state;
        avro::decode(*decoder, state);
        std::cout << "Name: " << state.name << "\n";
        std::cout << "Postal Abbreviation: " << state.post_abbr << "\n";
        std::cout << "Population: " << state.population << "\n";
      } else {
        // Only JSON is handled by this sample; other encodings are reported
        // and the message is still acknowledged below.
        std::cout << "Unable to decode. Received message using encoding "
                  << encoding << "\n";
      }
      std::move(h).ack();
    });
C#
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
using Avro.Generic;
using Avro.IO;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Sample that pulls binary Avro messages from a subscription, resolving the
/// schema revision named in each message's attributes.
/// </summary>
public class SubscribeAvroRecordsWithRevisionsSample
{
    /// <summary>
    /// Listens on the subscription for ten seconds, printing the first field of
    /// every binary-encoded Avro record received.
    /// </summary>
    /// <param name="projectId">Project that owns the subscription.</param>
    /// <param name="subscriptionId">Subscription to pull from.</param>
    /// <returns>
    /// The number of binary messages decoded and the number of distinct
    /// (schema name, revision) pairs that were fetched.
    /// </returns>
    public async Task<(int, int)> SubscribeAvroRecordsWithRevisions(string projectId, string subscriptionId)
    {
        SchemaServiceClient schemaClient = SchemaServiceClient.Create();
        // Parsed schemas keyed by (schema name, revision id).
        var parsedSchemas = new ConcurrentDictionary<(string, string), Avro.Schema>();

        SubscriptionName target = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient client = await SubscriberClient.CreateAsync(target);

        int decodedMessages = 0;
        Task listening = client.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Schema metadata travels in the message attributes.
            string encoding = message.Attributes["googclient_schemaencoding"];
            string schemaName = message.Attributes["googclient_schemaname"];
            string revision = message.Attributes["googclient_schemarevisionid"];

            // Look the schema up in the cache, fetching and parsing it on a miss.
            Avro.Schema avroSchema = parsedSchemas.GetOrAdd((schemaName, revision), unusedKey =>
            {
                var fetched = schemaService_GetSchema(schemaClient, $"{schemaName}@{revision}");
                return Avro.Schema.Parse(fetched.Definition);
            });

            // This sample only decodes binary payloads; anything else is
            // reported and acknowledged.
            if (encoding != "BINARY")
            {
                Console.WriteLine("Expected only binary messages in this sample");
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }

            using (var payload = new MemoryStream(message.Data.ToByteArray()))
            {
                var binaryDecoder = new BinaryDecoder(payload);
                var datumReader = new DefaultReader(avroSchema, avroSchema);
                GenericRecord record = datumReader.Read<GenericRecord>(null, binaryDecoder);
                Console.WriteLine($"Message {message.MessageId}: {record.GetValue(0)}");
                Interlocked.Increment(ref decodedMessages);
            }
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });

        // Run for 10 seconds.
        await Task.Delay(10_000);
        await client.StopAsync(CancellationToken.None);
        // Surface any failure from the receive loop once it has stopped.
        await listening;

        return (decodedMessages, parsedSchemas.Count);
    }

    // Thin wrapper so the cache-miss lambda reads as a single fetch step.
    private static Google.Cloud.PubSub.V1.Schema schemaService_GetSchema(SchemaServiceClient service, string name)
    {
        return service.GetSchema(name);
    }
}
Go
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"
"cloud.google.com/go/pubsub"
"github.com/linkedin/goavro/v2"
)
// subscribeWithAvroSchemaRevisions receives messages from subscription subID
// for ten seconds. For each message it looks up (and caches) the Avro codec for
// the schema revision named in the message attributes, decodes the payload
// according to its encoding attribute, and writes the result to w.
//
// Messages that cannot be decoded are nacked. avscFile is accepted for
// signature compatibility but is not read by this function. A non-nil error is
// returned when a client cannot be created or when Receive fails.
func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	schemaClient, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer schemaClient.Close()

	// Create the cache for the codecs for different revision IDs.
	revisionCodecs := make(map[string]*goavro.Codec)

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// mu guards revisionCodecs; it is held for the whole callback.
	var mu sync.Mutex
	err = sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()

		name := msg.Attributes["googclient_schemaname"]
		revision := msg.Attributes["googclient_schemarevisionid"]

		codec, ok := revisionCodecs[revision]
		// If the codec doesn't exist in the map, this is the first time we
		// are seeing this revision. We need to fetch the schema and cache the
		// codec. It would be more typical to do this asynchronously, but is
		// shown here in a synchronous way to ease readability.
		if !ok {
			// Extract just the schema resource name.
			path := strings.Split(name, "/")
			name = path[len(path)-1]
			schema, err := schemaClient.Schema(ctx, fmt.Sprintf("%s@%s", name, revision), pubsub.SchemaViewFull)
			if err != nil {
				fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err)
				msg.Nack()
				return
			}
			parsed, err := goavro.NewCodec(schema.Definition)
			if err != nil {
				fmt.Fprintf(w, "goavro.NewCodec err: %v\n", err)
				msg.Nack()
				// Stop here: caching and using a nil codec would panic below.
				return
			}
			codec = parsed
			revisionCodecs[revision] = codec
		}

		var decoded interface{}
		switch encoding := msg.Attributes["googclient_schemaencoding"]; encoding {
		case "BINARY":
			native, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", native)
			decoded = native
		case "JSON":
			native, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", native)
			decoded = native
		default:
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}

		// The fields below are read from a map; nack instead of panicking if
		// the decoded value has any other shape.
		state, ok := decoded.(map[string]interface{})
		if !ok {
			fmt.Fprintf(w, "Unexpected decoded type %T, nacking\n", decoded)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	return nil
}
Java
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;
/**
 * Sample that receives Avro messages whose writer schema may be any revision of a schema, and
 * decodes them into the generated {@code State} class.
 */
public class SubscribeWithAvroSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithAvroSchemaRevisionsExample(projectId, subscriptionId);
  }

  /**
   * Creates the schema service client.
   *
   * @return the client, or {@code null} (after logging) when it cannot be created
   */
  static SchemaServiceClient getSchemaServiceClient() {
    try {
      return SchemaServiceClient.create();
    } catch (IOException e) {
      System.out.println("Could not get schema client: " + e);
      return null;
    }
  }

  /**
   * Listens on the subscription for up to 30 seconds. Each message is decoded with a reader built
   * from the schema revision named in its attributes; messages that cannot be decoded are nacked.
   *
   * @param projectId project that owns the subscription
   * @param subscriptionId existing subscription to pull from
   */
  public static void subscribeWithAvroSchemaRevisionsExample(
      String projectId, String subscriptionId) {
    // Used to get the schemas for revisions.
    final SchemaServiceClient schemaServiceClient = getSchemaServiceClient();
    if (schemaServiceClient == null) {
      return;
    }

    // Cache for the readers for different revision IDs.
    Map<String, SpecificDatumReader<State>> revisionReaders =
        new HashMap<String, SpecificDatumReader<State>>();

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Get the schema name and revision the message was written with.
          String name = message.getAttributesMap().get("googclient_schemaname");
          String revision = message.getAttributesMap().get("googclient_schemarevisionid");

          SpecificDatumReader<State> reader = null;
          synchronized (revisionReaders) {
            reader = revisionReaders.get(revision);
          }
          if (reader == null) {
            // This is the first time we are seeing this revision. We need to
            // fetch the schema and cache its decoder. It would be more typical
            // to do this asynchronously, but is shown here in a synchronous
            // way to ease readability.
            try {
              Schema schema = schemaServiceClient.getSchema(name + "@" + revision);
              org.apache.avro.Schema avroSchema =
                  new org.apache.avro.Schema.Parser().parse(schema.getDefinition());
              // Writer schema is the fetched revision; reader schema is the generated class.
              reader = new SpecificDatumReader<State>(avroSchema, State.getClassSchema());
              synchronized (revisionReaders) {
                revisionReaders.put(revision, reader);
              }
            } catch (Exception e) {
              System.out.println("Could not get schema: " + e);
              // Without the schema, we cannot read the message, so nack it.
              consumer.nack();
              return;
            }
          }

          ByteString data = message.getData();
          // Send the message data to a byte[] input stream.
          InputStream inputStream = new ByteArrayInputStream(data.toByteArray());

          // A missing attribute is treated like an unknown encoding; switching on null would
          // throw.
          String rawEncoding = message.getAttributesMap().get("googclient_schemaencoding");
          String encoding = rawEncoding == null ? "" : rawEncoding;

          // Prepare an appropriate decoder for the message data in the input stream
          // based on the schema encoding type.
          try {
            Decoder decoder;
            switch (encoding) {
              case "BINARY":
                decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /*reuse=*/ null);
                System.out.println("Receiving a binary-encoded message:");
                break;
              case "JSON":
                // The JSON text was written with the writer schema, so parse it with that
                // schema; the reader resolves it to the State schema.
                decoder = DecoderFactory.get().jsonDecoder(reader.getSchema(), inputStream);
                System.out.println("Receiving a JSON-encoded message:");
                break;
              default:
                System.out.println("Unknown message type; nacking.");
                consumer.nack();
                // Stop here: there is no decoder, and the message must not also be acked.
                return;
            }

            // Obtain an object of the generated Avro class using the decoder.
            State state = reader.read(null, decoder);
            System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());

            // Ack the message.
            consumer.ack();
          } catch (IOException | RuntimeException e) {
            System.err.println(e);
            // If we failed to process the message, nack it.
            consumer.nack();
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // The listening window elapsed; shut the subscriber down.
      subscriber.stopAsync();
    }
  }
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');
// And the Apache Avro library; this lacks typings, so for
// TypeScript, a few synthetic types were created.
const avro = require('avro-js');
// Creates a single PubSub client at module scope; the sample function below
// reuses it to look up the subscription and to obtain the schema client.
const pubSubClient = new PubSub();
/**
 * Listens on a subscription for `timeout` seconds and decodes each Avro
 * message with the schema revision named in its attributes. Parsed schemas are
 * cached per revision. Messages whose schema cannot be fetched, whose encoding
 * is unknown, or whose payload fails to decode are nacked.
 *
 * @param {string} subscriptionNameOrId Existing subscription to listen on.
 * @param {number} timeout Seconds to listen before detaching the handler.
 */
async function listenForAvroRecordsWithRevisions(
  subscriptionNameOrId,
  timeout
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Cache decoders for various schema revisions.
  const revisionReaders = new Map();

  // We need a schema admin service client to retrieve revisions.
  const schemaClient = await pubSubClient.getSchemaClient();

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let reader;
    try {
      // Do we already have a decoder for this revision?
      const revision = schemaMetadata.revision;
      if (revisionReaders.has(revision)) {
        reader = revisionReaders.get(revision);
      } else {
        // This is the first time we are seeing this revision. We need to
        // fetch the schema and cache its decoder.
        const [schema] = await schemaClient.getSchema({
          name: `${schemaMetadata.name}@${schemaMetadata.revision}`,
        });
        reader = avro.parse(schema.definition);
        revisionReaders.set(revision, reader);
      }
    } catch (err) {
      console.log('Could not get schema', err);
      message.nack();
      return;
    }

    let result;
    try {
      switch (schemaMetadata.encoding) {
        case Encodings.Binary:
          result = reader.fromBuffer(message.data);
          break;
        case Encodings.Json:
          result = reader.fromString(message.data.toString());
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
          message.nack();
          return;
      }
    } catch (err) {
      // A decode failure inside this async handler would otherwise reject
      // silently and leave the message neither acked nor nacked.
      console.log('Could not decode message', err);
      message.nack();
      return;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    // Attributes are an object; interpolating it directly prints
    // "[object Object]".
    console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
    console.log(
      `\tProvince ${result.name} is abbreviated as ${result.post_abbr}`
    );
    messageCount += 1;

    // Ack the message.
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
Node.js (TypeScript)
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub';
// And the Apache Avro library; this lacks typings, so for
// TypeScript, a few synthetic types were created.
import * as avro from 'avro-js';
// Creates a single PubSub client at module scope; the sample function below
// reuses it to look up the subscription and to obtain the schema client.
const pubSubClient = new PubSub();
// Fields of the decoded Avro record that this sample reads when logging.
interface ProvinceObject {
// Logged as the province name.
name: string;
// Logged as the province's abbreviation.
post_abbr: string;
}
/**
 * Listens on a subscription for `timeout` seconds and decodes each Avro
 * message with the schema revision named in its attributes. Parsed schemas are
 * cached per revision. Messages whose schema cannot be fetched, whose encoding
 * is unknown, or whose payload fails to decode are nacked.
 *
 * @param subscriptionNameOrId Existing subscription to listen on.
 * @param timeout Seconds to listen before detaching the handler.
 */
async function listenForAvroRecordsWithRevisions(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Cache decoders for various schema revisions.
  const revisionReaders = new Map<string, avro.Parser>();

  // We need a schema admin service client to retrieve revisions.
  const schemaClient = await pubSubClient.getSchemaClient();

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let reader: avro.Parser;
    try {
      // Do we already have a decoder for this revision?
      const revision = schemaMetadata.revision!;
      if (revisionReaders.has(revision)) {
        reader = revisionReaders.get(revision)!;
      } else {
        // This is the first time we are seeing this revision. We need to
        // fetch the schema and cache its decoder.
        const [schema] = await schemaClient.getSchema({
          name: `${schemaMetadata.name}@${schemaMetadata.revision}`,
        });
        reader = avro.parse(schema.definition!);
        revisionReaders.set(revision, reader);
      }
    } catch (err: unknown) {
      console.log('Could not get schema', err);
      message.nack();
      return;
    }

    let result: ProvinceObject | undefined;
    try {
      switch (schemaMetadata.encoding) {
        case Encodings.Binary:
          result = reader.fromBuffer(message.data);
          break;
        case Encodings.Json:
          result = reader.fromString(message.data.toString());
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
          message.nack();
          return;
      }
    } catch (err: unknown) {
      // A decode failure inside this async handler would otherwise reject
      // silently and leave the message neither acked nor nacked.
      console.log('Could not decode message', err);
      message.nack();
      return;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    // Attributes are an object; interpolating it directly prints
    // "[object Object]".
    console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
    console.log(
      `\tProvince ${result?.name} is abbreviated as ${result?.post_abbr}`
    );
    messageCount += 1;

    // Ack the message.
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
Python
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
import avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from concurrent.futures import TimeoutError
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient, SubscriberClient
# Client used by the message callback to fetch schema revisions.
schema_client = SchemaServiceClient()

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Load the reader schema: the schema every message is resolved to.
with open(avsc_file, "rb") as avsc:
    avsc_bytes = avsc.read()
reader_avro_schema = schema.parse(avsc_bytes)

# Readers for the schema revisions seen so far, keyed by revision id.
revisions_to_readers = dict()
# The annotation is quoted so that defining the function does not require the
# name `pubsub_v1` to be bound at definition time.
def callback(message: "pubsub_v1.subscriber.message.Message") -> None:
    """Decode one received message and ack or nack it.

    The schema name, revision id and encoding are read from the message
    attributes. A reader for the revision is taken from
    ``revisions_to_readers`` or built from the schema fetched with
    ``schema_client``. Exactly one of ``ack()`` / ``nack()`` is called on
    every return path.

    Args:
        message: The received message; its ``attributes``, ``data``,
            ``ack()`` and ``nack()`` are used.
    """
    # Get the schema metadata and the message serialization type.
    schema_name = message.attributes.get("googclient_schemaname")
    schema_revision_id = message.attributes.get("googclient_schemarevisionid")
    encoding = message.attributes.get("googclient_schemaencoding")

    # Without both attributes the schema path cannot be built.
    if schema_name is None or schema_revision_id is None:
        print("Message is missing its schema name or revision id; nacking.")
        message.nack()
        return

    if schema_revision_id not in revisions_to_readers:
        schema_path = schema_name + "@" + schema_revision_id
        try:
            received_avro_schema = schema_client.get_schema(
                request={"name": schema_path}
            )
        except NotFound:
            print(f"{schema_path} not found.")
            message.nack()
            return
        writer_avro_schema = schema.parse(received_avro_schema.definition)
        revisions_to_readers[schema_revision_id] = DatumReader(
            writer_avro_schema, reader_avro_schema
        )
    reader = revisions_to_readers[schema_revision_id]

    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        bout = io.BytesIO(message.data)
        decoder = BinaryDecoder(bout)
        message_data = reader.read(decoder)
        print(f"Received a binary-encoded message:\n{message_data}")
    elif encoding == "JSON":
        message_data = json.loads(message.data)
        print(f"Received a JSON-encoded message:\n{message_data}")
    else:
        print(f"Received a message with no encoding:\n{message}")
        message.nack()
        # Stop here so the nacked message is not also acked.
        return

    message.ack()
# Start pulling in the background; each message is handed to `callback`.
pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# The 'with' block closes the subscriber client on exit.
with subscriber:
    try:
        # Wait at most `timeout` seconds; with no timeout this would block
        # until an exception occurs.
        pull_future.result(timeout=timeout)
    except TimeoutError:
        # Trigger the shutdown, then block until it is complete.
        pull_future.cancel()
        pull_future.result()
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。