配置用于接收消息和处理消息回调的线程数。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C++
在尝试此示例之前,请按照《使用客户端库的 Pub/Sub 快速入门》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
// Create a subscriber with 16 threads handling I/O work, by default the
// library creates `std::thread::hardware_concurrency()` threads.
auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
Options{}
.set<pubsub::MaxConcurrencyOption>(8)
.set<GrpcBackgroundThreadPoolSizeOption>(16)));
// Create a subscription where up to 8 messages are handled concurrently. By
// default the library uses `std::thread::hardware_concurrency()` as the
// maximum number of concurrent callbacks.
auto session = subscriber.Subscribe(
[](pubsub::Message const& m, pubsub::AckHandler h) {
// This handler executes in the I/O threads, applications could use,
// std::async(), a thread-pool, or any other mechanism to transfer the
// execution to other threads.
std::cout << "Received message " << m << "\n";
std::move(h).ack();
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
return std::make_pair(subscriber, std::move(session));
};
Go
在尝试此示例之前,请按照《使用客户端库的 Pub/Sub 快速入门》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
)
// pullMsgsConcurrencyControl receives messages from subscription subID in
// project projectID for 10 seconds, acknowledging each one, and then writes the
// number of messages received to w. The subscription is configured with 16
// pulling goroutines and at most 8 outstanding (unacked) messages.
//
// It returns a wrapped error if the client cannot be created or if Receive
// fails; the underlying error is preserved for errors.Is / errors.As.
func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrent pulling of messages. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines determines the number of goroutines sub.Receive will spawn to pull
	// messages.
	sub.ReceiveSettings.NumGoroutines = 16
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	// Note, even in synchronous mode, messages pulled in a batch can still be handled
	// concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// received is incremented from concurrently running handlers, so it is
	// only ever touched through the atomic package.
	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", atomic.LoadInt32(&received))

	return nil
}
Java
在尝试此示例之前,请按照《使用客户端库的 Pub/Sub 快速入门》中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Sample that subscribes to a Pub/Sub subscription with an explicitly configured number of
 * StreamingPull streams and callback-processing threads.
 */
public class SubscribeWithConcurrencyControlExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  /**
   * Listens on the given subscription for 30 seconds, printing and acknowledging every message
   * received, then asks the subscriber to stop.
   *
   * @param projectId the project that owns the subscription
   * @param subscriptionId the ID of the subscription to pull from
   */
  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Asynchronous message receiver; the handling logic lives in handleMessage.
    MessageReceiver receiver = SubscribeWithConcurrencyControlExample::handleMessage;

    // Executor service used for processing messages. The default `executorProvider` used by the
    // subscriber has a default thread count of 5; this sample uses 4 instead.
    ExecutorProvider executorProvider =
        InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

    // `setParallelPullCount` sets how many StreamingPull streams the subscriber opens to receive
    // messages (default 1). `setExecutorProvider` supplies the executor that processes the
    // message callbacks. With 2 streams, each creating its own 4-thread executor, a total of
    // 2x4=8 threads are used for message processing.
    Subscriber subscriber =
        Subscriber.newBuilder(subscriptionName, receiver)
            .setParallelPullCount(2)
            .setExecutorProvider(executorProvider)
            .build();

    try {
      // Start the subscriber and wait until it is running.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Let the subscriber run for 30s unless an unrecoverable error occurs first.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // The 30s window elapsed: shut the subscriber down so it stops receiving messages.
      subscriber.stopAsync();
    }
  }

  /** Prints the ID and payload of an incoming message, then acknowledges it. */
  private static void handleMessage(PubsubMessage message, AckReplyConsumer consumer) {
    System.out.println("Id: " + message.getMessageId());
    System.out.println("Data: " + message.getData().toStringUtf8());
    consumer.ack();
  }
}
Ruby
在尝试此示例之前,请按照《使用客户端库的 Pub/Sub 快速入门》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# subscription_id = "your-subscription-id"

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_id

# Thread budget for the listener: 4 threads execute the message callbacks and
# 2 threads send acknowledgements and/or delays.
thread_counts = { callback: 4, push: 2 }

# Open 2 streams for pulling messages; each received message is printed and
# then acknowledged.
subscriber = subscription.listen(streams: 2, threads: thread_counts) do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start

# Keep the main thread alive for 60 seconds so the listening threads are not
# torn down as soon as the script reaches its end.
sleep 60

# Ask the subscriber to stop, then block until it has finished shutting down.
subscriber.stop
subscriber.wait!
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。