이 문서에서는 스키마를 사용하여 주제에 메시지를 게시하는 방법을 보여줍니다.
시작하기 전에
게시 워크플로를 구성하기 전에 다음 작업이 완료되어야 합니다.
필요한 역할
주제에 메시지를 게시하는 데 필요한 권한을 얻으려면 관리자에게 주제에 대한 Pub/Sub 게시자(roles/pubsub.publisher
) IAM 역할을 부여해 달라고 요청하세요.
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.
주제 및 구독을 만들거나 업데이트하려면 추가 권한이 필요합니다.스키마를 사용하여 메시지 게시
메시지를 스키마와 연결된 주제에 게시할 수 있습니다. 주제를 만들 때 지정한 스키마와 형식으로 메시지를 인코딩해야 합니다. 메시지가 허용되는 버전 범위 내의 스키마 버전과 일치하면 주제와 연결된 스키마와 일치하는 것입니다. 메시지는 일치하는 항목이 발견되거나 허용된 가장 오래된 버전에 도달할 때까지 허용된 가장 최근 버전부터 순서대로 버전과 비교하여 평가됩니다. Pub/Sub는 스키마와 연결된 주제에 성공적으로 게시된 메시지에 다음 속성을 추가합니다.
googclient_schemaname
: 유효성 검사에 사용되는 스키마의 이름입니다.googclient_schemaencoding
: 메시지 인코딩이며, JSON 또는 BINARY입니다.googclient_schemarevisionid
: 메시지 파싱 및 유효성 검사에 사용되는 스키마의 버전 ID입니다. 각 버전에는 고유한 버전 ID가 연결되어 있습니다. 버전 ID는 자동으로 생성되는 8자로 된 UUID입니다.
메시지가 주제에서 허용하는 스키마 버전과 일치하지 않는 경우 Pub/Sub는 게시 요청에 INVALID_ARGUMENT
오류를 반환합니다.
Pub/Sub는 게시 시점에만 스키마 버전에 대해 메시지를 평가합니다. 메시지를 게시한 후 새 스키마 버전을 커밋하거나 주제와 연결된 스키마를 변경해도 해당 메시지가 재평가되거나 연결된 스키마 메시지 속성이 변경되지 않습니다.
Google Cloud 콘솔, gcloud CLI, Pub/Sub API 또는 Cloud 클라이언트 라이브러리를 사용하여 Google Cloud 프로젝트에서 연결된 스키마가 있는 주제에 메시지를 게시할 수 있습니다.
gcloud
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud pubsub topics publish 명령어를 사용하여 샘플 메시지를 게시합니다.
gcloud pubsub topics publish TOPIC_ID \ --message=MESSAGE
다음을 바꿉니다.
TOPIC_ID: 이미 만든 주제의 이름입니다.
MESSAGE: 주제에 게시된 메시지입니다. 샘플 메시지는
{"name": "Alaska", "post_abbr": "AK"}
일 수 있습니다.
C++
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참조 문서를 확인하세요.
Avronamespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
auto constexpr kNewYork =
R"js({ "name": "New York", "post_abbr": "NY" })js";
auto constexpr kPennsylvania =
R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
std::vector<future<void>> done;
auto handler = [](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
};
for (auto const* data : {kNewYork, kPennsylvania}) {
done.push_back(
publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
.then(handler));
}
// Block until all messages are published.
for (auto& d : done) d.get();
}
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
std::vector<std::pair<std::string, std::string>> states{
{"New York", "NY"},
{"Pennsylvania", "PA"},
};
std::vector<future<void>> done;
auto handler = [](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
};
for (auto& data : states) {
google::cloud::pubsub::samples::State state;
state.set_name(data.first);
state.set_post_abbr(data.second);
done.push_back(publisher
.Publish(pubsub::MessageBuilder{}
.SetData(state.SerializeAsString())
.Build())
.then(handler));
}
// Block until all messages are published.
for (auto& d : done) d.get();
}
C#
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.
Avro
using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishAvroMessagesAsyncSample
{
public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
var topic = publishApi.GetTopic(topicName);
int publishedMessageCount = 0;
var publishTasks = messageStates.Select(async state =>
{
try
{
string messageId = null;
switch (topic.SchemaSettings.Encoding)
{
case Encoding.Binary:
using (var ms = new MemoryStream())
{
var encoder = new BinaryEncoder(ms);
var writer = new SpecificDefaultWriter(state.Schema);
writer.Write(state, encoder);
messageId = await publisher.PublishAsync(ms.ToArray());
}
break;
case Encoding.Json:
var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
messageId = await publisher.PublishAsync(jsonMessage);
break;
}
Console.WriteLine($"Published message {messageId}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishProtoMessagesAsyncSample
{
public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
var topic = publishApi.GetTopic(topicName);
int publishedMessageCount = 0;
var publishTasks = messageStates.Select(async state =>
{
try
{
string messageId = null;
switch (topic.SchemaSettings.Encoding)
{
case Encoding.Binary:
var binaryMessage = state.ToByteString();
messageId = await publisher.PublishAsync(binaryMessage);
break;
case Encoding.Json:
var jsonMessage = JsonFormatter.Default.Format(state);
messageId = await publisher.PublishAsync(jsonMessage);
break;
}
Console.WriteLine($"Published message {messageId}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 참조하세요.
Avroimport (
"context"
"fmt"
"io"
"os"
"cloud.google.com/go/pubsub"
"github.com/linkedin/goavro/v2"
)
func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
avroSource, err := os.ReadFile(avscFile)
if err != nil {
return fmt.Errorf("ioutil.ReadFile err: %w", err)
}
codec, err := goavro.NewCodec(string(avroSource))
if err != nil {
return fmt.Errorf("goavro.NewCodec err: %w", err)
}
record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}
// Get the topic encoding type.
t := client.Topic(topicID)
cfg, err := t.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config err: %w", err)
}
encoding := cfg.SchemaSettings.Encoding
var msg []byte
switch encoding {
case pubsub.EncodingBinary:
msg, err = codec.BinaryFromNative(nil, record)
if err != nil {
return fmt.Errorf("codec.BinaryFromNative err: %w", err)
}
case pubsub.EncodingJSON:
msg, err = codec.TextualFromNative(nil, record)
if err != nil {
return fmt.Errorf("codec.TextualFromNative err: %w", err)
}
default:
return fmt.Errorf("invalid encoding: %v", encoding)
}
result := t.Publish(ctx, &pubsub.Message{
Data: msg,
})
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("result.Get: %w", err)
}
fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
return nil
}
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func publishProtoMessages(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
state := &statepb.State{
Name: "Alaska",
PostAbbr: "AK",
}
// Get the topic encoding type.
t := client.Topic(topicID)
cfg, err := t.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config err: %w", err)
}
encoding := cfg.SchemaSettings.Encoding
var msg []byte
switch encoding {
case pubsub.EncodingBinary:
msg, err = proto.Marshal(state)
if err != nil {
return fmt.Errorf("proto.Marshal err: %w", err)
}
case pubsub.EncodingJSON:
msg, err = protojson.Marshal(state)
if err != nil {
return fmt.Errorf("protojson.Marshal err: %w", err)
}
default:
return fmt.Errorf("invalid encoding: %v", encoding)
}
result := t.Publish(ctx, &pubsub.Message{
Data: msg,
})
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("result.Get: %w", err)
}
fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
return nil
}
Java
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참조 문서를 참조하세요.
Avro
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import utilities.State;
public class PublishAvroRecordsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use a topic created with an Avro schema.
String topicId = "your-topic-id";
publishAvroRecordsExample(projectId, topicId);
}
public static void publishAvroRecordsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
Encoding encoding = null;
TopicName topicName = TopicName.of(projectId, topicId);
// Get the topic encoding type.
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
}
// Instantiate an avro-tools-generated class defined in `us-states.avsc`.
State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
Publisher publisher = null;
block:
try {
publisher = Publisher.newBuilder(topicName).build();
// Prepare to serialize the object to the output stream.
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
Encoder encoder = null;
// Prepare an appropriate encoder for publishing to the topic.
switch (encoding) {
case BINARY:
System.out.println("Preparing a BINARY encoder...");
encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /*reuse=*/ null);
break;
case JSON:
System.out.println("Preparing a JSON encoder...");
encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
break;
default:
break block;
}
// Encode the object and write it to the output stream.
state.customEncode(encoder);
encoder.flush();
// Publish the encoded object as a Pub/Sub message.
ByteString data = ByteString.copyFrom(byteStream.toByteArray());
PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
System.out.println("Publishing message: " + message);
ApiFuture<String> future = publisher.publish(message);
System.out.println("Published message ID: " + future.get());
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.StateProto.State;
public class PublishProtobufMessagesExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use a topic created with a proto schema.
String topicId = "your-topic-id";
publishProtobufMessagesExample(projectId, topicId);
}
public static void publishProtobufMessagesExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
Encoding encoding = null;
TopicName topicName = TopicName.of(projectId, topicId);
// Get the topic encoding type.
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
}
Publisher publisher = null;
// Instantiate a protoc-generated class defined in `us-states.proto`.
State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
block:
try {
publisher = Publisher.newBuilder(topicName).build();
PubsubMessage.Builder message = PubsubMessage.newBuilder();
// Prepare an appropriately formatted message based on topic encoding.
switch (encoding) {
case BINARY:
message.setData(state.toByteString());
System.out.println("Publishing a BINARY-formatted message:\n" + message);
break;
case JSON:
String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
message.setData(ByteString.copyFromUtf8(jsonString));
System.out.println("Publishing a JSON-formatted message:\n" + message);
break;
default:
break block;
}
// Publish the message.
ApiFuture<String> future = publisher.publish(message.build());
System.out.println("Published message ID: " + future.get());
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.
Avro/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');
// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishAvroRecords(topicNameOrId) {
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);
// Get the topic metadata to learn about its schema encoding.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Make an encoder using the official avro-js library.
const definition = fs
.readFileSync('system-test/fixtures/provinces.avsc')
.toString();
const type = avro.parse(definition);
// Encode the message.
const province = {
name: 'Ontario',
post_abbr: 'ON',
};
let dataBuffer;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = type.toBuffer(province);
break;
case Encodings.Json:
dataBuffer = Buffer.from(type.toString(province));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publish(dataBuffer);
console.log(`Avro record ${messageId} published.`);
}
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');
// And the protobufjs library
const protobuf = require('protobufjs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishProtobufMessages(topicNameOrId) {
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);
// Get the topic metadata to learn about its schema.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Encode the message.
const province = {
name: 'Ontario',
postAbbr: 'ON',
};
// Make an encoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const root = await protobuf.load('system-test/fixtures/provinces.proto');
const Province = root.lookupType('utilities.Province');
const message = Province.create(province);
let dataBuffer;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = Buffer.from(Province.encode(message).finish());
break;
case Encodings.Json:
dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Protobuf message ${messageId} published.`);
}
Node.js
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.
Avro/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';
// And the Apache Avro library
import * as avro from 'avro-js';
import * as fs from 'fs';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
interface ProvinceObject {
name: string;
post_abbr: string;
}
async function publishAvroRecords(topicNameOrId: string) {
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);
// Get the topic metadata to learn about its schema encoding.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Make an encoder using the official avro-js library.
const definition = fs
.readFileSync('system-test/fixtures/provinces.avsc')
.toString();
const type = avro.parse(definition);
// Encode the message.
const province: ProvinceObject = {
name: 'Ontario',
post_abbr: 'ON',
};
let dataBuffer: Buffer | undefined;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = type.toBuffer(province);
break;
case Encodings.Json:
dataBuffer = Buffer.from(type.toString(province));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publish(dataBuffer);
console.log(`Avro record ${messageId} published.`);
}
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';
// And the protobufjs library
import * as protobuf from 'protobufjs';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
interface ProvinceObject {
name: string;
postAbbr: string;
}
async function publishProtobufMessages(topicNameOrId: string) {
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);
// Get the topic metadata to learn about its schema.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Encode the message.
const province: ProvinceObject = {
name: 'Ontario',
postAbbr: 'ON',
};
// Make an encoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const root = await protobuf.load('system-test/fixtures/provinces.proto');
const Province = root.lookupType('utilities.Province');
const message = Province.create(province);
let dataBuffer: Buffer | undefined;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = Buffer.from(Province.encode(message).finish());
break;
case Encodings.Json:
dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Protobuf message ${messageId} published.`);
}
PHP
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 PHP 설정 안내를 따르세요. 자세한 내용은 Pub/Sub PHP API 참조 문서를 참조하세요.
Avrouse Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroIOBinaryEncoder;
/**
* Publish a message using an AVRO schema.
*
* This sample uses `wikimedia/avro` for AVRO encoding.
*
* @param string $projectId
* @param string $topicId
* @param string $definitionFile
*/
function publish_avro_records($projectId, $topicId, $definitionFile)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$definition = (string) file_get_contents($definitionFile);
$messageData = [
'name' => 'Alaska',
'post_abbr' => 'AK',
];
$topic = $pubsub->topic($topicId);
// get the encoding type.
$topicInfo = $topic->info();
$encoding = '';
if (isset($topicInfo['schemaSettings']['encoding'])) {
$encoding = $topicInfo['schemaSettings']['encoding'];
}
// if encoding is not set, we can't continue.
if ($encoding === '') {
printf('Topic %s does not have schema enabled', $topicId);
return;
}
// If you are using gRPC, encoding may be an integer corresponding to an
// enum value on Google\Cloud\PubSub\V1\Encoding.
if (!is_string($encoding)) {
$encoding = Encoding::name($encoding);
}
$encodedMessageData = '';
if ($encoding == 'BINARY') {
// encode as AVRO binary.
$io = new AvroStringIO();
$schema = AvroSchema::parse($definition);
$writer = new AvroIODatumWriter($schema);
$encoder = new AvroIOBinaryEncoder($io);
$writer->write($messageData, $encoder);
$encodedMessageData = $io->string();
} else {
// encode as JSON.
$encodedMessageData = json_encode($messageData);
}
$topic->publish(['data' => $encodedMessageData]);
printf('Published message with %s encoding', $encoding);
}
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use Utilities\StateProto;
/**
* Publish a message using a protocol buffer schema.
*
* Relies on a proto message of the following form:
* ```
* syntax = "proto3";
*
* package utilities;
*
* message StateProto {
* string name = 1;
* string post_abbr = 2;
* }
* ```
*
* @param string $projectId
* @param string $topicId
* @return void
*/
function publish_proto_messages($projectId, $topicId)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$messageData = new StateProto([
'name' => 'Alaska',
'post_abbr' => 'AK',
]);
$topic = $pubsub->topic($topicId);
// get the encoding type.
$topicInfo = $topic->info();
$encoding = '';
if (isset($topicInfo['schemaSettings']['encoding'])) {
$encoding = $topicInfo['schemaSettings']['encoding'];
}
// if encoding is not set, we can't continue.
if ($encoding === '') {
printf('Topic %s does not have schema enabled', $topicId);
return;
}
// If you are using gRPC, encoding may be an integer corresponding to an
// enum value on Google\Cloud\PubSub\V1\Encoding.
if (!is_string($encoding)) {
$encoding = Encoding::name($encoding);
}
$encodedMessageData = '';
if ($encoding == 'BINARY') {
// encode as protobuf binary.
$encodedMessageData = $messageData->serializeToString();
} else {
// encode as JSON.
$encodedMessageData = $messageData->serializeToJsonString();
}
$topic->publish(['data' => $encodedMessageData]);
printf('Published message with %s encoding', $encoding);
}
Python
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.
Avrofrom avro.io import BinaryEncoder, DatumWriter
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)
# Prepare to write Avro records to the binary output stream.
with open(avsc_file, "rb") as file:
avro_schema = schema.parse(file.read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()
# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}
try:
# Get the topic encoding type.
topic = publisher_client.get_topic(request={"topic": topic_path})
encoding = topic.schema_settings.encoding
# Encode the data according to the message serialization type.
if encoding == Encoding.BINARY:
encoder = BinaryEncoder(bout)
writer.write(record, encoder)
data = bout.getvalue()
print(f"Preparing a binary-encoded message:\n{data.decode()}")
elif encoding == Encoding.JSON:
data_str = json.dumps(record)
print(f"Preparing a JSON-encoded message:\n{data_str}")
data = data_str.encode("utf-8")
else:
print(f"No encoding specified in {topic_path}. Abort.")
exit(0)
future = publisher_client.publish(topic_path, data)
print(f"Published message ID: {future.result()}")
except NotFound:
print(f"{topic_id} not found.")
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding
from utilities import us_states_pb2 # type: ignore
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)
try:
# Get the topic encoding type.
topic = publisher_client.get_topic(request={"topic": topic_path})
encoding = topic.schema_settings.encoding
# Instantiate a protoc-generated class defined in `us-states.proto`.
state = us_states_pb2.StateProto()
state.name = "Alaska"
state.post_abbr = "AK"
# Encode the data according to the message serialization type.
if encoding == Encoding.BINARY:
data = state.SerializeToString()
print(f"Preparing a binary-encoded message:\n{data}")
elif encoding == Encoding.JSON:
json_object = MessageToJson(state)
data = str(json_object).encode("utf-8")
print(f"Preparing a JSON-encoded message:\n{data}")
else:
print(f"No encoding specified in {topic_path}. Abort.")
exit(0)
future = publisher_client.publish(topic_path, data)
print(f"Published message ID: {future.result()}")
except NotFound:
print(f"{topic_id} not found.")
Ruby
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참조 문서를 참조하세요.
Avro# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_id
record = { "name" => "Alaska", "post_abbr" => "AK" }
if topic.message_encoding_binary?
require "avro"
avro_schema = Avro::Schema.parse File.read(avsc_file)
writer = Avro::IO::DatumWriter.new avro_schema
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new buffer
writer.write record, encoder
topic.publish buffer
puts "Published binary-encoded AVRO message."
elsif topic.message_encoding_json?
require "json"
topic.publish record.to_json
puts "Published JSON-encoded AVRO message."
else
raise "No encoding specified in #{topic.name}."
end
# topic_id = "your-topic-id"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_id
state = Utilities::StateProto.new name: "Alaska", post_abbr: "AK"
if topic.message_encoding_binary?
topic.publish Utilities::StateProto.encode(state)
puts "Published binary-encoded protobuf message."
elsif topic.message_encoding_json?
topic.publish Utilities::StateProto.encode_json(state)
puts "Published JSON-encoded protobuf message."
else
raise "No encoding specified in #{topic.name}."
end
다음 단계
Pub/Sub에서 메시지 데이터를 저장하는 위치를 제한하려면 Pub/Sub 리소스 위치 제한을 참조하세요.
메시지 수신에 대한 자세한 내용은 구독 유형 선택을 참조하세요.