カスタムのフロー制御設定を使用してパブリッシャー クライアントを作成し、そのクライアントでいくつかのメッセージをパブリッシュします。
もっと見る
このコードサンプルを含む詳細なドキュメントについては、以下をご覧ください。
コードサンプル
C++
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  // Flow control: once 100 messages, or 100MiB of message data, are still
  // waiting for an acknowledgement from the service, Publish() blocks the
  // caller instead of queueing more work.
  auto options =
      Options{}
          .set<pubsub::MaxPendingMessagesOption>(100)
          .set<pubsub::MaxPendingBytesOption>(100 * 1024 * 1024L)
          .set<pubsub::FullPublisherActionOption>(
              pubsub::FullPublisherAction::kBlocks);
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      pubsub::Topic(std::move(project_id), std::move(topic_id)),
      std::move(options)));

  // One future per message; each completes after its result was reported.
  std::vector<future<void>> pending;
  for (char const* payload : {"a", "b", "c"}) {
    auto done =
        publisher.Publish(pubsub::MessageBuilder().SetData(payload).Build())
            .then([payload](future<StatusOr<std::string>> result) {
              auto message_id = result.get();
              // Only successful publishes are reported.
              if (message_id) {
                std::cout << "Sent '" << payload << "' (" << *message_id
                          << ")\n";
              }
            });
    pending.push_back(std::move(done));
  }
  publisher.Flush();
  // Wait for every continuation, i.e. until all messages were handled.
  for (auto& p : pending) p.get();
}
Go
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
import (
"context"
"fmt"
"io"
"strconv"
"sync"
"sync/atomic"
"cloud.google.com/go/pubsub"
)
// publishWithFlowControlSettings publishes 1000 messages to the topic topicID
// in project projectID using a publisher configured to block once 100
// messages or 10 MiB of data are outstanding. Progress lines and per-message
// failures are written to w. It returns an error if the client cannot be
// created or if at least one message fails to publish.
func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	// Deferred calls run in reverse order, so the topic is stopped before
	// the client is closed.
	defer t.Stop()
	t.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{
		MaxOutstandingMessages: 100,              // at most 100 messages outstanding
		MaxOutstandingBytes:    10 * 1024 * 1024, // at most 10 MiB outstanding
		// Block Publish when a limit is reached; the other options are
		// FlowControlIgnore and FlowControlSignalError.
		LimitExceededBehavior: pubsub.FlowControlBlock,
	}

	var (
		wg          sync.WaitGroup
		mu          sync.Mutex // serializes writes to w, which may not be safe for concurrent use
		totalErrors uint64     // accessed only through sync/atomic
	)

	const numMsgs = 1000
	// Rapidly publishing 1000 messages in a loop may be constrained by flow control.
	for i := 0; i < numMsgs; i++ {
		wg.Add(1)
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("message #" + strconv.Itoa(i)),
		})
		go func(i int, res *pubsub.PublishResult) {
			defer wg.Done()

			mu.Lock()
			fmt.Fprintf(w, "Publishing message %d\n", i)
			mu.Unlock()

			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			if _, err := res.Get(ctx); err != nil {
				// Error handling code can be added here.
				mu.Lock()
				fmt.Fprintf(w, "Failed to publish: %v\n", err)
				mu.Unlock()
				atomic.AddUint64(&totalErrors, 1)
			}
		}(i, result)
	}

	wg.Wait()

	if failed := atomic.LoadUint64(&totalErrors); failed > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", failed, numMsgs)
	}
	return nil
}
Java
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/** Publishes a burst of messages through a publisher that applies flow control. */
public class PublishWithFlowControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithFlowControlExample(projectId, topicId);
  }

  /**
   * Publishes 1000 messages to the given topic with publisher flow control enabled.
   *
   * @param projectId ID of the project that owns the topic
   * @param topicId ID of the topic to publish to
   * @throws IOException if the publisher cannot be created
   * @throws ExecutionException if waiting on the publish futures fails
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public static void publishWithFlowControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Configure how many messages the publisher client can hold in memory
      // and what to do when messages exceed the limit.
      FlowControlSettings flowControlSettings =
          FlowControlSettings.newBuilder()
              // Block more messages from being published when the limit is reached. The other
              // options are Ignore (or continue publishing) and ThrowException (or error out).
              .setLimitExceededBehavior(LimitExceededBehavior.Block)
              .setMaxOutstandingRequestBytes(10 * 1024 * 1024L) // 10 MiB
              .setMaxOutstandingElementCount(100L) // 100 messages
              .build();

      // The flow control settings are handed to the publisher via its batching settings.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder().setFlowControlSettings(flowControlSettings).build();

      publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

      // Publish 1000 messages in quick succession may be constrained by publisher flow control.
      for (int i = 0; i < 1000; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      try {
        // Wait on any pending publish requests.
        List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

        System.out.println(
            "Published " + messageIds.size() + " messages with flow control settings.");
      } finally {
        // Shut down in a nested finally so the publisher is released even when
        // waiting on the futures throws.
        if (publisher != null) {
          // When finished with the publisher, shut down to free up resources.
          publisher.shutdown();
          publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
      }
    }
  }
}
Node.js
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates the Pub/Sub client once at module scope; keep and reuse this
// instance for later calls rather than constructing a new one each time.
const pubSubClient = new PubSub();
/**
 * Publishes 1000 copies of a test message to the given topic through a
 * flow-controlled publisher, then logs how many message IDs came back.
 *
 * @param {string} topicNameOrId Name or ID of the topic to publish to.
 */
