接收 Avro 架构类型的消息,将消息数据转换为生成的 Avro 类的对象,然后确认该消息。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C++
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
// Returns a callable that starts a subscription on the given subscriber.
// Each received message has its payload printed to stdout and is then
// acknowledged. The value returned by Subscribe() is handed back to the
// caller unchanged.
return [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [](pubsub::Message const& message, pubsub::AckHandler handler) {
        std::cout << "Message contents: " << message.data() << "\n";
        // ack() is invoked on an rvalue handler, hence the std::move.
        std::move(handler).ack();
      });
}
C#
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C# API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
using Avro.IO;
using Avro.Specific;
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Sample that pulls messages from a subscription, decodes each payload into
/// an <c>AvroUtilities.State</c> object according to the message's schema
/// encoding attribute, and then acks or nacks the message.
/// </summary>
public class PullAvroMessagesAsyncSample
{
    // Message attribute this sample reads to find out how the payload is encoded.
    private const string SchemaEncodingAttribute = "googclient_schemaencoding";

    /// <summary>
    /// Listens on the subscription for 5 seconds and processes every message received.
    /// </summary>
    /// <param name="projectId">Project that owns the subscription.</param>
    /// <param name="subscriptionId">Subscription to pull from.</param>
    /// <param name="acknowledge">When true each message is acked; otherwise it is nacked.</param>
    /// <returns>The number of messages handled while the subscriber was running.</returns>
    public async Task<int> PullAvroMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();

        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Use TryGetValue rather than the indexer: the indexer throws when the
            // attribute is absent, which would make the "not provided" branch below
            // unreachable for exactly the messages it is meant to report.
            message.Attributes.TryGetValue(SchemaEncodingAttribute, out string encoding);

            AvroUtilities.State state = new AvroUtilities.State();
            switch (encoding)
            {
                case "BINARY":
                    using (var ms = new MemoryStream(message.Data.ToByteArray()))
                    {
                        var decoder = new BinaryDecoder(ms);
                        // The same schema is passed as both writer and reader schema.
                        var reader = new SpecificDefaultReader(state.Schema, state.Schema);
                        reader.Read<AvroUtilities.State>(state, decoder);
                    }
                    break;
                case "JSON":
                    state = JsonConvert.DeserializeObject<AvroUtilities.State>(message.Data.ToStringUtf8());
                    break;
                default:
                    // Reached for a missing (null) or unrecognized encoding value.
                    Console.WriteLine("Encoding not provided in message.");
                    break;
            }

            Console.WriteLine($"Message {message.MessageId}: {state}");
            // The handler may run concurrently, so the counter is updated atomically.
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });

        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}
Go
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"os"
"sync"
"time"
"cloud.google.com/go/pubsub"
"github.com/linkedin/goavro/v2"
)
// subscribeWithAvroSchema receives messages from the subscription subID for
// up to 10 seconds. Each message is decoded with the Avro schema read from
// avscFile, according to the message's "googclient_schemaencoding" attribute
// ("BINARY" or "JSON"). Successfully decoded messages are written to w and
// acked; messages that cannot be decoded are reported to w and nacked.
// It returns an error if the client, schema, or receive loop fails.
func subscribeWithAvroSchema(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Release the client's resources when the function returns.
	defer client.Close()

	avroSchema, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("os.ReadFile err: %w", err)
	}
	codec, err := goavro.NewCodec(string(avroSchema))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %w", err)
	}

	sub := client.Subscription(subID)
	// Stop receiving after 10 seconds.
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// The mutex serializes the callback body, so writes to w do not interleave.
	var mu sync.Mutex
	err = sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()

		encoding := msg.Attributes["googclient_schemaencoding"]

		var data interface{}
		switch encoding {
		case "BINARY":
			native, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", native)
			data = native
		case "JSON":
			native, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", native)
			data = native
		default:
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}

		// Checked assertion: a schema whose top-level type is not a record
		// would otherwise panic inside the receive callback.
		state, ok := data.(map[string]interface{})
		if !ok {
			fmt.Fprintf(w, "Unexpected decoded type %T, nacking\n", data)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	return nil
}
Java
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;
/**
 * Sample that receives messages from a subscription, decodes each payload into a generated Avro
 * {@code State} object according to the message's schema encoding attribute, and acks the message.
 */
public class SubscribeWithAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithAvroSchemaExample(projectId, subscriptionId);
  }

  /**
   * Listens on the subscription for up to 30 seconds, printing every message that could be decoded.
   *
   * @param projectId project that owns the subscription
   * @param subscriptionId an existing subscription to receive from
   */
  public static void subscribeWithAvroSchemaExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Prepare a reader for the encoded Avro records.
    SpecificDatumReader<State> reader = new SpecificDatumReader<>(State.getClassSchema());

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          ByteString data = message.getData();

          // Get the schema encoding type.
          String encoding = message.getAttributesMap().get("googclient_schemaencoding");

          // Send the message data to a byte[] input stream.
          InputStream inputStream = new ByteArrayInputStream(data.toByteArray());

          try {
            Decoder decoder = null;

            // Prepare an appropriate decoder for the message data in the input stream
            // based on the schema encoding type. A switch on a null String throws
            // NullPointerException, so a missing attribute is mapped to the default case.
            switch (encoding == null ? "" : encoding) {
              case "BINARY":
                decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /*reuse=*/ null);
                System.out.println("Receiving a binary-encoded message:");
                break;
              case "JSON":
                decoder = DecoderFactory.get().jsonDecoder(State.getClassSchema(), inputStream);
                System.out.println("Receiving a JSON-encoded message:");
                break;
              default:
                // Missing or unrecognized encoding: nothing to decode.
                break;
            }

            if (decoder != null) {
              // Obtain an object of the generated Avro class using the decoder.
              State state = reader.read(null, decoder);
              System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());
            }
          } catch (IOException e) {
            System.err.println(e);
          }

          // Ack the message.
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // The 30-second wait elapsed while the subscriber was still running; shut it down.
      subscriber.stopAsync();
    }
  }
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');
// Node FS library, to load definitions
const fs = require('fs');
// And the Apache Avro library
const avro = require('avro-js');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Listens on a subscription for `timeout` seconds, decoding each message's
 * data with the Avro type parsed from the fixture schema file and logging
 * the result. When the timeout fires, the handler is removed and the number
 * of messages received is logged.
 *
 * @param {string} subscriptionNameOrId Name or ID of an existing subscription.
 * @param {number} timeout Number of seconds to listen before detaching.
 */
