순서 키를 사용한 게시가 실패하면 게시자 큐에 추가되어 있던 동일한 순서 키의 메시지는 물론, 이후 해당 순서 키로 보내는 게시 요청도 모두 실패합니다. 이 샘플은 이러한 오류가 발생했을 때 순서 키를 사용 설정한 상태에서 해당 키의 게시를 재개하는 방법을 보여줍니다.
더 살펴보기
이 코드 샘플이 포함된 자세한 문서는 다음을 참조하세요.
코드 샘플
C++
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참조 문서를 확인하세요.
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  // Sample payloads: each entry pairs an ordering key with a short data tag.
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };

  // One future per published message; each completes after its handler ran.
  std::vector<future<void>> pending;
  for (auto& entry : data) {
    // Go through a const reference before capturing; this works around MSVC
    // lambda capture confusion.
    auto const& item = entry;

    // Reports the outcome of one publish. On failure, the ordering key is
    // resumed so that publishing with that key can continue.
    auto on_published = [item, publisher](
                            future<StatusOr<std::string>> result) mutable {
      auto const label = item.ordering_key + "#" + item.data;
      auto message_id = result.get();
      if (message_id) {
        std::cout << "Message " << label << " published as id=" << *message_id
                  << "\n";
        return;
      }
      std::cout << "An error has occurred publishing " << label << "\n";
      publisher.ResumePublish(item.ordering_key);
    };

    auto publish_result =
        publisher.Publish(pubsub::MessageBuilder{}
                              .SetData("Hello World! [" + entry.data + "]")
                              .SetOrderingKey(entry.ordering_key)
                              .Build());
    pending.push_back(publish_result.then(on_published));
  }

  publisher.Flush();
  // Optional: wait here until every message has been handled.
  for (auto& waiting : pending) waiting.get();
}
C#
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Sample showing how to resume publishing on an ordering key after a publish
/// with that key has failed.
/// </summary>
public class ResumePublishSample
{
    /// <summary>
    /// Publishes every (ordering key, message) pair to the given topic with
    /// message ordering enabled. When a publish throws, the error is logged and
    /// publishing is resumed for that pair's ordering key.
    /// </summary>
    /// <param name="projectId">Project that owns the topic.</param>
    /// <param name="topicId">Topic to publish to.</param>
    /// <param name="keysAndMessages">Pairs of (ordering key, message text).</param>
    /// <returns>The number of messages whose publish completed without throwing.</returns>
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        // Build a publisher for the topic with ordering switched on.
        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = TopicName.FromProjectTopic(projectId, topicId),
            Settings = new PublisherClient.Settings { EnableMessageOrdering = true }
        }.BuildAsync();

        int publishedMessageCount = 0;

        // Publishes a single pair; Item1 is the ordering key, Item2 the message text.
        async Task PublishOneAsync((string, string) keyAndMessage)
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                // Publishes complete concurrently, so the counter is bumped atomically.
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
                publisher.ResumePublish(keyAndMessage.Item1);
            }
        }

        await Task.WhenAll(keysAndMessages.Select(pair => PublishOneAsync(pair)));
        return publishedMessageCount;
    }
}
Go
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 확인하세요.
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
)
// resumePublishWithOrderingKey publishes one message with an ordering key to
// the given topic. If the publish fails, it reports the error to w and calls
// ResumePublish for that key so later publishes with the key can proceed.
// The success line is written to w only when the publish actually succeeded.
func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Sending messages to the same region ensures they are received in order
	// even when multiple publishers are used.
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.EnableMessageOrdering = true
	key := "some-ordering-key"

	res := t.Publish(ctx, &pubsub.Message{
		Data:        []byte("some-message"),
		OrderingKey: key,
	})
	if _, err := res.Get(ctx); err != nil {
		// Error handling code can be added here.
		fmt.Fprintf(w, "Failed to publish: %s\n", err)

		// Resume publish on an ordering key that has had unrecoverable errors.
		// After such an error publishes with this ordering key will fail
		// until this method is called.
		t.ResumePublish(key)

		// Do not fall through to the success message after a failure.
		return
	}
	fmt.Fprint(w, "Published a message with ordering key successfully\n")
}
Java
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참조 문서를 확인하세요.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Sample that publishes messages with ordering keys and resumes publishing on a key after a
 * publish with that key has failed.
 */