async function publishWithFlowControl(topicNameOrId) {
  // Limits applied by the publisher's flow control.
  const flowControlOptions = {
    maxOutstandingMessages: 50,
    maxOutstandingBytes: 10 * 1024 * 1024, // 10 MiB
  };

  // Topic handle configured with the limits above.
  const topic = pubSubClient.topic(topicNameOrId, {flowControlOptions});

  // Publishing goes through the flow controller rather than
  // `topic.publish()` so that the limits are honored.
  const flowController = topic.flowControlled();

  const payload = {data: Buffer.from('test!')};
  const totalMessages = 1000;

  for (let sent = 0; sent < totalMessages; sent++) {
    // `publish()` returns something to await only when the caller should
    // pause; checking first avoids yielding to the event loop on every
    // iteration. Awaiting unconditionally would also be valid.
    const backpressure = flowController.publish(payload);
    if (backpressure) {
      await backpressure;
    }
  }

  // `all()` resolves with the IDs of every message handed to the flow
  // controller so far; it may also be called earlier.
  const publishedIds = await flowController.all();
  console.log(`Published ${publishedIds.length} with flow control settings.`);
}
Node.js (TypeScript)
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
import {PubSub, PublishOptions} from '@google-cloud/pubsub';
// Creates the Pub/Sub client once at module scope; keep and reuse this
// instance for later calls rather than constructing a new one each time.
const pubSubClient = new PubSub();
/**
 * Publishes 1000 copies of a test message to the given topic through a
 * flow-controlled publisher, then logs how many message IDs came back.
 *
 * @param topicNameOrId Name or ID of the topic to publish to.
 */
async function publishWithFlowControl(topicNameOrId: string) {
  // Limits applied by the publisher's flow control.
  const publishOptions: PublishOptions = {
    flowControlOptions: {
      maxOutstandingMessages: 50,
      maxOutstandingBytes: 10 * 1024 * 1024, // 10 MiB
    },
  };

  // Topic handle configured with the limits above.
  const topic = pubSubClient.topic(topicNameOrId, publishOptions);

  // Publishing goes through the flow controller rather than
  // `topic.publish()` so that the limits are honored.
  const flowController = topic.flowControlled();

  const payload = {data: Buffer.from('test!')};
  const totalMessages = 1000;

  for (let sent = 0; sent < totalMessages; sent++) {
    // `publish()` returns something to await only when the caller should
    // pause; checking first avoids yielding to the event loop on every
    // iteration. Awaiting unconditionally would also be valid.
    const backpressure = flowController.publish(payload);
    if (backpressure) {
      await backpressure;
    }
  }

  // `all()` resolves with the IDs of every message handed to the flow
  // controller so far; it may also be called earlier.
  const publishedIds = await flowController.all();
  console.log(`Published ${publishedIds.length} with flow control settings.`);
}
Python
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
from concurrent import futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
LimitExceededBehavior,
PublisherOptions,
PublishFlowControl,
)
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure how many messages the publisher client can hold in memory
# and what to do when messages exceed the limit.
flow_control_settings = PublishFlowControl(
    message_limit=100,  # 100 messages
    byte_limit=10 * 1024 * 1024,  # 10 MiB
    limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
)
publisher = pubsub_v1.PublisherClient(
    publisher_options=PublisherOptions(flow_control=flow_control_settings)
)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []


# Resolve the publish future in a separate thread.
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
    """Print the message ID once the given publish future has completed."""
    message_id = publish_future.result()
    print(message_id)


# Publish 1000 messages in quick succession may be constrained by
# publisher flow control. The range is 1..1000 inclusive so that exactly
# 1000 messages are sent.
for n in range(1, 1001):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Allow the publisher client to batch messages.
    publish_future.add_done_callback(callback)
    publish_futures.append(publish_future)

# Block until every publish future has completed.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with flow control settings to {topic_path}.")
Ruby
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Ruby 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new

# Flow control for the asynchronous publisher: how many messages (and bytes)
# the client may hold in memory, and what happens once a limit is reached.
flow_control_settings = {
  message_limit: 100,
  byte_limit: 10 * 1024 * 1024, # 10 MiB
  # :block stops further messages from being published when a limit is
  # reached. The other options are :ignore and :error.
  limit_exceeded_behavior: :block
}

topic = pubsub.topic(topic_id, async: { flow_control: flow_control_settings })

# Publishing 1000 messages back to back may be throttled by flow control.
(0...1000).each do |index|
  topic.publish_async("message #{index}") do |publish_result|
    raise "Failed to publish the message." unless publish_result.succeeded?
  end
end

# Stop the async publisher so that all queued messages are sent immediately,
# and wait for it to finish.
topic.async_publisher.stop.wait!

puts "Published messages with flow control settings to #{topic_id}."
次のステップ
他の Google Cloud プロダクトに関連するコードサンプルを検索、フィルタするには、Google Cloud のサンプルをご覧ください。