接收协议缓冲区架构类型的消息,将消息数据转换为生成的 Proto 类的对象,然后确认该消息。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C++
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
return [](pubsub::Subscriber subscriber) {
auto session = subscriber.Subscribe(
[](pubsub::Message const& m, pubsub::AckHandler h) {
google::cloud::pubsub::samples::State state;
state.ParseFromString(std::string{m.data()});
std::cout << "Message contents: " << state.DebugString() << "\n";
std::move(h).ack();
});
return session;
}
C#
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C# API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;
public class PullProtoMessagesAsyncSample
{
public async Task<int> PullProtoMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
int messageCount = 0;
SubscriberClient subscriber = await new SubscriberClientBuilder
{
SubscriptionName = subscriptionName,
Settings = new SubscriberClient.Settings
{
AckExtensionWindow = TimeSpan.FromSeconds(4),
AckDeadline = TimeSpan.FromSeconds(10),
FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
}
}.BuildAsync();
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
string encoding = message.Attributes["googclient_schemaencoding"];
Utilities.State state = null;
switch (encoding)
{
case "BINARY":
state = Utilities.State.Parser.ParseFrom(message.Data.ToByteArray());
break;
case "JSON":
state = Utilities.State.Parser.ParseJson(message.Data.ToStringUtf8());
break;
default:
Console.WriteLine($"Encoding not provided in message.");
break;
}
Console.WriteLine($"Message {message.MessageId}: {state}");
Interlocked.Increment(ref messageCount);
return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
});
// Run for 5 seconds.
await Task.Delay(5000);
await subscriber.StopAsync(CancellationToken.None);
// Lets make sure that the start task finished successfully after the call to stop.
await startTask;
return messageCount;
}
}
Go
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"sync"
"time"
"cloud.google.com/go/pubsub"
statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func subscribeWithProtoSchema(w io.Writer, projectID, subID, protoFile string) error {
// projectID := "my-project-id"
// subID := "my-sub"
// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
// Create an instance of the message to be decoded (a single U.S. state).
state := &statepb.State{}
sub := client.Subscription(subID)
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var mu sync.Mutex
sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
encoding := msg.Attributes["googclient_schemaencoding"]
if encoding == "BINARY" {
if err := proto.Unmarshal(msg.Data, state); err != nil {
fmt.Fprintf(w, "proto.Unmarshal err: %v\n", err)
msg.Nack()
return
}
fmt.Printf("Received a binary-encoded message:\n%#v\n", state)
} else if encoding == "JSON" {
if err := protojson.Unmarshal(msg.Data, state); err != nil {
fmt.Fprintf(w, "proto.Unmarshal err: %v\n", err)
msg.Nack()
return
}
fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", state)
} else {
fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
msg.Nack()
return
}
fmt.Fprintf(w, "%s is abbreviated as %s\n", state.Name, state.PostAbbr)
msg.Ack()
})
return nil
}
Java
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import utilities.StateProto.State;
public class SubscribeWithProtoSchemaExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use an existing subscription.
String subscriptionId = "your-subscription-id";
subscribeWithProtoSchemaExample(projectId, subscriptionId);
}
public static void subscribeWithProtoSchemaExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
ByteString data = message.getData();
// Get the schema encoding type.
String encoding = message.getAttributesMap().get("googclient_schemaencoding");
block:
try {
switch (encoding) {
case "BINARY":
// Obtain an object of the generated proto class.
State state = State.parseFrom(data);
System.out.println("Received a BINARY-formatted message: " + state);
break;
case "JSON":
State.Builder stateBuilder = State.newBuilder();
JsonFormat.parser().merge(data.toStringUtf8(), stateBuilder);
System.out.println("Received a JSON-formatted message:" + stateBuilder.build());
break;
default:
break block;
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
consumer.ack();
System.out.println("Ack'ed the message");
};
// Create subscriber client.
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
try {
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName);
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
subscriber.stopAsync();
}
}
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');
// And the protobufjs library
const protobuf = require('protobufjs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function listenForProtobufMessages(subscriptionNameOrId, timeout) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Make an decoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const root = protobuf.loadSync('system-test/fixtures/provinces.proto');
const Province = root.lookupType('utilities.Province');
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async message => {
// "Ack" (acknowledge receipt of) the message
message.ack();
// Get the schema metadata from the message.
const schemaMetadata = Schema.metadataFromMessage(message.attributes);
let result;
switch (schemaMetadata.encoding) {
case Encodings.Binary:
result = Province.decode(message.data);
break;
case Encodings.Json:
// This doesn't require decoding with the protobuf library,
// since it's plain JSON. But you can still validate it against
// your schema.
result = JSON.parse(message.data.toString());
console.log(`Validation of JSON: ${Province.verify(result)}`);
break;
default:
console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
break;
}
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
messageCount += 1;
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub';
// And the protobufjs library
import * as protobuf from 'protobufjs';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function listenForProtobufMessages(
subscriptionNameOrId: string,
timeout: number
) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Make an decoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const root = protobuf.loadSync('system-test/fixtures/provinces.proto');
const Province = root.lookupType('utilities.Province');
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async (message: Message) => {
// "Ack" (acknowledge receipt of) the message
message.ack();
// Get the schema metadata from the message.
const schemaMetadata = Schema.metadataFromMessage(message.attributes);
let result;
switch (schemaMetadata.encoding) {
case Encodings.Binary:
result = Province.decode(message.data);
break;
case Encodings.Json:
// This doesn't require decoding with the protobuf library,
// since it's plain JSON. But you can still validate it against
// your schema.
result = JSON.parse(message.data.toString());
console.log(`Validation of JSON: ${Province.verify(result)}`);
break;
default:
console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
break;
}
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
messageCount += 1;
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
PHP
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
use Google\Cloud\PubSub\PubSubClient;
/**
* Subscribe and pull messages using a protocol buffer schema.
*
* Relies on a proto message of the following form:
* ```
* syntax = "proto3";
*
* package utilities;
*
* message StateProto {
* string name = 1;
* string post_abbr = 2;
* }
* ```
*
* @param string $projectId
* @param string $subscriptionId
*/
function subscribe_proto_messages($projectId, $subscriptionId)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$subscription = $pubsub->subscription($subscriptionId);
$messages = $subscription->pull();
foreach ($messages as $message) {
$decodedMessageData = '';
$encoding = $message->attribute('googclient_schemaencoding');
switch ($encoding) {
case 'BINARY':
$protobufMessage = new \Utilities\StateProto();
$protobufMessage->mergeFromString($message->data());
$decodedMessageData = $protobufMessage->serializeToJsonString();
break;
case 'JSON':
$decodedMessageData = $message->data();
break;
}
printf('Received a %d-encoded message %s', $encoding, $decodedMessageData);
}
}
Python
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
from concurrent.futures import TimeoutError
from google.cloud.pubsub import SubscriberClient
from google.protobuf.json_format import Parse
from utilities import us_states_pb2
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber listens for messages
# timeout = 5.0
subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
# Instantiate a protoc-generated class defined in `us-states.proto`.
state = us_states_pb2.StateProto()
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Get the message serialization type.
encoding = message.attributes.get("googclient_schemaencoding")
# Deserialize the message data accordingly.
if encoding == "BINARY":
state.ParseFromString(message.data)
print(f"Received a binary-encoded message:\n{state}")
elif encoding == "JSON":
Parse(message.data, state)
print(f"Received a JSON-encoded message:\n{state}")
else:
print(f"Received a message with no encoding:\n{message}")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception occurs first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
Ruby
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
# subscription_id = "your-subscription-id"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_id
subscriber = subscription.listen do |received_message|
encoding = received_message.attributes["googclient_schemaencoding"]
case encoding
when "BINARY"
state = Utilities::StateProto.decode received_message.data
puts "Received a binary-encoded message:\n#{state}"
when "JSON"
require "json"
state = Utilities::StateProto.decode_json received_message.data
puts "Received a JSON-encoded message:\n#{state}"
else
"Received a message with no encoding:\n#{received_message.message_id}"
end
received_message.acknowledge!
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。