public class ResumePublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    resumePublishWithOrderingKeysExample(projectId, topicId);
  }

  /**
   * Publishes four messages across two ordering keys and registers a callback on each publish.
   * On failure the callback logs the error and calls {@code resumePublish} for the message's
   * ordering key; on success it logs the message text and the returned message id.
   *
   * @param projectId project that owns the topic
   * @param topicId existing topic to publish to
   * @throws IOException propagated from the calls in this method (presumably from building the
   *     publisher)
   * @throws InterruptedException if interrupted while waiting for the publisher to terminate
   */
  public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            .setEnableMessageOrdering(true)
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .build();

    try {
      // Maps message text -> ordering key; LinkedHashMap keeps insertion order for publishing.
      Map<String, String> messages = new LinkedHashMap<>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                // Decode the payload explicitly; concatenating the ByteString itself would
                // print its debug representation instead of the message text.
                System.out.println(
                    "Error publishing message : " + pubsubMessage.getData().toStringUtf8());
                // (Beta) Must call resumePublish to reset key and continue publishing with order.
                publisher.resumePublish(pubsubMessage.getOrderingKey());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData().toStringUtf8() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      // Shut the publisher down and wait up to one minute for it to terminate.
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Publishes `data` to a topic under the given ordering key. If the publish
 * fails, the error is logged and publishing is resumed for that key.
 *
 * @param {string} topicNameOrId Name or ID of the topic to publish to.
 * @param {string} data Message body, e.g. "Hello, world!" or JSON.stringify(someObject).
 * @param {string} orderingKey Ordering key attached to the message.
 * @returns {Promise<string|null>} The published message ID, or null on failure.
 */
async function resumePublish(topicNameOrId, data, orderingKey) {
  // The message body is sent as a Buffer built from the input string.
  const payload = Buffer.from(data);

  // Message ordering is enabled through the topic's publish options.
  const orderedTopic = pubSubClient.topic(topicNameOrId, {
    messageOrdering: true,
  });

  const message = {data: payload, orderingKey};

  try {
    const messageId = await orderedTopic.publishMessage(message);
    console.log(`Message ${messageId} published.`);
    return messageId;
  } catch (e) {
    console.log(`Could not publish: ${e}`);
    // Let publishing with this ordering key continue after the failure.
    orderedTopic.resumePublishing(orderingKey);
    return null;
  }
}
Node.js (TypeScript)
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';
// Imports the Google Cloud client library
import {PublishOptions, PubSub} from '@google-cloud/pubsub';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
/**
 * Publishes `data` to a topic under the given ordering key. If the publish
 * fails, the error is logged and publishing is resumed for that key.
 *
 * @param topicNameOrId Name or ID of the topic to publish to.
 * @param data Message body, e.g. "Hello, world!" or JSON.stringify(someObject).
 * @param orderingKey Ordering key attached to the message.
 * @returns The published message ID, or null when publishing failed.
 */
async function resumePublish(
  topicNameOrId: string,
  data: string,
  orderingKey: string
) {
  // The message body is sent as a Buffer built from the input string.
  const payload = Buffer.from(data);

  // Message ordering is enabled through the topic's publish options.
  const orderedOptions: PublishOptions = {messageOrdering: true};
  const orderedTopic = pubSubClient.topic(topicNameOrId, orderedOptions);

  const message = {data: payload, orderingKey};

  try {
    const messageId = await orderedTopic.publishMessage(message);
    console.log(`Message ${messageId} published.`);
    return messageId;
  } catch (e) {
    console.log(`Could not publish: ${e}`);
    // Let publishing with this ordering key continue after the failure.
    orderedTopic.resumePublishing(orderingKey);
    return null;
  }
}
Python
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 확인하세요.
from google.cloud import pubsub_v1
# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Message ordering must be enabled on the publisher for ordering keys to apply.
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

# Each entry is (message text, ordering key).
for text, ordering_key in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = text.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    try:
        # Waits for this publish to finish and prints its result.
        print(future.result())
    except RuntimeError:
        # NOTE(review): publish failures may surface as other exception types
        # than RuntimeError -- confirm this clause catches the intended errors.
        # Resume publish on an ordering key that has had unrecoverable errors.
        publisher.resume_publish(topic_path, ordering_key)

print(f"Resumed publishing messages with ordering keys to {topic_path}.")
Ruby
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참조 문서를 확인하세요.
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
batch_settings = {
  max_bytes:    1_000_000,
  max_messages: 20
}
topic = pubsub.topic topic_id, async: batch_settings
topic.enable_message_ordering!

# Publish ten messages that all share one ordering key.
10.times do |i|
  topic.publish_async("This is message \##{i}.", ordering_key: "ordering-key") do |result|
    if result.succeeded?
      puts "Message \##{i} successfully published."
      next
    end

    puts "Message \##{i} failed to publish"
    # Allow publishing to continue on "ordering-key" after processing the
    # failure.
    topic.resume_publish "ordering-key"
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
다음 단계
다른 Google Cloud 제품의 코드 샘플을 검색하고 필터링하려면 Google Cloud 샘플 브라우저를 참조하세요.