일괄 메시징은 커스텀 일괄 처리 설정으로 게시자 클라이언트를 만들고 이를 사용하여 일부 메시지를 게시합니다.
이 문서에서는 주제에 게시된 메시지에 일괄 메시징을 사용하는 방법을 설명합니다.
시작하기 전에
게시 워크플로를 구성하기 전에 다음 작업이 완료되어야 합니다.
- 주제 및 게시 워크플로 자세히 알아보기
- 주제 만들기
필요한 역할
주제에 메시지를 게시하는 데 필요한 권한을 얻으려면 관리자에게 주제에 대한 Pub/Sub 게시자(roles/pubsub.publisher
) IAM 역할을 부여해 달라고 요청하세요.
역할 부여에 대한 자세한 내용은 액세스 관리를 참조하세요.
커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.
주제 및 구독을 만들거나 업데이트하려면 추가 권한이 필요합니다.
일괄 메시징 사용
게시자의 메시지 일괄 처리 설정을 구성하는 방법은 다음 코드 샘플을 참조하세요.
C++
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// By default, the publisher will flush a batch after 10ms, after it
// contains more than 100 message, or after it contains more than 1MiB of
// data, whichever comes first. This changes those defaults.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic),
Options{}
.set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(20))
.set<pubsub::MaxBatchBytesOption>(4 * 1024 * 1024L)
.set<pubsub::MaxBatchMessagesOption>(200)));
std::vector<future<void>> ids;
for (char const* data : {"1", "2", "3", "go!"}) {
ids.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([data](future<StatusOr<std::string>> f) {
auto s = f.get();
if (!s) return;
std::cout << "Sent '" << data << "' (" << *s << ")\n";
}));
}
publisher.Flush();
// Block until they are actually sent.
for (auto& id : ids) id.get();
}
C#
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참고 문서를 확인하세요.
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishBatchedMessagesAsyncSample
{
public async Task<int> PublishBatchMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
// Default Settings:
// byteCountThreshold: 1000000
// elementCountThreshold: 100
// delayThreshold: 10 milliseconds
var customSettings = new PublisherClient.Settings
{
BatchingSettings = new BatchingSettings(
elementCountThreshold: 50,
byteCountThreshold: 10240,
delayThreshold: TimeSpan.FromMilliseconds(500))
};
PublisherClient publisher = await new PublisherClientBuilder
{
TopicName = topicName,
Settings = customSettings
}.BuildAsync();
int publishedMessageCount = 0;
var publishTasks = messageTexts.Select(async text =>
{
try
{
string message = await publisher.PublishAsync(text);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.
import (
"context"
"fmt"
"io"
"strconv"
"time"
"cloud.google.com/go/pubsub"
)
func publishWithSettings(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer client.Close()
var results []*pubsub.PublishResult
var resultErrors []error
t := client.Topic(topicID)
t.PublishSettings.ByteThreshold = 5000
t.PublishSettings.CountThreshold = 10
t.PublishSettings.DelayThreshold = 100 * time.Millisecond
for i := 0; i < 10; i++ {
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Message " + strconv.Itoa(i)),
})
results = append(results, result)
}
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
for i, res := range results {
id, err := res.Get(ctx)
if err != nil {
resultErrors = append(resultErrors, err)
fmt.Fprintf(w, "Failed to publish: %v", err)
continue
}
fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
}
if len(resultErrors) != 0 {
return fmt.Errorf("Get: %v", resultErrors[len(resultErrors)-1])
}
fmt.Fprintf(w, "Published messages with batch settings.")
return nil
}
Java
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;
public class PublishWithBatchSettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithBatchSettingsExample(projectId, topicId);
}
public static void publishWithBatchSettingsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
try {
// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1000 bytes
long messageCountBatchSize = 100L; // default : 100 message
Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms
// Publish request get triggered based on request size, messages count & time since last
// publish, whichever condition is met first.
BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setElementCountThreshold(messageCountBatchSize)
.setRequestByteThreshold(requestBytesThreshold)
.setDelayThreshold(publishDelayThreshold)
.build();
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();
// schedule publishing one message at a time : messages get automatically batched
for (int i = 0; i < 100; i++) {
String message = "message " + i;
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
messageIdFutures.add(messageIdFuture);
}
} finally {
// Wait on any pending publish requests.
List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();
System.out.println("Published " + messageIds.size() + " messages with batch settings.");
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishBatchedMessages(
topicNameOrId,
data,
maxMessages,
maxWaitTime
) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publishOptions = {
batching: {
maxMessages: maxMessages,
maxMilliseconds: maxWaitTime * 1000,
},
};
const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(
(async () => {
const messageId = await batchPublisher.publishMessage({
data: dataBuffer,
});
console.log(`Message ${messageId} published.`);
})()
);
}
await Promise.all(promises);
}
Node.js
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;
// Imports the Google Cloud client library
import {PublishOptions, PubSub} from '@google-cloud/pubsub';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishBatchedMessages(
topicNameOrId: string,
data: string,
maxMessages: number,
maxWaitTime: number
) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publishOptions: PublishOptions = {
batching: {
maxMessages: maxMessages,
maxMilliseconds: maxWaitTime * 1000,
},
};
const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);
const promises: Promise<void>[] = [];
for (let i = 0; i < 10; i++) {
promises.push(
(async () => {
const messageId = await batchPublisher.publishMessage({
data: dataBuffer,
});
console.log(`Message ${messageId} published.`);
})()
);
}
await Promise.all(promises);
}
PHP
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 PHP 설정 안내를 따르세요. 자세한 내용은 Pub/Sub PHP API 참고 문서를 참조하세요.
use Google\Cloud\PubSub\PubSubClient;
/**
* Publishes a message for a Pub/Sub topic.
*
* The publisher should be used in conjunction with the `google-cloud-batch`
* daemon, which should be running in the background.
*
* To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
*
* @param string $projectId The Google project ID.
* @param string $topicName The Pub/Sub topic name.
* @param string $message The message to publish.
*/
function publish_message_batch($projectId, $topicName, $message)
{
// Check if the batch daemon is running.
if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
trigger_error(
'The batch daemon is not running. Call ' .
'`vendor/bin/google-cloud-batch daemon` from ' .
'your project root to start the daemon.',
E_USER_NOTICE
);
}
$batchOptions = [
'batchSize' => 100, // Max messages for each batch.
'callPeriod' => 0.01, // Max time in seconds between each batch publish.
];
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$topic = $pubsub->topic($topicName);
$publisher = $topic->batchPublisher([
'batchOptions' => $batchOptions
]);
for ($i = 0; $i < 10; $i++) {
$publisher->publish(['data' => $message]);
}
print('Messages enqueued for publication.' . PHP_EOL);
}
Python
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 참조하세요.
from concurrent import futures
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Configure the batch to publish as soon as there are 10 messages
# or 1 KiB of data, or 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_messages=10, # default 100
max_bytes=1024, # default 1 MB
max_latency=1, # default 10 ms
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []
# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
message_id = future.result()
print(message_id)
for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch multiple messages.
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with batch settings to {topic_path}.")
Ruby
이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.
# topic_id = "your-topic-id"
pubsub = Google::Cloud::Pubsub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
max_bytes: 1_000_000,
max_messages: 20
}
10.times do |i|
topic.publish_async "This is message ##{i}."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."
메시지 일괄 처리 사용 중지
클라이언트 라이브러리에서 일괄 처리를 해제하려면 max_messages
값을 1로 설정합니다.
메시지 일괄 처리 및 정렬된 전송
정렬된 전송을 사용할 경우 배치의 메시지를 확인하지 못하면 배치의 모든 메시지(확인되지 않은 메시지 이전에 전송된 메시지 포함)가 모두 다시 전송됩니다.
메시지 일괄 처리의 할당량 및 한도
메시지 일괄 처리를 구성하기 전에 게시 처리량 할당량 및 최대 배치 크기와 같은 요인의 영향을 고려하세요. 상위 수준의 클라이언트 라이브러리는 배치 요청이 지정된 한도 내에서 유지되도록 합니다.
- 실제 메시지 크기가 1,000바이트보다 작더라도 비용 측면에서 권장되는 최소 요청 크기는 1,000바이트입니다.
- Pub/Sub의 단일 배치 게시 요청은 크기 10MB 또는 메시지 수 1,000개로 제한됩니다.
자세한 내용은 Pub/Sub 할당량 및 한도를 참조하세요.
다음 단계
고급 게시 옵션을 구성하는 방법은 다음을 참조하세요.