本文档介绍了如何向具有架构的主题发布消息。
准备工作
在配置发布工作流程之前,请确保您已完成以下任务:
- 了解 Pub/Sub 架构的运作方式。
- 将架构与主题相关联。
所需的角色
如需获取向主题发布消息所需的权限,请让管理员授予您该主题的 Pub/Sub Publisher (roles/pubsub.publisher) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
发布具有架构的消息
您可以将消息发布到与架构关联的主题。
您必须使用创建主题时指定的架构和格式对消息进行编码。如果消息与主题关联的架构不匹配,Pub/Sub 会返回 INVALID_ARGUMENT 错误。
gcloud
在 Google Cloud 控制台中,激活 Cloud Shell。
Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置好相关值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。
使用 gcloud pubsub topics publish 命令发布。
gcloud pubsub topics publish TOPIC_ID \
    --message=MESSAGE
替换以下内容:
TOPIC_ID:您已创建的主题的名称。
MESSAGE:发布到主题的消息。示例消息可以为 {"name": "Alaska", "post_abbr": "AK"}。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
Avro
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
// Publishes two JSON-formatted records and waits until both publish calls
// have completed.
[](pubsub::Publisher publisher) {
  // Payloads written as JSON text with `name` and `post_abbr` fields.
  auto constexpr kNewYork =
      R"js({ "name": "New York", "post_abbr": "NY" })js";
  auto constexpr kPennsylvania =
      R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";

  // Continuation attached to each publish result: throws the returned status
  // when the publish did not yield a message id.
  auto on_published = [](future<StatusOr<std::string>> result) {
    auto message_id = result.get();
    if (message_id) return;
    throw std::move(message_id).status();
  };

  std::vector<future<void>> pending;
  for (auto const* payload : {kNewYork, kPennsylvania}) {
    auto message = pubsub::MessageBuilder{}.SetData(payload).Build();
    pending.push_back(
        publisher.Publish(std::move(message)).then(on_published));
  }
  // Wait for every outstanding publish before returning.
  for (auto& p : pending) p.get();
}
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
// Publishes two `State` protobuf messages (serialized with
// SerializeAsString()) and waits until both publish calls have completed.
[](pubsub::Publisher publisher) {
  // (name, post_abbr) pairs used to populate each message.
  std::vector<std::pair<std::string, std::string>> states{
      {"New York", "NY"},
      {"Pennsylvania", "PA"},
  };

  // Continuation attached to each publish result: throws the returned status
  // when the publish did not yield a message id.
  auto on_published = [](future<StatusOr<std::string>> result) {
    auto message_id = result.get();
    if (message_id) return;
    throw std::move(message_id).status();
  };

  std::vector<future<void>> pending;
  for (auto& entry : states) {
    google::cloud::pubsub::samples::State proto;
    proto.set_name(entry.first);
    proto.set_post_abbr(entry.second);
    auto message =
        pubsub::MessageBuilder{}.SetData(proto.SerializeAsString()).Build();
    pending.push_back(
        publisher.Publish(std::move(message)).then(on_published));
  }
  // Wait for every outstanding publish before returning.
  for (auto& p : pending) p.get();
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Avro
using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Publishes Avro records to a topic, encoding each record according to the
/// encoding reported by the topic's schema settings.
/// </summary>
public class PublishAvroMessagesAsyncSample
{
    /// <summary>
    /// Publishes every state in <paramref name="messageStates"/> and returns how many were published.
    /// </summary>
    /// <param name="projectId">Project that owns the topic.</param>
    /// <param name="topicId">Topic to publish to.</param>
    /// <param name="messageStates">Avro records to publish.</param>
    /// <returns>The number of messages for which a publish call completed without an exception.</returns>
    public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        // Fetch the topic so the encoding from its schema settings can be read.
        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {
            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        using (var ms = new MemoryStream())
                        {
                            var encoder = new BinaryEncoder(ms);
                            var writer = new SpecificDefaultWriter(state.Schema);
                            writer.Write(state, encoder);
                            messageId = await publisher.PublishAsync(ms.ToArray());
                        }
                        break;
                    case Encoding.Json:
                        var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                    default:
                        // Without this branch an unsupported encoding would fall through and be
                        // reported (and counted) as a successful publish with a null message ID.
                        throw new InvalidOperationException($"Unsupported schema encoding: {topic.SchemaSettings.Encoding}");
                }
                Console.WriteLine($"Published message {messageId}");
                // Tasks may complete concurrently, so the counter is updated atomically.
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                // A failure for one message is logged and does not stop the others.
                Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Publishes protobuf messages to a topic, encoding each message according to
/// the encoding reported by the topic's schema settings.
/// </summary>
public class PublishProtoMessagesAsyncSample
{
    /// <summary>
    /// Publishes every state in <paramref name="messageStates"/> and returns how many were published.
    /// </summary>
    /// <param name="projectId">Project that owns the topic.</param>
    /// <param name="topicId">Topic to publish to.</param>
    /// <param name="messageStates">Protobuf messages to publish.</param>
    /// <returns>The number of messages for which a publish call completed without an exception.</returns>
    public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        // Fetch the topic so the encoding from its schema settings can be read.
        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {
            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        var binaryMessage = state.ToByteString();
                        messageId = await publisher.PublishAsync(binaryMessage);
                        break;
                    case Encoding.Json:
                        var jsonMessage = JsonFormatter.Default.Format(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                    default:
                        // Without this branch an unsupported encoding would fall through and be
                        // reported (and counted) as a successful publish with a null message ID.
                        throw new InvalidOperationException($"Unsupported schema encoding: {topic.SchemaSettings.Encoding}");
                }
                Console.WriteLine($"Published message {messageId}");
                // Tasks may complete concurrently, so the counter is updated atomically.
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                // A failure for one message is logged and does not stop the others.
                Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Avro
import (
"context"
"fmt"
"io"
"os"
"cloud.google.com/go/pubsub"
"github.com/linkedin/goavro/v2"
)
// publishAvroRecords publishes one Avro record to the given topic, encoding it
// as binary or JSON depending on the topic's schema settings. The Avro schema
// is read from avscFile. On success a confirmation line is written to w.
func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Release the client's resources on every return path.
	defer client.Close()

	avroSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("os.ReadFile err: %w", err)
	}
	codec, err := goavro.NewCodec(string(avroSource))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %w", err)
	}
	record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %w", err)
	}
	// SchemaSettings is a pointer; guard against a nil value instead of
	// panicking on the dereference below.
	if cfg.SchemaSettings == nil {
		return fmt.Errorf("topic %s has no schema settings", topicID)
	}
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = codec.BinaryFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.BinaryFromNative err: %w", err)
		}
	case pubsub.EncodingJSON:
		msg, err = codec.TextualFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.TextualFromNative err: %w", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	// Get blocks until the publish result is available.
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
	return nil
}
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
// publishProtoMessages publishes one protobuf State message to the given
// topic, encoding it as binary or JSON depending on the topic's schema
// settings. On success a confirmation line is written to w.
func publishProtoMessages(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Release the client's resources on every return path.
	defer client.Close()

	state := &statepb.State{
		Name:     "Alaska",
		PostAbbr: "AK",
	}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %w", err)
	}
	// SchemaSettings is a pointer; guard against a nil value instead of
	// panicking on the dereference below.
	if cfg.SchemaSettings == nil {
		return fmt.Errorf("topic %s has no schema settings", topicID)
	}
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = proto.Marshal(state)
		if err != nil {
			return fmt.Errorf("proto.Marshal err: %w", err)
		}
	case pubsub.EncodingJSON:
		msg, err = protojson.Marshal(state)
		if err != nil {
			return fmt.Errorf("protojson.Marshal err: %w", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	// Get blocks until the publish result is available.
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
	return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Avro
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import utilities.State;
/** Publishes one Avro record to a topic, using the encoding configured on the topic. */
public class PublishAvroRecordsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with an Avro schema.
    String topicId = "your-topic-id";

    publishAvroRecordsExample(projectId, topicId);
  }

  /**
   * Looks up the topic's schema encoding, serializes a {@code State} record with a matching Avro
   * encoder, and publishes it. Nothing is published when the encoding is neither BINARY nor JSON.
   */
  public static void publishAvroRecordsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    // Ask the admin API which encoding the topic's schema settings specify.
    Encoding topicEncoding = null;
    try (TopicAdminClient adminClient = TopicAdminClient.create()) {
      topicEncoding = adminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    // `State` is an avro-tools-generated class defined in `us-states.avsc`.
    State alaska = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    Publisher publisher = null;
    try {
      publisher = Publisher.newBuilder(topicName).build();

      // The encoder writes the serialized record into this in-memory stream.
      ByteArrayOutputStream serialized = new ByteArrayOutputStream();
      Encoder avroEncoder = null;

      switch (topicEncoding) {
        case BINARY:
          System.out.println("Preparing a BINARY encoder...");
          avroEncoder = EncoderFactory.get().directBinaryEncoder(serialized, /*reuse=*/ null);
          break;
        case JSON:
          System.out.println("Preparing a JSON encoder...");
          avroEncoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), serialized);
          break;
        default:
          // Any other encoding: skip publishing. The finally block still runs.
          return;
      }

      // Serialize the record, making sure buffered bytes reach the stream.
      alaska.customEncode(avroEncoder);
      avroEncoder.flush();

      // Wrap the bytes in a Pub/Sub message and publish it.
      ByteString payload = ByteString.copyFrom(serialized.toByteArray());
      PubsubMessage message = PubsubMessage.newBuilder().setData(payload).build();
      System.out.println("Publishing message: " + message);

      ApiFuture<String> messageIdFuture = publisher.publish(message);
      System.out.println("Published message ID: " + messageIdFuture.get());
    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.StateProto.State;
/** Publishes one protobuf message to a topic, using the encoding configured on the topic. */
public class PublishProtobufMessagesExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with a proto schema.
    String topicId = "your-topic-id";

    publishProtobufMessagesExample(projectId, topicId);
  }

  /**
   * Looks up the topic's schema encoding, formats a {@code State} message accordingly, and
   * publishes it. Nothing is published when the encoding is neither BINARY nor JSON.
   */
  public static void publishProtobufMessagesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    // Ask the admin API which encoding the topic's schema settings specify.
    Encoding topicEncoding = null;
    try (TopicAdminClient adminClient = TopicAdminClient.create()) {
      topicEncoding = adminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    Publisher publisher = null;

    // `State` is a protoc-generated class defined in `us-states.proto`.
    State alaska = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    try {
      publisher = Publisher.newBuilder(topicName).build();

      PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder();

      switch (topicEncoding) {
        case BINARY:
          messageBuilder.setData(alaska.toByteString());
          System.out.println("Publishing a BINARY-formatted message:\n" + messageBuilder);
          break;
        case JSON:
          String json = JsonFormat.printer().omittingInsignificantWhitespace().print(alaska);
          messageBuilder.setData(ByteString.copyFromUtf8(json));
          System.out.println("Publishing a JSON-formatted message:\n" + messageBuilder);
          break;
        default:
          // Any other encoding: skip publishing. The finally block still runs.
          return;
      }

      ApiFuture<String> messageIdFuture = publisher.publish(messageBuilder.build());
      System.out.println("Published message ID: " + messageIdFuture.get());
    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Avro
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');
// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Publishes one Avro record to a topic, encoded as binary or JSON according
 * to the topic's schema settings.
 *
 * Logs and returns without publishing when the topic has no schema settings
 * or reports an encoding other than Binary/Json.
 *
 * @param {string} topicNameOrId Name or ID of the topic to publish to.
 */
async function publishAvroRecords(topicNameOrId) {
  // Get the topic metadata to learn about its schema encoding.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      // dataBuffer stays undefined; reported once below.
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  // publishMessage() replaces the deprecated topic.publish().
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Avro record ${messageId} published.`);
}
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');
// And the protobufjs library
const protobuf = require('protobufjs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Publishes one protobuf message to a topic, encoded as binary or JSON
 * according to the topic's schema settings.
 *
 * Logs and returns without publishing when the topic has no schema settings
 * or reports an encoding other than Binary/Json.
 *
 * @param {string} topicNameOrId Name or ID of the topic to publish to.
 */
async function publishProtobufMessages(topicNameOrId) {
  // Get the topic metadata to learn about its schema.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province = {
    name: 'Ontario',
    postAbbr: 'ON',
  };

  // Make an encoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      break;
    default:
      // dataBuffer stays undefined; reported once below.
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  // publishMessage() replaces the deprecated topic.publish().
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Protobuf message ${messageId} published.`);
}
Node.js (TypeScript)
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Avro
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';
// And the Apache Avro library
import * as avro from 'avro-js';
import * as fs from 'fs';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
interface ProvinceObject {
name: string;
post_abbr: string;
}
/**
 * Publishes one Avro record to a topic, encoded as binary or JSON according
 * to the topic's schema settings.
 *
 * Logs and returns without publishing when the topic has no schema settings
 * or reports an encoding other than Binary/Json.
 *
 * @param topicNameOrId Name or ID of the topic to publish to.
 */
async function publishAvroRecords(topicNameOrId: string) {
  // Get the topic metadata to learn about its schema encoding.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province: ProvinceObject = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer: Buffer | undefined;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      // dataBuffer stays undefined; reported once below.
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  // publishMessage() replaces the deprecated topic.publish().
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Avro record ${messageId} published.`);
}
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';
// And the protobufjs library
import * as protobuf from 'protobufjs';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
interface ProvinceObject {
name: string;
postAbbr: string;
}
/**
 * Publishes one protobuf message to a topic, encoded as binary or JSON
 * according to the topic's schema settings.
 *
 * Logs and returns without publishing when the topic has no schema settings
 * or reports an encoding other than Binary/Json.
 *
 * @param topicNameOrId Name or ID of the topic to publish to.
 */
async function publishProtobufMessages(topicNameOrId: string) {
  // Get the topic metadata to learn about its schema.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province: ProvinceObject = {
    name: 'Ontario',
    postAbbr: 'ON',
  };

  // Make an encoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer: Buffer | undefined;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      break;
    default:
      // dataBuffer stays undefined; reported once below.
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  // publishMessage() replaces the deprecated topic.publish().
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Protobuf message ${messageId} published.`);
}
PHP
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档。
Avro
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroIOBinaryEncoder;
/**
 * Publishes a single record to a topic that uses an AVRO schema.
 *
 * The record is encoded as AVRO binary (via `wikimedia/avro`) when the topic
 * reports BINARY encoding, and as JSON otherwise. Nothing is published when
 * the topic info carries no schema encoding.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $definitionFile File whose contents are parsed as the AVRO schema.
 */