function listenForAvroRecords(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Parse the Avro schema definition; the resulting type is used below to
  // decode incoming message data.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let result;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = type.fromBuffer(message.data);
        break;
      case Encodings.Json:
        result = type.fromString(message.data.toString());
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    // Serialize the attributes: interpolating the object directly would
    // print "[object Object]".
    console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
Node.js (TypeScript)
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub';
// Node FS library, to load definitions
import * as fs from 'fs';
// And the Apache Avro library
import * as avro from 'avro-js';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Listens on a subscription for `timeout` seconds, decoding each message's
 * data with the Avro type parsed from the fixture schema file and logging
 * the result. When the timeout fires, the handler is removed and the number
 * of messages received is logged.
 *
 * @param subscriptionNameOrId Name or ID of an existing subscription.
 * @param timeout Number of seconds to listen before detaching.
 */
function listenForAvroRecords(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Parse the Avro schema definition; the resulting type is used below to
  // decode incoming message data.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let result: object | undefined;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = type.fromBuffer(message.data);
        break;
      case Encodings.Json:
        result = type.fromString(message.data.toString());
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    // Serialize the attributes: interpolating the object directly would
    // print "[object Object]".
    console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
PHP
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
use Google\Cloud\PubSub\PubSubClient;
/**
 * Subscribe and pull messages using an AVRO schema.
 *
 * Pulls one batch of messages from the subscription and prints each one.
 * Binary-encoded payloads are decoded with the Avro schema read from
 * $definitionFile and printed as JSON; JSON-encoded payloads are printed
 * as received.
 *
 * @param string $projectId
 * @param string $subscriptionId
 * @param string $definitionFile Path to the file containing the Avro schema definition.
 */
function subscribe_avro_records($projectId, $subscriptionId, $definitionFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionId);
    $definition = file_get_contents($definitionFile);

    $messages = $subscription->pull();
    foreach ($messages as $message) {
        // Stays empty when the encoding attribute is neither BINARY nor JSON.
        $decodedMessageData = '';
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                $io = new \AvroStringIO($message->data());
                $schema = \AvroSchema::parse($definition);
                $reader = new \AvroIODatumReader($schema);
                $decoder = new \AvroIOBinaryDecoder($io);
                $decodedMessageData = json_encode($reader->read($decoder));
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        // $encoding is a string ("BINARY"/"JSON"); %d would print it as 0.
        printf('Received a %s-encoded message %s', $encoding, $decodedMessageData);
    }
}
Python
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from concurrent.futures import TimeoutError
import io
import json
from google.cloud.pubsub import SubscriberClient
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0
# The type annotation on `callback` below is evaluated when the function is
# defined, so `pubsub_v1` must be importable here; without this import the
# `def` statement raises NameError.
from google.cloud import pubsub_v1

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Parse the Avro schema once, before any message arrives.
with open(avsc_file, "rb") as file:
    avro_schema = schema.parse(file.read())


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Decode one received message, print it, and acknowledge it.

    The decoding path is chosen from the message's
    "googclient_schemaencoding" attribute. The message is acknowledged in
    every case, including when the encoding is missing or unrecognized.
    """
    # Get the message serialization type.
    encoding = message.attributes.get("googclient_schemaencoding")
    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        bout = io.BytesIO(message.data)
        decoder = BinaryDecoder(bout)
        reader = DatumReader(avro_schema)
        message_data = reader.read(decoder)
        print(f"Received a binary-encoded message:\n{message_data}")
    elif encoding == "JSON":
        message_data = json.loads(message.data)
        print(f"Received a JSON-encoded message:\n{message_data}")
    else:
        print(f"Received a message with no encoding:\n{message}")

    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
Ruby
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Listens on the subscription for 60 seconds. Each received message is decoded
# according to its "googclient_schemaencoding" attribute, printed, and then
# acknowledged.
pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id

subscriber = subscription.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    require "avro"
    avro_schema = Avro::Schema.parse File.read(avsc_file)
    buffer = StringIO.new received_message.data
    decoder = Avro::IO::BinaryDecoder.new buffer
    reader = Avro::IO::DatumReader.new avro_schema
    message_data = reader.read decoder
    puts "Received a binary-encoded message:\n#{message_data}"
  when "JSON"
    require "json"
    message_data = JSON.parse received_message.data
    puts "Received a JSON-encoded message:\n#{message_data}"
  else
    # Print the notice; a bare string expression here would be discarded.
    puts "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。