本文档介绍了如何发布消息。
如需了解如何创建、删除及管理主题和订阅,请参阅管理主题和订阅。
如需按主题限制存储消息数据的位置,请参阅限制 Pub/Sub 资源位置。
如需详细了解如何接收消息,请参阅订阅者指南。
发布者应用会创建消息,然后将其发送到主题。如订阅者概览所述,Pub/Sub 会为现有订阅者提供至少一次消息传送及尽力排序功能。
发布者应用的一般流程如下所示:
- 创建一条包含您的数据的消息。
- 向 Pub/Sub 服务器发送请求以将该消息发布到预期的主题。
消息格式
消息由包含消息数据和元数据的字段组成。在消息中至少指定以下内容之一:
如果您使用的是REST API,则消息数据必须采用 base64 编码。
Pub/Sub 服务将以下字段添加到消息中:
- 主题专属的消息 ID
- Pub/Sub 服务接收消息的时间的时间戳
发布消息
您可以使用 gcloud
命令行工具或 Pub/Sub API 发布消息。客户端库可以异步发布消息。
控制台
要发布消息,请执行以下操作
在 Cloud Console 中,转到 Pub/Sub 主题 页面。
点击主题 ID。
在主题详情页面中,点击发布消息。
在消息正文字段中,输入消息数据。
可选:添加消息属性。
点击添加消息属性。
为属性输入键和值。
点击发布。
gcloud
要发布消息,请使用 gcloud pubsub topics publish 命令:
gcloud pubsub topics publish TOPIC_ID \ --message=MESSAGE_DATA \ [--attribute=KEY="VALUE",...]
请替换以下内容:
- TOPIC_ID:主题的 ID
- MESSAGE_DATA:包含消息数据的 base64 编码字符串
- KEY:消息属性的键
- VALUE:消息属性的键对应的值
REST
要发布消息,请发送如下所示的 POST 请求:
POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish Authorization: Bearer $(gcloud auth application-default print-access-token)
请替换以下内容:
- PROJECT_ID:包含主题的项目的 ID
- TOPIC_ID:主题的 ID
在请求正文中指定以下字段:
{ "messages": [ { "attributes": { "KEY": "VALUE", ... }, "data": "MESSAGE_DATA", } ] }
请替换以下内容:
- KEY:消息属性的键
- VALUE:消息属性的键对应的值
- MESSAGE_DATA:包含消息数据的 base64 编码字符串
消息必须包含非空的数据字段或至少一个特性。
如果请求成功,则响应是一个包含消息 ID 的 JSON 对象。以下是包含消息 ID 的响应:
{ "messageIds": [ "19916711285", ] }
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
auto message_id = publisher.Publish(
pubsub::MessageBuilder{}.SetData("Hello World!").Build());
auto done = message_id.then([](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::runtime_error(id.status().message());
std::cout << "Hello World! published with id=" << *id << "\n";
});
// Block until the message is published
done.get();
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishMessagesAsyncSample
{
public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
int publishedMessageCount = 0;
var publishTasks = messageTexts.Select(async text =>
{
try
{
string message = await publisher.PublishAsync(text);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error ocurred when publishing message {text}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"strconv"
"sync"
"sync/atomic"
"cloud.google.com/go/pubsub"
)
func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
var wg sync.WaitGroup
var totalErrors uint64
t := client.Topic(topicID)
for i := 0; i < n; i++ {
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Message " + strconv.Itoa(i)),
})
wg.Add(1)
go func(i int, res *pubsub.PublishResult) {
defer wg.Done()
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
id, err := res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Fprintf(w, "Failed to publish: %v", err)
atomic.AddUint64(&totalErrors, 1)
return
}
fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
}(i, result)
}
wg.Wait()
if totalErrors > 0 {
return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
}
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class PublishWithErrorHandlerExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithErrorHandlerExample(projectId, topicId);
}
public static void publishWithErrorHandlerExample(String projectId, String topicId)
throws IOException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).build();
List<String> messages = Arrays.asList("first message", "second message");
for (final String message : messages) {
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Add an asynchronous callback to handle success / failure
ApiFutures.addCallback(
future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// details on the API exception
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + message);
}
@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic)
System.out.println("Published message ID: " + messageId);
}
},
MoreExecutors.directExecutor());
}
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishMessage() {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
try {
const messageId = await pubSubClient.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);
} catch (error) {
console.error(`Received error while publishing: ${error.message}`);
process.exitCode = 1;
}
}
publishMessage();
PHP
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档。
use Google\Cloud\PubSub\PubSubClient;
/**
* Publishes a message for a Pub/Sub topic.
*
* @param string $projectId The Google project ID.
* @param string $topicName The Pub/Sub topic name.
* @param string $message The message to publish.
*/
function publish_message($projectId, $topicName, $message)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$topic = $pubsub->topic($topicName);
$topic->publish(['data' => $message]);
print('Message published' . PHP_EOL);
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
import time
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
futures = dict()
def get_callback(f, data):
def callback(f):
try:
print(f.result())
futures.pop(data)
except: # noqa
print("Please handle {} for {}.".format(f.exception(), data))
return callback
for i in range(10):
data = str(i)
futures.update({data: None})
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data.encode("utf-8"))
futures[data] = future
# Publish failures shall be handled in the callback function.
future.add_done_callback(get_callback(future, data))
# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(5)
print(f"Published messages with error handler to {topic_path}.")
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_name
begin
topic.publish_async "This is a test message." do |result|
raise "Failed to publish the message." unless result.succeeded?
puts "Message published asynchronously."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
rescue StandardError => e
puts "Received error while publishing: #{e.message}"
end
发布消息后,Pub/Sub 服务将消息 ID 返回给发布者。
使用属性
您可以在 Pub/Sub 消息中嵌入自定义属性作为元数据。属性可以是文本字符串或字节字符串。消息架构可通过以下方式表示:
{ "data": string, "attributes": { string: string, ... }, "messageId": string, "publishTime": string, "orderingKey": string }
PubsubMessage
JSON 架构作为 REST 和远程过程调用 (RPC) 文档的一部分发布。
gcloud
gcloud pubsub topics publish my-topic --message="hello" \ --attribute="origin=gcloud-sample,username=gcp"
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
std::vector<future<void>> done;
for (int i = 0; i != 10; ++i) {
auto message_id = publisher.Publish(
pubsub::MessageBuilder{}
.SetData("Hello World! [" + std::to_string(i) + "]")
.SetAttribute("origin", "cpp-sample")
.SetAttribute("username", "gcp")
.Build());
done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::runtime_error(id.status().message());
std::cout << "Message " << i << " published with id=" << *id << "\n";
}));
}
publisher.Flush();
// Block until all the messages are published (optional)
for (auto& f : done) f.get();
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Threading.Tasks;
public class PublishMessageWithCustomAttributesAsyncSample
{
public async Task PublishMessageWithCustomAttributesAsync(string projectId, string topicId, string messageText)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
var pubsubMessage = new PubsubMessage
{
// The data is any arbitrary ByteString. Here, we're using text.
Data = ByteString.CopyFromUtf8(messageText),
// The attributes provide metadata in a string-to-string dictionary.
Attributes =
{
{ "year", "2020" },
{ "author", "unknown" }
}
};
string message = await publisher.PublishAsync(pubsubMessage);
Console.WriteLine($"Published message {message}");
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
t := client.Topic(topicID)
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Hello world!"),
Attributes: map[string]string{
"origin": "golang",
"username": "gcp",
},
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Get: %v", err)
}
fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class PublishWithCustomAttributesExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithCustomAttributesExample(projectId, topicId);
}
public static void publishWithCustomAttributesExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).build();
String message = "first message";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder()
.setData(data)
.putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
.build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published a message with custom attributes: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishMessageWithCustomAttributes() {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes = {
origin: 'nodejs-sample',
username: 'gcp',
};
const messageId = await pubSubClient
.topic(topicName)
.publish(dataBuffer, customAttributes);
console.log(`Message ${messageId} published.`);
}
publishMessageWithCustomAttributes().catch(console.error);
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
for n in range(1, 10):
data = "Message number {}".format(n)
# Data must be a bytestring
data = data.encode("utf-8")
# Add two attributes, origin and username, to the message
future = publisher.publish(
topic_path, data, origin="python-sample", username="gcp"
)
print(future.result())
print(f"Published messages with custom attributes to {topic_path}.")
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_name
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
origin: "ruby-sample",
username: "gcp" do |result|
raise "Failed to publish the message." unless result.succeeded?
puts "Message with custom attributes published asynchronously."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
使用排序键
如果消息具有相同的排序键,并且您要将消息发布到同一地区,则订阅者可以按顺序接收消息。发布带有排序键的消息可能会增加延迟时间。如需将消息发布到同一地区,请使用地区性端点。
您可以使用 Cloud Console、gcloud
命令行工具或 Pub/Sub API 发布带有排序键的消息。
控制台
在 Cloud Console 中,转到 Pub/Sub 主题 页面。
点击主题 ID。
在主题详情页面中,点击发布消息。
在消息正文字段中,输入消息数据。
在排序键字段中,输入排序键。
点击发布。
gcloud
如需发布带有排序键的消息,请使用 gcloud pubsub topics publish
命令和 --ordering-key
标志:
gcloud pubsub topics publish TOPIC_ID \ --message=MESSAGE_DATA \ --ordering-key=ORDERING_KEY
请替换以下内容:
- TOPIC_ID:主题的 ID
- MESSAGE_DATA:包含消息数据的 base64 编码字符串
- ORDERING_KEY:带有排序键的字符串
REST
如需发布带有排序键的消息,请发送如下所示的 POST 请求:
POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish Authorization: Bearer $(gcloud auth application-default print-access-token)
请替换以下内容:
- PROJECT_ID:包含主题的项目的 ID
- TOPIC_ID:主题的 ID
在请求正文中指定以下字段:
{ "messages": [ { "attributes": { "KEY": "VALUE", ... }, "data": "MESSAGE_DATA", "ordering_key": "ORDERING_KEY", } ] }
请替换以下内容:
- KEY:消息属性的键
- VALUE:消息属性的键对应的值
- MESSAGE_DATA:包含消息数据的 base64 编码字符串
- ORDERING_KEY:带有排序键的字符串
消息必须包含非空的数据字段或至少一个特性。
如果请求成功,则响应是一个包含消息 ID 的 JSON 对象。以下是包含消息 ID 的响应:
{ "messageIds": [ "19916711285", ] }
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
struct SampleData {
std::string ordering_key;
std::string data;
} data[] = {
{"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
{"key1", "message4"}, {"key1", "message5"},
};
std::vector<future<void>> done;
for (auto const& datum : data) {
auto message_id =
publisher.Publish(pubsub::MessageBuilder{}
.SetData("Hello World! [" + datum.data + "]")
.SetOrderingKey(datum.ordering_key)
.Build());
std::string ack_id = datum.ordering_key + "#" + datum.data;
done.push_back(message_id.then([ack_id](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::runtime_error(id.status().message());
std::cout << "Message " << ack_id << " published with id=" << *id
<< "\n";
}));
}
publisher.Flush();
// Block until all the messages are published (optional)
for (auto& f : done) f.get();
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishOrderedMessagesAsyncSample
{
public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
var customSettings = new PublisherClient.Settings
{
EnableMessageOrdering = true
};
// Sending messages to the same region ensures they are received in order even when multiple publishers are used.
var clientCreationSettings = new PublisherClient.ClientCreationSettings(serviceEndpoint: "us-east1-pubsub.googleapis.com:443");
PublisherClient publisher = await PublisherClient.CreateAsync(topicName, clientCreationSettings, customSettings);
int publishedMessageCount = 0;
var publishTasks = keysAndMessages.Select(async keyAndMessage =>
{
try
{
string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
)
func publishWithOrderingKey(w io.Writer, projectID, topicID string) {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
// Sending messages to the same region ensures they are received in order
// even when multiple publishers are used.
client, err := pubsub.NewClient(ctx, projectID,
option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
if err != nil {
fmt.Fprintf(w, "pubsub.NewClient: %v", err)
return
}
defer client.Close()
var wg sync.WaitGroup
var totalErrors uint64
t := client.Topic(topicID)
t.EnableMessageOrdering = true
messages := []struct {
message string
orderingKey string
}{
{
message: "message1",
orderingKey: "key1",
},
{
message: "message2",
orderingKey: "key2",
},
{
message: "message3",
orderingKey: "key1",
},
{
message: "message4",
orderingKey: "key2",
},
}
for _, m := range messages {
res := t.Publish(ctx, &pubsub.Message{
Data: []byte(m.message),
OrderingKey: m.orderingKey,
})
wg.Add(1)
go func(res *pubsub.PublishResult) {
defer wg.Done()
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
_, err := res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Printf("Failed to publish: %s\n", err)
atomic.AddUint64(&totalErrors, 1)
return
}
}(res)
}
wg.Wait()
if totalErrors > 0 {
fmt.Fprintf(w, "%d of 4 messages did not publish successfully", totalErrors)
return
}
fmt.Fprint(w, "Published 4 messages with ordering keys successfully\n")
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class PublishWithOrderingKeys {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Choose an existing topic.
String topicId = "your-topic-id";
publishWithOrderingKeysExample(projectId, topicId);
}
public static void publishWithOrderingKeysExample(String projectId, String topicId)
throws IOException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a publisher and set message ordering to true.
Publisher publisher =
Publisher.newBuilder(topicName)
// Sending messages to the same region ensures they are received in order
// even when multiple publishers are used.
.setEndpoint("us-east1-pubsub.googleapis.com:443")
.setEnableMessageOrdering(true)
.build();
try {
Map<String, String> messages = new HashMap<String, String>();
messages.put("message1", "key1");
messages.put("message2", "key2");
messages.put("message3", "key1");
messages.put("message4", "key2");
for (Map.Entry<String, String> entry : messages.entrySet()) {
ByteString data = ByteString.copyFromUtf8(entry.getKey());
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Add an asynchronous callback to handle publish success / failure.
ApiFutures.addCallback(
future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// Details on the API exception.
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + pubsubMessage.getData());
}
@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic).
System.out.println(pubsubMessage.getData() + " : " + messageId);
}
},
MoreExecutors.directExecutor());
}
} finally {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub({
// Sending messages to the same region ensures they are received in order
// even when multiple publishers are used.
apiEndpoint: 'us-east1-pubsub.googleapis.com:443',
});
async function publishOrderedMessage() {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Be sure to set an ordering key that matches other messages
// you want to receive in order, relative to each other.
const message = {
data: dataBuffer,
orderingKey: orderingKey,
};
// Publishes the message
const messageId = await pubSubClient
.topic(topicName, {enableMessageOrdering: true})
.publishMessage(message);
console.log(`Message ${messageId} published.`);
return messageId;
}
return await publishOrderedMessage();
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from google.cloud import pubsub_v1
# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)
for message in [
("message1", "key1"),
("message2", "key2"),
("message3", "key1"),
("message4", "key2"),
]:
# Data must be a bytestring
data = message[0].encode("utf-8")
ordering_key = message[1]
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
print(future.result())
print(f"Published messages with ordering keys to {topic_path}.")
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
max_bytes: 1_000_000,
max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
topic.publish_async "This is message \##{i}.",
ordering_key: "ordering-key"
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
puts "Messages published with ordering key."
批处理消息
Pub/Sub 客户端库会在对服务的单个调用中批处理多条消息。较大批次大小可提高消息吞吐量(每个 CPU 发送的消息速率)。批处理的代价是单条消息延迟,这些消息会在内存中排入队列,直到其相应的批次中消息达到一定数量并准备好通过网络发送。要最大程度地缩短延迟时间,应停用批处理功能。如果应用发布单条消息作为请求-响应序列的一部分,这一点尤为重要。这种模式的一个常见示例是使用 Cloud Functions 或 App Engine 的无服务器、事件驱动型应用。
消息可以根据请求大小(以字节为单位)、消息数量和时间分批。 您可以替换此示例中显示的默认设置:
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// By default the publisher will flush a batch after 10ms, after it contains
// more than 100 message, or after it contains more than 1MiB of data,
// whichever comes first. This changes those defaults.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic),
pubsub::PublisherOptions{}
.set_maximum_hold_time(std::chrono::milliseconds(20))
.set_maximum_batch_bytes(4 * 1024 * 1024L)
.set_maximum_batch_message_count(200),
pubsub::ConnectionOptions{}));
std::vector<future<void>> ids;
for (char const* data : {"1", "2", "3", "go!"}) {
ids.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([data](future<StatusOr<std::string>> f) {
auto s = f.get();
if (!s) return;
std::cout << "Sent '" << data << "' (" << *s << ")\n";
}));
}
publisher.Flush();
// Block until they are actually sent.
for (auto& id : ids) id.get();
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishBatchedMessagesAsyncSample
{
public async Task<int> PublishBatchMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
// Default Settings:
// byteCountThreshold: 1000000
// elementCountThreshold: 100
// delayThreshold: 10 milliseconds
var customSettings = new PublisherClient.Settings
{
BatchingSettings = new BatchingSettings(
elementCountThreshold: 50,
byteCountThreshold: 10240,
delayThreshold: TimeSpan.FromMilliseconds(500))
};
PublisherClient publisher = await PublisherClient.CreateAsync(topicName, settings: customSettings);
int publishedMessageCount = 0;
var publishTasks = messageTexts.Select(async text =>
{
try
{
string message = await publisher.PublishAsync(text);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"strconv"
"time"
"cloud.google.com/go/pubsub"
)
func publishWithSettings(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
var results []*pubsub.PublishResult
var resultErrors []error
t := client.Topic(topicID)
t.PublishSettings.ByteThreshold = 5000
t.PublishSettings.CountThreshold = 10
t.PublishSettings.DelayThreshold = 100 * time.Millisecond
for i := 0; i < 10; i++ {
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Message " + strconv.Itoa(i)),
})
results = append(results, result)
}
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
for i, res := range results {
id, err := res.Get(ctx)
if err != nil {
resultErrors = append(resultErrors, err)
fmt.Fprintf(w, "Failed to publish: %v", err)
continue
}
fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
}
if len(resultErrors) != 0 {
return fmt.Errorf("Get: %v", resultErrors[len(resultErrors)-1])
}
fmt.Fprintf(w, "Published messages with batch settings.")
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;
public class PublishWithBatchSettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithBatchSettingsExample(projectId, topicId);
}
public static void publishWithBatchSettingsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
try {
// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1 byte
long messageCountBatchSize = 100L; // default : 1 message
Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms
// Publish request get triggered based on request size, messages count & time since last
// publish, whichever condition is met first.
BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setElementCountThreshold(messageCountBatchSize)
.setRequestByteThreshold(requestBytesThreshold)
.setDelayThreshold(publishDelayThreshold)
.build();
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();
// schedule publishing one message at a time : messages get automatically batched
for (int i = 0; i < 100; i++) {
String message = "message " + i;
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
messageIdFutures.add(messageIdFuture);
}
} finally {
// Wait on any pending publish requests.
List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();
System.out.println("Published " + messageIds.size() + " messages with batch settings.");
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishBatchedMessages() {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const batchPublisher = pubSubClient.topic(topicName, {
batching: {
maxMessages: maxMessages,
maxMilliseconds: maxWaitTime * 1000,
},
});
for (let i = 0; i < 10; i++) {
(async () => {
const messageId = await batchPublisher.publish(dataBuffer);
console.log(`Message ${messageId} published.`);
})();
}
}
publishBatchedMessages().catch(console.error);
PHP
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档。
use Google\Cloud\PubSub\PubSubClient;
/**
* Publishes a message for a Pub/Sub topic.
*
* The publisher should be used in conjunction with the `google-cloud-batch`
* daemon, which should be running in the background.
*
* To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
*
* @param string $projectId The Google project ID.
* @param string $topicName The Pub/Sub topic name.
* @param string $message The message to publish.
*/
function publish_message_batch($projectId, $topicName, $message)
{
// Check if the batch daemon is running.
if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
trigger_error(
'The batch daemon is not running. Call ' .
'`vendor/bin/google-cloud-batch daemon` from ' .
'your project root to start the daemon.',
E_USER_NOTICE
);
}
$batchOptions = [
'batchSize' => 100, // Max messages for each batch.
'callPeriod' => 0.01, // Max time in seconds between each batch publish.
];
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$topic = $pubsub->topic($topicName);
$publisher = $topic->batchPublisher([
'batchOptions' => $batchOptions
]);
for ($i = 0; $i < 10; $i++) {
$publisher->publish(['data' => $message]);
}
print('Messages enqueued for publication.' . PHP_EOL);
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Configure the batch to publish as soon as there is ten messages,
# one kilobyte of data, or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_messages=10, # default 100
max_bytes=1024, # default 1 MB
max_latency=1, # default 10 ms
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
# Resolve the publish future in a separate thread.
def callback(future):
message_id = future.result()
print(message_id)
for n in range(1, 10):
data = "Message number {}".format(n)
# Data must be a bytestring
data = data.encode("utf-8")
future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch multiple messages.
future.add_done_callback(callback)
print(f"Published messages with batch settings to {topic_path}.")
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
max_bytes: 1_000_000,
max_messages: 20
}
10.times do |i|
topic.publish_async "This is message \##{i}."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."
重试请求
如果发布失败,系统会自动重试,但无法保证能够重试的错误除外。此示例代码演示了如何使用自定义重试设置创建发布者(请注意,并非所有客户端库都支持自定义重试设置;请参阅适用于您所选语言的 API 参考文档):
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// By default a publisher will retry for 60 seconds, with an initial backoff
// of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
// 30% after each attempt. This changes those defaults.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic), pubsub::PublisherOptions{}, {},
pubsub::LimitedTimeRetryPolicy(
/*maximum_duration=*/std::chrono::minutes(10))
.clone(),
pubsub::ExponentialBackoffPolicy(
/*initial_delay=*/std::chrono::milliseconds(200),
/*maximum_delay=*/std::chrono::seconds(45),
/*scaling=*/2.0)
.clone()));
std::vector<future<bool>> done;
for (char const* data : {"1", "2", "3", "go!"}) {
done.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([](future<StatusOr<std::string>> f) {
return f.get().ok();
}));
}
publisher.Flush();
int count = 0;
for (auto& f : done) {
if (f.get()) ++count;
}
std::cout << count << " messages sent successfully\n";
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;
public class PublishMessageWithRetrySettingsAsyncSample
{
public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
// Retry settings control how the publisher handles retry-able failures
var maxAttempts = 3;
var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
var maxBackoff = TimeSpan.FromSeconds(70); // default : 60 seconds
var backoffMultiplier = 1.3; // default: 1.0
var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds
var publisher = await PublisherClient.CreateAsync(topicName,
clientCreationSettings: new PublisherClient.ClientCreationSettings(
publisherServiceApiSettings: new PublisherServiceApiSettings
{
PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
maxAttempts: maxAttempts,
initialBackoff: initialBackoff,
maxBackoff: maxBackoff,
backoffMultiplier: backoffMultiplier,
retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
.WithTimeout(totalTimeout)
}
)).ConfigureAwait(false);
string message = await publisher.PublishAsync(messageText);
Console.WriteLine($"Published message {message}");
}
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;
public class PublishWithRetrySettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithRetrySettingsExample(projectId, topicId);
}
public static void publishWithRetrySettingsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Retry settings control how the publisher handles retry-able failures
Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
double rpcTimeoutMultiplier = 1.0; // default: 1.0
Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(initialRetryDelay)
.setRetryDelayMultiplier(retryDelayMultiplier)
.setMaxRetryDelay(maxRetryDelay)
.setInitialRpcTimeout(initialRpcTimeout)
.setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
.setMaxRpcTimeout(maxRpcTimeout)
.setTotalTimeout(totalTimeout)
.build();
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();
String message = "first message";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published a message with retry settings: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID'
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');
// Creates a publisher client.
const publisherClient = new v1.PublisherClient({
// optional auth parameters
});
async function publishWithRetrySettings() {
const formattedTopic = publisherClient.projectTopicPath(
projectId,
topicName
);
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messagesElement = {
data: dataBuffer,
};
const messages = [messagesElement];
// Build the request
const request = {
topic: formattedTopic,
messages: messages,
};
// Retry settings control how the publisher handles retryable failures
// Default values are shown
const retrySettings = {
retryCodes: [
10, // 'ABORTED'
1, // 'CANCELLED',
4, // 'DEADLINE_EXCEEDED'
13, // 'INTERNAL'
8, // 'RESOURCE_EXHAUSTED'
14, // 'UNAVAILABLE'
2, // 'UNKNOWN'
],
backoffSettings: {
// The initial delay time, in milliseconds, between the completion
// of the first failed request and the initiation of the first retrying request.
initialRetryDelayMillis: 100,
// The multiplier by which to increase the delay time between the completion
// of failed requests, and the initiation of the subsequent retrying request.
retryDelayMultiplier: 1.3,
// The maximum delay time, in milliseconds, between requests.
// When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
maxRetryDelayMillis: 60000,
// The initial timeout parameter to the request.
initialRpcTimeoutMillis: 5000,
// The multiplier by which to increase the timeout parameter between failed requests.
rpcTimeoutMultiplier: 1.0,
// The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
// rpcTimeoutMultiplier will no longer be used to increase the timeout.
maxRpcTimeoutMillis: 600000,
// The total time, in milliseconds, starting from when the initial request is sent,
// after which an error will be returned, regardless of the retrying attempts made meanwhile.
totalTimeoutMillis: 600000,
},
};
const [response] = await publisherClient.publish(request, {
retry: retrySettings,
});
console.log(`Message ${response.messageIds} published.`);
}
publishWithRetrySettings().catch(console.error);
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from google import api_core
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Configure the retry settings. Defaults shown in comments are values applied
# by the library by default, instead of default values in the Retry object.
custom_retry = api_core.retry.Retry(
initial=0.250, # seconds (default: 0.1)
maximum=90.0, # seconds (default: 60.0)
multiplier=1.45, # default: 1.3
deadline=300.0, # seconds (default: 60.0)
predicate=api_core.retry.if_exception_type(
api_core.exceptions.Aborted,
api_core.exceptions.DeadlineExceeded,
api_core.exceptions.InternalServerError,
api_core.exceptions.ResourceExhausted,
api_core.exceptions.ServiceUnavailable,
api_core.exceptions.Unknown,
api_core.exceptions.Cancelled,
),
)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
for n in range(1, 10):
data = "Message number {}".format(n)
# Data must be a bytestring
data = data.encode("utf-8")
future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
print(future.result())
print(f"Published messages with retry settings to {topic_path}.")
重试设置控制 Pub/Sub 客户端库重试发布请求的方式。客户端库具有以下任一重试设置:
- 初始请求超时:客户端库停止等待初始发布请求完成前的时间。
- 重试延迟:请求超时后,客户端库等待请求重试的时间。
- 总超时:客户端库停止重试发布请求前的时间。
如需重试发布请求,初始请求超时必须短于总超时。例如,如果您使用的是指数退避算法,客户端库会按如下所示计算请求超时和重试延迟:
- 每个发布请求后,请求超时会以请求超时倍数增加,最长为请求超时上限。
- 每次重试后,重试延迟时间会以重试延迟倍数增加,最长为重试延迟上限。
当客户端库重试某个请求且消息带有排序键时,无论重试设置如何,客户端库都会不断重试该请求。
如果出现不可重试的错误,则客户端库不会发布消息,并会停止发布带有相同排序键的其他消息。例如,当发布者将消息发送到不存在的主题时,即会发生不可重试的错误。要继续发布带用相同排序键的消息,请调用一个方法以继续发布,然后开始重新发布。
以下示例显示了如何继续发布带有相同排序键的消息。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
struct SampleData {
std::string ordering_key;
std::string data;
} data[] = {
{"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
{"key1", "message4"}, {"key1", "message5"},
};
std::vector<future<void>> done;
for (auto const& datum : data) {
auto const& da = datum; // workaround MSVC lambda capture confusion
auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
auto const msg = da.ordering_key + "#" + da.data;
auto id = f.get();
if (!id) {
std::cout << "An error has occurred publishing " << msg << "\n";
publisher.ResumePublish(da.ordering_key);
return;
}
std::cout << "Message " << msg << " published as id=" << *id << "\n";
};
done.push_back(
publisher
.Publish(pubsub::MessageBuilder{}
.SetData("Hello World! [" + datum.data + "]")
.SetOrderingKey(datum.ordering_key)
.Build())
.then(handler));
}
publisher.Flush();
// Block until all the messages are published (optional)
for (auto& f : done) f.get();
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class ResumePublishSample
{
public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
var customSettings = new PublisherClient.Settings
{
EnableMessageOrdering = true
};
PublisherClient publisher = await PublisherClient.CreateAsync(topicName, settings: customSettings);
int publishedMessageCount = 0;
var publishTasks = keysAndMessages.Select(async keyAndMessage =>
{
try
{
string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
publisher.ResumePublish(keyAndMessage.Item1);
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
)
func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
// Sending messages to the same region ensures they are received in order
// even when multiple publishers are used.
client, err := pubsub.NewClient(ctx, projectID,
option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
if err != nil {
fmt.Fprintf(w, "pubsub.NewClient: %v", err)
return
}
defer client.Close()
t := client.Topic(topicID)
t.EnableMessageOrdering = true
key := "some-ordering-key"
res := t.Publish(ctx, &pubsub.Message{
Data: []byte("some-message"),
OrderingKey: key,
})
_, err = res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Printf("Failed to publish: %s\n", err)
// Resume publish on an ordering key that has had unrecoverable errors.
// After such an error publishes with this ordering key will fail
// until this method is called.
t.ResumePublish(key)
}
fmt.Fprint(w, "Published a message with ordering key successfully\n")
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ResumePublishWithOrderingKeys {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Choose an existing topic.
String topicId = "your-topic-id";
resumePublishWithOrderingKeysExample(projectId, topicId);
}
public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
throws IOException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a publisher and set message ordering to true.
Publisher publisher =
Publisher.newBuilder(topicName)
.setEnableMessageOrdering(true)
.setEndpoint("us-east1-pubsub.googleapis.com:443")
.build();
try {
Map<String, String> messages = new HashMap<String, String>();
messages.put("message1", "key1");
messages.put("message2", "key2");
messages.put("message3", "key1");
messages.put("message4", "key2");
for (Map.Entry<String, String> entry : messages.entrySet()) {
ByteString data = ByteString.copyFromUtf8(entry.getKey());
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Add an asynchronous callback to handle publish success / failure.
ApiFutures.addCallback(
future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// Details on the API exception.
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + pubsubMessage.getData());
// (Beta) Must call resumePublish to reset key and continue publishing with order.
publisher.resumePublish(pubsubMessage.getOrderingKey());
}
@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic).
System.out.println(pubsubMessage.getData() + " : " + messageId);
}
},
MoreExecutors.directExecutor());
}
} finally {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function resumePublish() {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Publishes the message
const publisher = pubSubClient.topic(topicName, {
enableMessageOrdering: true,
});
try {
const message = {
data: dataBuffer,
orderingKey: orderingKey,
};
const messageId = await publisher.publishMessage(message);
console.log(`Message ${messageId} published.`);
return messageId;
} catch (e) {
console.log(`Could not publish: ${e}`);
publisher.resumePublishing(orderingKey);
return null;
}
}
return await resumePublish();
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from google.cloud import pubsub_v1
# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)
for message in [
("message1", "key1"),
("message2", "key2"),
("message3", "key1"),
("message4", "key2"),
]:
# Data must be a bytestring
data = message[0].encode("utf-8")
ordering_key = message[1]
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
try:
print(future.result())
except RuntimeError:
# Resume publish on an ordering key that has had unrecoverable errors.
publisher.resume_publish(topic_path, ordering_key)
print(f"Resumed publishing messages with ordering keys to {topic_path}.")
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
max_bytes: 1_000_000,
max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
topic.publish_async "This is message \##{i}.",
ordering_key: "ordering-key" do |result|
if result.succeeded?
puts "Message \##{i} successfully published."
else
puts "Message \##{i} failed to publish"
# Allow publishing to continue on "ordering-key" after processing the
# failure.
topic.resume_publish "ordering-key"
end
end
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
并发控制
是否支持并发取决于您使用的编程语言。如需了解详情,请参阅 API 参考文档。
以下示例说明了如何控制发布者并发:
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// Override the default number of background (I/O) threads. By default the
// library uses `std::thread::hardware_concurrency()` threads.
auto options =
pubsub::ConnectionOptions{}.set_background_thread_pool_size(8);
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic), pubsub::PublisherOptions{}, std::move(options)));
std::vector<future<void>> ids;
for (char const* data : {"1", "2", "3", "go!"}) {
ids.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([data](future<StatusOr<std::string>> f) {
auto s = f.get();
if (!s) return;
std::cout << "Sent '" << data << "' (" << *s << ")\n";
}));
}
publisher.Flush();
// Block until they are actually sent.
for (auto& id : ids) id.get();
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// msg := "Hello World"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
t := client.Topic(topicID)
t.PublishSettings.NumGoroutines = 1
result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Get: %v", err)
}
fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class PublishWithConcurrencyControlExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithConcurrencyControlExample(projectId, topicId);
}
public static void publishWithConcurrencyControlExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
try {
// Provides an executor service for processing messages. The default
// `executorProvider` used by the publisher has a default thread count of
// 5 * the number of processors available to the Java virtual machine.
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
// `setExecutorProvider` configures an executor for the publisher.
publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();
// schedule publishing one message at a time : messages get automatically batched
for (int i = 0; i < 100; i++) {
String message = "message " + i;
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
messageIdFutures.add(messageIdFuture);
}
} finally {
// Wait on any pending publish requests.
List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();
System.out.println("Published " + messageIds.size() + " messages with concurrency control.");
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_name, async: {
threads: {
# Use exactly one thread for publishing message and exactly one thread
# for executing callbacks
publish: 1,
callback: 1
}
}
topic.publish_async "This is a test message." do |result|
raise "Failed to publish the message." unless result.succeeded?
puts "Message published asynchronously."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!