function publish_avro_records($projectId, $topicId, $definitionFile)
{
    $pubsub = new PubSubClient(['projectId' => $projectId]);

    $definition = (string) file_get_contents($definitionFile);
    $record = ['name' => 'Alaska', 'post_abbr' => 'AK'];

    $topic = $pubsub->topic($topicId);

    // Read the encoding from the topic info; an empty string means "not set".
    $topicInfo = $topic->info();
    $encoding = $topicInfo['schemaSettings']['encoding'] ?? '';

    // Without an encoding there is nothing sensible to publish.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // Under gRPC the encoding may arrive as an integer enum value of
    // Google\Cloud\PubSub\V1\Encoding; convert it to its name.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    if ($encoding == 'BINARY') {
        // AVRO binary: write the record through a binary encoder into a string buffer.
        $buffer = new AvroStringIO();
        $datumWriter = new AvroIODatumWriter(AvroSchema::parse($definition));
        $datumWriter->write($record, new AvroIOBinaryEncoder($buffer));
        $payload = $buffer->string();
    } else {
        // Every other encoding is published as JSON.
        $payload = json_encode($record);
    }

    $topic->publish(['data' => $payload]);

    printf('Published message with %s encoding', $encoding);
}
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use Utilities\StateProto;
/**
 * Publishes a single message to a topic that uses a protocol buffer schema.
 *
 * The message is serialized as protobuf binary when the topic reports BINARY
 * encoding, and as JSON otherwise. Nothing is published when the topic info
 * carries no schema encoding.
 *
 * Expects a proto message shaped like:
 * ```
 * syntax = "proto3";
 *
 * package utilities;
 *
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $topicId
 * @return void
 */
