将发布者配置为为发布消息和处理未来消息使用不同数量的线程。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C++
在尝试此示例之前,请按照使用客户端库的 Pub/Sub 快速入门中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// Override the default number of background (I/O) threads. By default the
// library uses `std::thread::hardware_concurrency()` threads.
auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
auto publisher = pubsub::Publisher(
pubsub::MakePublisherConnection(std::move(topic), std::move(options)));
std::vector<future<void>> ids;
for (char const* data : {"1", "2", "3", "go!"}) {
ids.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([data](future<StatusOr<std::string>> f) {
auto s = f.get();
if (!s) return;
std::cout << "Sent '" << data << "' (" << *s << ")\n";
}));
}
publisher.Flush();
// Block until they are actually sent.
for (auto& id : ids) id.get();
}
Go
在尝试此示例之前,请按照使用客户端库的 Pub/Sub 快速入门中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// msg := "Hello World"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
t := client.Topic(topicID)
t.PublishSettings.NumGoroutines = 1
result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Get: %v", err)
}
fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
return nil
}
Java
在尝试此示例之前,请按照使用客户端库的 Pub/Sub 快速入门中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class PublishWithConcurrencyControlExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithConcurrencyControlExample(projectId, topicId);
}
public static void publishWithConcurrencyControlExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
try {
// Provides an executor service for processing messages. The default
// `executorProvider` used by the publisher has a default thread count of
// 5 * the number of processors available to the Java virtual machine.
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
// `setExecutorProvider` configures an executor for the publisher.
publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();
// schedule publishing one message at a time : messages get automatically batched
for (int i = 0; i < 100; i++) {
String message = "message " + i;
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
messageIdFutures.add(messageIdFuture);
}
} finally {
// Wait on any pending publish requests.
List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();
System.out.println("Published " + messageIds.size() + " messages with concurrency control.");
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Ruby
在尝试此示例之前,请按照使用客户端库的 Pub/Sub 快速入门中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_id, async: {
threads: {
# Use exactly one thread for publishing message and exactly one thread
# for executing callbacks
publish: 1,
callback: 1
}
}
topic.publish_async "This is a test message." do |result|
raise "Failed to publish the message." unless result.succeeded?
puts "Message published asynchronously."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。