This document describes how to use batch messaging for messages published to a topic. The samples on this page create a publisher client with custom batch settings and use it to publish some messages.
Before you begin
Before you configure the publish workflow, make sure you have completed the following tasks:
Required roles
To get the permissions that you need to publish messages to a topic, ask your administrator to grant you the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on the topic. For more information about granting roles, see Manage access.
You need additional permissions to create or update topics and subscriptions.
Use batch messaging
See the following code samples to learn how to configure batch messaging settings for a publisher.
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub C++ API reference documentation.
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// By default, the publisher will flush a batch after 10ms, after it
// contains more than 100 messages, or after it contains more than 1MiB of
// data, whichever comes first. This changes those defaults.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic),
Options{}
.set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(20))
.set<pubsub::MaxBatchBytesOption>(4 * 1024 * 1024L)
.set<pubsub::MaxBatchMessagesOption>(200)));
std::vector<future<void>> ids;
for (char const* data : {"1", "2", "3", "go!"}) {
ids.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([data](future<StatusOr<std::string>> f) {
auto s = f.get();
if (!s) return;
std::cout << "Sent '" << data << "' (" << *s << ")\n";
}));
}
publisher.Flush();
// Block until they are actually sent.
for (auto& id : ids) id.get();
}
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub C# API reference documentation.
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishBatchedMessagesAsyncSample
{
public async Task<int> PublishBatchMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
// Default Settings:
// byteCountThreshold: 1000000
// elementCountThreshold: 100
// delayThreshold: 10 milliseconds
var customSettings = new PublisherClient.Settings
{
BatchingSettings = new BatchingSettings(
elementCountThreshold: 50,
byteCountThreshold: 10240,
delayThreshold: TimeSpan.FromMilliseconds(500))
};
PublisherClient publisher = await new PublisherClientBuilder
{
TopicName = topicName,
Settings = customSettings
}.BuildAsync();
int publishedMessageCount = 0;
var publishTasks = messageTexts.Select(async text =>
{
try
{
string message = await publisher.PublishAsync(text);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Go API reference documentation.
import (
"context"
"fmt"
"io"
"strconv"
"time"
"cloud.google.com/go/pubsub"
)
func publishWithSettings(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer client.Close()
var results []*pubsub.PublishResult
var resultErrors []error
t := client.Topic(topicID)
t.PublishSettings.ByteThreshold = 5000
t.PublishSettings.CountThreshold = 10
t.PublishSettings.DelayThreshold = 100 * time.Millisecond
for i := 0; i < 10; i++ {
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Message " + strconv.Itoa(i)),
})
results = append(results, result)
}
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
for i, res := range results {
id, err := res.Get(ctx)
if err != nil {
resultErrors = append(resultErrors, err)
fmt.Fprintf(w, "Failed to publish: %v", err)
continue
}
fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
}
if len(resultErrors) != 0 {
return fmt.Errorf("Get: %v", resultErrors[len(resultErrors)-1])
}
fmt.Fprintf(w, "Published messages with batch settings.")
return nil
}
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Java API reference documentation.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;
public class PublishWithBatchSettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithBatchSettingsExample(projectId, topicId);
}
public static void publishWithBatchSettingsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
try {
// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1000 bytes
long messageCountBatchSize = 100L; // default : 100 messages
Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms
// Publish request get triggered based on request size, messages count & time since last
// publish, whichever condition is met first.
BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setElementCountThreshold(messageCountBatchSize)
.setRequestByteThreshold(requestBytesThreshold)
.setDelayThreshold(publishDelayThreshold)
.build();
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();
// schedule publishing one message at a time : messages get automatically batched
for (int i = 0; i < 100; i++) {
String message = "message " + i;
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
messageIdFutures.add(messageIdFuture);
}
} finally {
// Wait on any pending publish requests.
List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();
System.out.println("Published " + messageIds.size() + " messages with batch settings.");
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishBatchedMessages(
topicNameOrId,
data,
maxMessages,
maxWaitTime
) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publishOptions = {
batching: {
maxMessages: maxMessages,
maxMilliseconds: maxWaitTime * 1000,
},
};
const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(
(async () => {
const messageId = await batchPublisher.publishMessage({
data: dataBuffer,
});
console.log(`Message ${messageId} published.`);
})()
);
}
await Promise.all(promises);
}
Node.js (TypeScript)
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;
// Imports the Google Cloud client library
import {PublishOptions, PubSub} from '@google-cloud/pubsub';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishBatchedMessages(
topicNameOrId: string,
data: string,
maxMessages: number,
maxWaitTime: number
) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publishOptions: PublishOptions = {
batching: {
maxMessages: maxMessages,
maxMilliseconds: maxWaitTime * 1000,
},
};
const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);
const promises: Promise<void>[] = [];
for (let i = 0; i < 10; i++) {
promises.push(
(async () => {
const messageId = await batchPublisher.publishMessage({
data: dataBuffer,
});
console.log(`Message ${messageId} published.`);
})()
);
}
await Promise.all(promises);
}
PHP
Before trying this sample, follow the PHP setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub PHP API reference documentation.
use Google\Cloud\PubSub\PubSubClient;
/**
* Publishes a message for a Pub/Sub topic.
*
* The publisher should be used in conjunction with the `google-cloud-batch`
* daemon, which should be running in the background.
*
* To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
*
* @param string $projectId The Google project ID.
* @param string $topicName The Pub/Sub topic name.
* @param string $message The message to publish.
*/
function publish_message_batch($projectId, $topicName, $message)
{
// Check if the batch daemon is running.
if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
trigger_error(
'The batch daemon is not running. Call ' .
'`vendor/bin/google-cloud-batch daemon` from ' .
'your project root to start the daemon.',
E_USER_NOTICE
);
}
$batchOptions = [
'batchSize' => 100, // Max messages for each batch.
'callPeriod' => 0.01, // Max time in seconds between each batch publish.
];
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$topic = $pubsub->topic($topicName);
$publisher = $topic->batchPublisher([
'batchOptions' => $batchOptions
]);
for ($i = 0; $i < 10; $i++) {
$publisher->publish(['data' => $message]);
}
print('Messages enqueued for publication.' . PHP_EOL);
}
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Python API reference documentation.
from concurrent import futures
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Configure the batch to publish as soon as there are 10 messages
# or 1 KiB of data, or 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_messages=10, # default 100
max_bytes=1024, # default 1 MB
max_latency=1, # default 10 ms
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []
# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
message_id = future.result()
print(message_id)
for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch multiple messages.
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with batch settings to {topic_path}.")
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.
# topic_id = "your-topic-id"
pubsub = Google::Cloud::Pubsub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
max_bytes: 1_000_000,
max_messages: 20
}
10.times do |i|
topic.publish_async "This is message ##{i}."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."
Disable batch messaging
To turn off batching in the client libraries, set the value of max_messages to 1.
Batch messaging and ordered delivery
With ordered delivery, a failure to acknowledge any message in a batch causes all messages in that batch, including the ones sent before the unacknowledged message, to be redelivered.
Quotas and limits for batch messaging
Before you configure batch messaging, consider the effect of factors such as the publish throughput quota and the maximum batch size. The high-level client libraries ensure that batch requests stay within the specified limits.
- For cost purposes, the minimum request size is 1000 bytes, even if the actual message is smaller than 1000 bytes.
- Pub/Sub limits a single batch publish request to a maximum of 10 MB or 1,000 messages.
For more information, see Pub/Sub quotas and limits.
What's next
To learn how to configure advanced publish options, see the following: