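This document describes how to publish messages to a topic that has a schema.
Before you begin
Before you configure the publish workflow, make sure that you have completed the following tasks:
- Learn how Pub/Sub schemas work.
- Associate a schema with a topic.
Required roles
To get the permissions that you need to publish messages to a topic, ask your administrator to grant you the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on the topic.
For more information about granting roles, see Manage access to projects, folders, and organizations.
Publish messages with a schema
You can publish messages to a topic that is associated with a schema. You must encode each message in the schema and format that you specified when you created the topic. A message matches the schema associated with a topic if it matches any schema revision within the range of allowed revisions. Pub/Sub evaluates the message against the allowed revisions in order, starting with the newest allowed revision, until it finds a match or reaches the oldest allowed revision.
Pub/Sub adds the following attributes to a message that is successfully published to a topic associated with a schema: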
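- googclient_schemaname: The name of the schema used for validation.
- googclient_schemaencoding: The encoding of the message, either JSON or binary.
- googclient_schemarevisionid: The revision ID of the schema used to parse and validate the message. Each revision has a unique revision ID associated with it. The revision ID is an auto-generated eight-character UUID.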
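As an illustration, a subscriber could pick these attributes out of a received message's attribute map. The following is a minimal sketch that assumes the attributes are available as a plain Python dict; the helper name schema_info and the sample values are hypothetical and are not part of any Pub/Sub client library.

```python
from typing import Dict, Optional

# Attribute keys that Pub/Sub adds to messages published to a topic with a schema.
SCHEMA_NAME_KEY = "googclient_schemaname"
SCHEMA_ENCODING_KEY = "googclient_schemaencoding"
SCHEMA_REVISION_KEY = "googclient_schemarevisionid"


def schema_info(attributes: Dict[str, str]) -> Optional[Dict[str, str]]:
    """Return the schema-related attributes, or None if any of them is missing."""
    keys = (SCHEMA_NAME_KEY, SCHEMA_ENCODING_KEY, SCHEMA_REVISION_KEY)
    if not all(key in attributes for key in keys):
        return None
    return {
        "schema": attributes[SCHEMA_NAME_KEY],
        "encoding": attributes[SCHEMA_ENCODING_KEY],
        "revision_id": attributes[SCHEMA_REVISION_KEY],
    }


# Hypothetical attribute values, for illustration only.
example = {
    SCHEMA_NAME_KEY: "projects/my-project/schemas/my-schema",
    SCHEMA_ENCODING_KEY: "JSON",
    SCHEMA_REVISION_KEY: "0a1b2c3d",
}
print(schema_info(example))
```

Returning None when an attribute is missing lets the caller treat messages from topics without a schema separately.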
If the message doesn't match any of the schema revisions allowed by the topic, Pub/Sub returns an INVALID_ARGUMENT error to the publish request.
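The evaluation order described in this section can be sketched locally as follows. This is a simplified model and not Pub/Sub's implementation: each revision is represented by a stand-in validator that only checks field names, and the revision IDs, the capital field, and the function names are hypothetical.

```python
import json
from typing import Callable, List, Tuple

# A revision is modelled as (revision_id, validator). Pub/Sub validates against
# Avro or protocol buffer schemas; these validators are simplified stand-ins.
Revision = Tuple[str, Callable[[dict], bool]]


def has_exact_fields(*fields: str) -> Callable[[dict], bool]:
    """Build a validator that accepts records with exactly the given field names."""
    expected = set(fields)
    return lambda record: set(record) == expected


def match_revision(data: bytes, allowed_newest_first: List[Revision]) -> str:
    """Return the ID of the first matching revision, trying the newest first."""
    record = json.loads(data)
    for revision_id, is_valid in allowed_newest_first:
        if is_valid(record):
            return revision_id
    # Mirrors the INVALID_ARGUMENT error returned to the publish request.
    raise ValueError("INVALID_ARGUMENT: message matches no allowed schema revision")


# Hypothetical revisions, ordered from newest allowed to oldest allowed.
revisions = [
    ("rev-new", has_exact_fields("name", "post_abbr", "capital")),
    ("rev-old", has_exact_fields("name", "post_abbr")),
]
print(match_revision(b'{"name": "Alaska", "post_abbr": "AK"}', revisions))
```

In this sketch the sample message fails the newer revision, which expects an extra field, and matches the older one.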
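Pub/Sub evaluates a message against schema revisions only at publish time. After a message is published, committing a new schema revision or changing the schema associated with the topic doesn't re-evaluate that message or change any of the schema message attributes attached to it.
You can publish messages to a topic with an associated schema in a Google Cloud project by using the Google Cloud console, the gcloud CLI, the Pub/Sub API, or the Cloud Client Libraries.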
gcloud
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
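Use the gcloud pubsub topics publish command to publish a sample message.
gcloud pubsub topics publish TOPIC_ID \
    --message=MESSAGE
Replace the following:
TOPIC_ID: the name of the topic that you already created.
MESSAGE: the message to publish to the topic. A sample message can be {"name": "Alaska", "post_abbr": "AK"}.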
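Because a JSON message contains quotes and spaces, it has to be quoted for the shell. The following sketch builds the command string with Python's standard json and shlex modules. The helper name build_publish_command and the topic name my-topic are hypothetical, and the sketch only prints the command without running it.

```python
import json
import shlex


def build_publish_command(topic_id: str, record: dict) -> str:
    """Return a shell-safe gcloud command that publishes the record as JSON."""
    message = json.dumps(record)
    return "gcloud pubsub topics publish {} --message={}".format(
        shlex.quote(topic_id), shlex.quote(message)
    )


# "my-topic" is a placeholder topic name.
print(build_publish_command("my-topic", {"name": "Alaska", "post_abbr": "AK"}))
```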
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.
Avro
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto constexpr kNewYork =
      R"js({ "name": "New York", "post_abbr": "NY" })js";
  auto constexpr kPennsylvania =
      R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
  };
  for (auto const* data : {kNewYork, kPennsylvania}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
            .then(handler));
  }
  // Block until all messages are published.
  for (auto& d : done) d.get();
}
Proto
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<std::pair<std::string, std::string>> states{
      {"New York", "NY"},
      {"Pennsylvania", "PA"},
  };
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
  };
  for (auto& data : states) {
    google::cloud::pubsub::samples::State state;
    state.set_name(data.first);
    state.set_post_abbr(data.second);
    done.push_back(publisher
                       .Publish(pubsub::MessageBuilder{}
                                    .SetData(state.SerializeAsString())
                                    .Build())
                       .then(handler));
  }
  // Block until all messages are published.
  for (auto& d : done) d.get();
}
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
Avro
using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishAvroMessagesAsyncSample
{
    public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);
        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {
            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        using (var ms = new MemoryStream())
                        {
                            var encoder = new BinaryEncoder(ms);
                            var writer = new SpecificDefaultWriter(state.Schema);
                            writer.Write(state, encoder);
                            messageId = await publisher.PublishAsync(ms.ToArray());
                        }
                        break;
                    case Encoding.Json:
                        var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                }
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}
Proto
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishProtoMessagesAsyncSample
{
    public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);
        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {
            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        var binaryMessage = state.ToByteString();
                        messageId = await publisher.PublishAsync(binaryMessage);
                        break;
                    case Encoding.Json:
                        var jsonMessage = JsonFormatter.Default.Format(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                }
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
Avro
import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Release the client's resources when the function returns.
	defer client.Close()

	avroSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("os.ReadFile err: %w", err)
	}
	codec, err := goavro.NewCodec(string(avroSource))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %w", err)
	}
	record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %w", err)
	}
	// SchemaSettings is nil when the topic has no associated schema.
	if cfg.SchemaSettings == nil {
		return fmt.Errorf("topic %s has no schema settings", topicID)
	}
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = codec.BinaryFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.BinaryFromNative err: %w", err)
		}
	case pubsub.EncodingJSON:
		msg, err = codec.TextualFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.TextualFromNative err: %w", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
	return nil
}
Proto
import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

func publishProtoMessages(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Release the client's resources when the function returns.
	defer client.Close()

	state := &statepb.State{
		Name:     "Alaska",
		PostAbbr: "AK",
	}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %w", err)
	}
	// SchemaSettings is nil when the topic has no associated schema.
	if cfg.SchemaSettings == nil {
		return fmt.Errorf("topic %s has no schema settings", topicID)
	}
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = proto.Marshal(state)
		if err != nil {
			return fmt.Errorf("proto.Marshal err: %w", err)
		}
	case pubsub.EncodingJSON:
		msg, err = protojson.Marshal(state)
		if err != nil {
			return fmt.Errorf("protojson.Marshal err: %w", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
	return nil
}
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
Avro
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import utilities.State;

public class PublishAvroRecordsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with an Avro schema.
    String topicId = "your-topic-id";

    publishAvroRecordsExample(projectId, topicId);
  }

  public static void publishAvroRecordsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    // Instantiate an avro-tools-generated class defined in `us-states.avsc`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    Publisher publisher = null;

    block:
    try {
      publisher = Publisher.newBuilder(topicName).build();

      // Prepare to serialize the object to the output stream.
      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();

      Encoder encoder = null;

      // Prepare an appropriate encoder for publishing to the topic.
      switch (encoding) {
        case BINARY:
          System.out.println("Preparing a BINARY encoder...");
          encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /*reuse=*/ null);
          break;

        case JSON:
          System.out.println("Preparing a JSON encoder...");
          encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
          break;

        default:
          break block;
      }

      // Encode the object and write it to the output stream.
      state.customEncode(encoder);
      encoder.flush();

      // Publish the encoded object as a Pub/Sub message.
      ByteString data = ByteString.copyFrom(byteStream.toByteArray());
      PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
      System.out.println("Publishing message: " + message);

      ApiFuture<String> future = publisher.publish(message);
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}
Proto
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.StateProto.State;

public class PublishProtobufMessagesExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with a proto schema.
    String topicId = "your-topic-id";

    publishProtobufMessagesExample(projectId, topicId);
  }

  public static void publishProtobufMessagesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    Publisher publisher = null;

    // Instantiate a protoc-generated class defined in `us-states.proto`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    block:
    try {
      publisher = Publisher.newBuilder(topicName).build();

      PubsubMessage.Builder message = PubsubMessage.newBuilder();

      // Prepare an appropriately formatted message based on topic encoding.
      switch (encoding) {
        case BINARY:
          message.setData(state.toByteString());
          System.out.println("Publishing a BINARY-formatted message:\n" + message);
          break;

        case JSON:
          String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
          message.setData(ByteString.copyFromUtf8(jsonString));
          System.out.println("Publishing a JSON-formatted message:\n" + message);
          break;

        default:
          break block;
      }

      // Publish the message.
      ApiFuture<String> future = publisher.publish(message.build());
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
Avro
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishAvroRecords(topicNameOrId) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema encoding.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  // Use publishMessage(), as in the Proto sample; topic.publish() is deprecated.
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Avro record ${messageId} published.`);
}
Proto
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the protobufjs library
const protobuf = require('protobufjs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishProtobufMessages(topicNameOrId) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province = {
    name: 'Ontario',
    postAbbr: 'ON',
  };

  // Make an encoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Protobuf message ${messageId} published.`);
}
Node.js (TypeScript)
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
Avro
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';

// And the Apache Avro library
import * as avro from 'avro-js';
import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

interface ProvinceObject {
  name: string;
  post_abbr: string;
}

async function publishAvroRecords(topicNameOrId: string) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema encoding.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province: ProvinceObject = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer: Buffer | undefined;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  // Use publishMessage(), as in the Proto sample; topic.publish() is deprecated.
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Avro record ${messageId} published.`);
}
Proto
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';

// And the protobufjs library
import * as protobuf from 'protobufjs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

interface ProvinceObject {
  name: string;
  postAbbr: string;
}

async function publishProtobufMessages(topicNameOrId: string) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province: ProvinceObject = {
    name: 'Ontario',
    postAbbr: 'ON',
  };

  // Make an encoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer: Buffer | undefined;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Protobuf message ${messageId} published.`);
}
PHP
Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub PHP API reference documentation.
Avro
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroIOBinaryEncoder;

/**
 * Publish a message using an AVRO schema.
 *
 * This sample uses `wikimedia/avro` for AVRO encoding.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $definitionFile
 */
function publish_avro_records($projectId, $topicId, $definitionFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($definitionFile);

    $messageData = [
        'name' => 'Alaska',
        'post_abbr' => 'AK',
    ];

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];
    }

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as AVRO binary.
        $io = new AvroStringIO();
        $schema = AvroSchema::parse($definition);
        $writer = new AvroIODatumWriter($schema);
        $encoder = new AvroIOBinaryEncoder($io);
        $writer->write($messageData, $encoder);

        $encodedMessageData = $io->string();
    } else {
        // encode as JSON.
        $encodedMessageData = json_encode($messageData);
    }

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);
}
Proto
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use Utilities\StateProto;

/**
 * Publish a message using a protocol buffer schema.
 *
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 *
 * package utilities;
 *
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $topicId
 * @return void
 */
function publish_proto_messages($projectId, $topicId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $messageData = new StateProto([
        'name' => 'Alaska',
        'post_abbr' => 'AK',
    ]);

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];
    }

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as protobuf binary.
        $encodedMessageData = $messageData->serializeToString();
    } else {
        // encode as JSON.
        $encodedMessageData = $messageData->serializeToJsonString();
    }

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);
}
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
Avro
from avro.io import BinaryEncoder, DatumWriter
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

# Prepare to write Avro records to the binary output stream.
with open(avsc_file, "rb") as file:
    avro_schema = schema.parse(file.read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()

# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        encoder = BinaryEncoder(bout)
        writer.write(record, encoder)
        data = bout.getvalue()
        print(f"Preparing a binary-encoded message:\n{data.decode()}")
    elif encoding == Encoding.JSON:
        data_str = json.dumps(record)
        print(f"Preparing a JSON-encoded message:\n{data_str}")
        data = data_str.encode("utf-8")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")
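To make the binary branch less opaque, the following sketch hand-encodes the same record by following the Avro binary encoding rules: a string is written as a zigzag varint length followed by its UTF-8 bytes, and a record is the concatenation of its fields in schema order. It assumes a schema with exactly two string fields, name and then post_abbr; the schema file itself isn't shown on this page, so treat that as an assumption. The function names are illustrative and the sketch is not a replacement for the avro library.

```python
def zigzag_varint(value: int) -> bytes:
    """Encode an integer the way Avro encodes int and long values."""
    encoded = (value << 1) ^ (value >> 63)  # zigzag: small magnitudes stay small
    out = bytearray()
    while encoded & ~0x7F:
        out.append((encoded & 0x7F) | 0x80)  # low 7 bits plus continuation bit
        encoded >>= 7
    out.append(encoded)
    return bytes(out)


def avro_string(text: str) -> bytes:
    """Encode a string as its byte length followed by the UTF-8 bytes."""
    raw = text.encode("utf-8")
    return zigzag_varint(len(raw)) + raw


def encode_state(name: str, post_abbr: str) -> bytes:
    # Assumed schema: a record with string fields "name" and "post_abbr", in that order.
    return avro_string(name) + avro_string(post_abbr)


print(encode_state("Alaska", "AK"))  # b'\x0cAlaska\x04AK'
```

The encoded bytes carry no field names or type tags, which is why a message can only be validated and decoded against a specific schema revision.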
Proto
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding

from utilities import us_states_pb2  # type: ignore

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Instantiate a protoc-generated class defined in `us-states.proto`.
    state = us_states_pb2.StateProto()
    state.name = "Alaska"
    state.post_abbr = "AK"

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        data = state.SerializeToString()
        print(f"Preparing a binary-encoded message:\n{data}")
    elif encoding == Encoding.JSON:
        json_object = MessageToJson(state)
        data = str(json_object).encode("utf-8")
        print(f"Preparing a JSON-encoded message:\n{data}")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")
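For comparison, the binary branch of the protocol buffer sample can be approximated by hand. The sketch below follows the protocol buffer wire format for the StateProto message shown in the PHP sample (name = 1, post_abbr = 2): each string field is a tag byte, a varint length, and the UTF-8 bytes. The function names are illustrative, and generated code should be used in practice.

```python
def varint(value: int) -> bytes:
    """Encode a non-negative integer as a protocol buffer varint."""
    out = bytearray()
    while value > 0x7F:
        out.append((value & 0x7F) | 0x80)  # low 7 bits plus continuation bit
        value >>= 7
    out.append(value)
    return bytes(out)


def string_field(field_number: int, text: str) -> bytes:
    """Encode a proto3 string field; empty strings are omitted from the output."""
    if not text:
        return b""
    raw = text.encode("utf-8")
    tag = (field_number << 3) | 2  # wire type 2 = length-delimited
    return varint(tag) + varint(len(raw)) + raw


def encode_state(name: str, post_abbr: str) -> bytes:
    # Field numbers follow the StateProto definition: name = 1, post_abbr = 2.
    return string_field(1, name) + string_field(2, post_abbr)


print(encode_state("Alaska", "AK"))  # b'\n\x06Alaska\x12\x02AK'
```

Unlike Avro binary, each field is prefixed with its field number, so the bytes for the same record differ between the two schema types.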
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
Avro
require "google/cloud/pubsub"

# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

record = { "name" => "Alaska", "post_abbr" => "AK" }

if topic.message_encoding_binary?
  require "avro"
  avro_schema = Avro::Schema.parse File.read(avsc_file)
  writer = Avro::IO::DatumWriter.new avro_schema
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new buffer
  writer.write record, encoder
  topic.publish buffer
  puts "Published binary-encoded AVRO message."
elsif topic.message_encoding_json?
  require "json"
  topic.publish record.to_json
  puts "Published JSON-encoded AVRO message."
else
  raise "No encoding specified in #{topic.name}."
end
Proto
require "google/cloud/pubsub"

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

state = Utilities::StateProto.new name: "Alaska", post_abbr: "AK"

if topic.message_encoding_binary?
  topic.publish Utilities::StateProto.encode(state)
  puts "Published binary-encoded protobuf message."
elsif topic.message_encoding_json?
  topic.publish Utilities::StateProto.encode_json(state)
  puts "Published JSON-encoded protobuf message."
else
  raise "No encoding specified in #{topic.name}."
end
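What's next
To limit the locations in which Pub/Sub stores message data, see Restrict Pub/Sub resource locations.
To learn more about how to receive messages, see Choose a subscription type.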