function publish_proto_messages($projectId, $topicId)
{
    $pubsub = new PubSubClient(['projectId' => $projectId]);

    $state = new StateProto(['name' => 'Alaska', 'post_abbr' => 'AK']);

    $topic = $pubsub->topic($topicId);

    // Read the encoding from the topic info; an empty string means "not set".
    $topicInfo = $topic->info();
    $encoding = $topicInfo['schemaSettings']['encoding'] ?? '';

    // Without an encoding there is nothing sensible to publish.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // Under gRPC the encoding may arrive as an integer enum value of
    // Google\Cloud\PubSub\V1\Encoding; convert it to its name.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    // BINARY uses the protobuf wire serialization; anything else uses JSON.
    $payload = ($encoding == 'BINARY')
        ? $state->serializeToString()
        : $state->serializeToJsonString();

    $topic->publish(['data' => $payload]);

    printf('Published message with %s encoding', $encoding);
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Avro
from avro.io import BinaryEncoder, DatumWriter
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

# Parse the Avro schema and prepare a writer plus an in-memory output stream
# for the binary-encoded case.
with open(avsc_file, "rb") as file:
    avro_schema = schema.parse(file.read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()

# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        encoder = BinaryEncoder(bout)
        writer.write(record, encoder)
        data = bout.getvalue()
        print(f"Preparing a binary-encoded message:\n{data.decode()}")
    elif encoding == Encoding.JSON:
        data_str = json.dumps(record)
        print(f"Preparing a JSON-encoded message:\n{data_str}")
        data = data_str.encode("utf-8")
    else:
        # Neither BINARY nor JSON: stop without publishing.
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    # result() blocks until the publish call completes and returns the message ID.
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding
from utilities import us_states_pb2 # type: ignore
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Instantiate a protoc-generated class defined in `us-states.proto`.
    state = us_states_pb2.StateProto()
    state.name = "Alaska"
    state.post_abbr = "AK"

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        data = state.SerializeToString()
        print(f"Preparing a binary-encoded message:\n{data}")
    elif encoding == Encoding.JSON:
        json_object = MessageToJson(state)
        data = str(json_object).encode("utf-8")
        print(f"Preparing a JSON-encoded message:\n{data}")
    else:
        # Neither BINARY nor JSON: stop without publishing.
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    # result() blocks until the publish call completes and returns the message ID.
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
Avro
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Publish one record to the topic, serialized as AVRO binary or JSON depending
# on the topic's message encoding.
pubsub_client = Google::Cloud::Pubsub.new
target_topic = pubsub_client.topic(topic_id)

state_record = { "name" => "Alaska", "post_abbr" => "AK" }

if target_topic.message_encoding_binary?
  # Binary: write the record through an AVRO binary encoder into a StringIO.
  require "avro"
  parsed_schema = Avro::Schema.parse(File.read(avsc_file))
  datum_writer = Avro::IO::DatumWriter.new(parsed_schema)
  io = StringIO.new
  datum_writer.write(state_record, Avro::IO::BinaryEncoder.new(io))
  target_topic.publish(io)
  puts "Published binary-encoded AVRO message."
elsif target_topic.message_encoding_json?
  # JSON: publish the record's JSON text.
  require "json"
  target_topic.publish(state_record.to_json)
  puts "Published JSON-encoded AVRO message."
else
  raise "No encoding specified in #{target_topic.name}."
end
# topic_id = "your-topic-id"
# Publish one StateProto message to the topic, encoded as protobuf binary or
# JSON depending on the topic's message encoding.
pubsub_client = Google::Cloud::Pubsub.new
target_topic = pubsub_client.topic(topic_id)

alaska = Utilities::StateProto.new(name: "Alaska", post_abbr: "AK")

if target_topic.message_encoding_binary?
  target_topic.publish(Utilities::StateProto.encode(alaska))
  puts "Published binary-encoded protobuf message."
elsif target_topic.message_encoding_json?
  target_topic.publish(Utilities::StateProto.encode_json(alaska))
  puts "Published JSON-encoded protobuf message."
else
  raise "No encoding specified in #{target_topic.name}."
end
后续步骤
如需限制 Pub/Sub 存储消息数据的位置,请参阅 限制 Pub/Sub 资源位置。
如需详细了解如何接收消息,请参阅 选择订阅类型。