Pub/Sub 支持推送消息传送和拉取消息传送。如需大致了解拉取订阅和推送订阅以及两者的对比情况,请参阅订阅者概览。本文档介绍拉取传送。如需了解推送传送的讨论,请参阅推送订阅者指南。
异步拉取
使用异步拉取不需要应用阻止新消息,从而在应用中实现更高的吞吐量。可以在应用中使用长时间运行的消息侦听器接收消息,并且一次确认一条消息,如以下示例所示。Java、Python、.NET、Go、Ruby 客户端使用 StreamingPull 服务 API 高效地实现异步客户端 API。
并非所有客户端库都支持异步拉取消息。要了解同步拉取消息,请参阅同步拉取。
如需了解详情,请参阅您的编程语言的 API 参考文档。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Subscriber subscriber) {
std::mutex mu;
std::condition_variable cv;
int message_count = 0;
auto session = subscriber.Subscribe(
[&](pubsub::Message const& m, pubsub::AckHandler h) {
std::cout << "Received message " << m << "\n";
std::unique_lock<std::mutex> lk(mu);
++message_count;
lk.unlock();
cv.notify_one();
std::move(h).ack();
});
// Wait until at least one message has been received.
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [&message_count] { return message_count > 0; });
lk.unlock();
// Cancel the subscription session.
session.cancel();
// Wait for the session to complete, no more callbacks can happen after this
// point.
auto status = session.get();
// Report any final status, blocking.
std::cout << "Message count: " << message_count << ", status: " << status
<< "\n";
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Cloud.PubSub.V1;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
public class PullMessagesAsyncSample
{
public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
int messageCount = 0;
Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
string text = Encoding.UTF8.GetString(message.Data.ToArray());
Console.WriteLine($"Message {message.MessageId}: {text}");
Interlocked.Increment(ref messageCount);
return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
});
// Run for 5 seconds.
await Task.Delay(5000);
await subscriber.StopAsync(CancellationToken.None);
// Lets make sure that the start task finished successfully after the call to stop.
await startTask;
return messageCount;
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"sync"
"cloud.google.com/go/pubsub"
)
func pullMsgs(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
received++
if received == 10 {
cancel()
}
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeAsyncExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeAsyncExample(projectId, subscriptionId);
}
public static void subscribeAsyncExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 60;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
function listenForMessages() {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionName);
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;
// "Ack" (acknowledge receipt of) the message
message.ack();
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
listenForMessages();
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(f"Received {message}.")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_name
subscriber = subscription.listen do |received_message|
puts "Received message: #{received_message.data}"
received_message.acknowledge!
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
处理自定义属性
以下示例展示了如何异步拉取消息以及从元数据中检索自定义属性:
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Subscriber subscriber) {
std::mutex mu;
std::condition_variable cv;
int message_count = 0;
auto session = subscriber.Subscribe(
[&](pubsub::Message const& m, pubsub::AckHandler h) {
std::cout << "Received message with attributes:\n";
for (auto const& kv : m.attributes()) {
std::cout << " " << kv.first << ": " << kv.second << "\n";
}
std::unique_lock<std::mutex> lk(mu);
++message_count;
lk.unlock();
cv.notify_one();
std::move(h).ack();
});
// Most applications would just release the `session` object at this point,
// but we want to gracefully close down this example.
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [&message_count] { return message_count > 0; });
lk.unlock();
session.cancel();
auto status = session.get();
std::cout << "Message count: " << message_count << ", status: " << status
<< "\n";
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
public class PullMessagesWithCustomAttributesAsyncSample
{
public async Task<List<PubsubMessage>> PullMessagesWithCustomAttributesAsync(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
var messages = new List<PubsubMessage>();
Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
messages.Add(message);
string text = Encoding.UTF8.GetString(message.Data.ToArray());
Console.WriteLine($"Message {message.MessageId}: {text}");
if (message.Attributes != null)
{
foreach (var attribute in message.Attributes)
{
Console.WriteLine($"{attribute.Key} = {attribute.Value}");
}
}
return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
});
// Run for 7 seconds.
await Task.Delay(7000);
await subscriber.StopAsync(CancellationToken.None);
// Lets make sure that the start task finished successfully after the call to stop.
await startTask;
return messages;
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/pubsub"
)
func pullMsgsCustomAttributes(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
sub := client.Subscription(subID)
// Receive messages for 10 seconds.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// Create a channel to handle messages to as they come in.
cm := make(chan *pubsub.Message)
defer close(cm)
// Handle individual messages in a goroutine.
go func() {
for msg := range cm {
fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
fmt.Fprintln(w, "Attributes:")
for key, value := range msg.Attributes {
fmt.Fprintf(w, "%s = %s", key, value)
}
msg.Ack()
}
}()
// Receive blocks until the context is cancelled or an error occurs.
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
cm <- msg
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeWithCustomAttributesExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeWithCustomAttributesExample(projectId, subscriptionId);
}
public static void subscribeWithCustomAttributesExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
// Print message attributes.
message
.getAttributesMap()
.forEach((key, value) -> System.out.println(key + " = " + value));
consumer.ack();
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 60;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function listenWithCustomAttributes() {
// References an existing subscription, e.g. "my-subscription"
const subscription = pubSubClient.subscription(subscriptionName);
// Create an event handler to handle messages
const messageHandler = message => {
console.log(
`Received message: id ${message.id}, data ${
message.data
}, attributes: ${JSON.stringify(message.attributes)}`
);
// "Ack" (acknowledge receipt of) the message
message.ack();
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
}, timeout * 1000);
}
listenWithCustomAttributes();
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(f"Received {message.data}.")
if message.attributes:
print("Attributes:")
for key in message.attributes:
value = message.attributes.get(key)
print(f"{key}: {value}")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_name
subscriber = subscription.listen do |received_message|
puts "Received message: #{received_message.data}"
unless received_message.attributes.empty?
puts "Attributes:"
received_message.attributes.each do |key, value|
puts "#{key}: #{value}"
end
end
received_message.acknowledge!
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
侦听错误
本示例介绍如何处理订阅消息时出现的错误:
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Subscriber subscriber) {
std::mutex mu;
std::condition_variable cv;
bool done = false;
int message_count = 0;
auto session =
subscriber
.Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
std::cout << "Received message " << m << "\n";
std::unique_lock<std::mutex> lk(mu);
++message_count;
done = true;
lk.unlock();
cv.notify_one();
std::move(h).ack();
})
// Setup an error handler for the subscription session
.then([&](future<google::cloud::Status> f) {
std::cout << "Subscription session result: " << f.get() << "\n";
std::unique_lock<std::mutex> lk(mu);
done = true;
cv.notify_one();
});
// Most applications would just release the `session` object at this point,
// but we want to gracefully close down this example.
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [&done] { return done; });
lk.unlock();
session.cancel();
session.get();
std::cout << "Message count:" << message_count << "\n";
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func pullMsgsError(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeWithErrorListenerExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeWithErrorListenerExample(projectId, subscriptionId);
}
public static void subscribeWithErrorListenerExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
try {
// Provides an executor service for processing messages.
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
subscriber =
Subscriber.newBuilder(subscriptionName, receiver)
.setExecutorProvider(executorProvider)
.build();
// Listen for unrecoverable failures. Rebuild a subscriber and restart subscribing
// when the current subscriber encounters permanent errors.
subscriber.addListener(
new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
System.out.println(failure.getStackTrace());
if (!executorProvider.getExecutor().isShutdown()) {
subscribeWithErrorListenerExample(projectId, subscriptionId);
}
}
},
MoreExecutors.directExecutor());
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
function listenForErrors() {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionName);
// Create an event handler to handle messages
const messageHandler = function (message) {
// Do something with the message
console.log(`Message: ${message}`);
// "Ack" (acknowledge receipt of) the message
message.ack();
};
// Create an event handler to handle errors
const errorHandler = function (error) {
// Do something with the error
console.error(`ERROR: ${error}`);
throw error;
};
// Listen for new messages/errors until timeout is hit
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
}, timeout * 1000);
}
listenForErrors();
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(f"Received {message}.")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except Exception as e:
streaming_pull_future.cancel()
print(
f"Listening for messages on {subscription_path} threw an exception: {e}."
)
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_name
subscriber = subscription.listen do |received_message|
puts "Received message: #{received_message.data}"
received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true
begin
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
rescue Exception => e
puts "Exception #{e.inspect}: #{e.message}"
raise "Stopped listening for messages."
end
消息流控制
订阅者客户端处理和确认消息的速度可能比 Pub/Sub 将消息发送到客户端的速度要慢。在此示例中:
有可能一个客户端不具备处理传入消息量的能力,导致消息积压,但网络上的另一个客户端却具备这一能力。上述第二个客户端有能力帮助减少订阅积压量,但却没有机会,因为第一个客户端保持着对收到的消息的租期。 这样会降低整体的处理速率,因为消息积压在第一个客户端。
由于该客户端库反复延长积压消息的确认时限,这些消息会继续消耗内存、CPU 和带宽资源。这样一来,订阅者客户端可能会耗尽资源(例如内存)。这可能会对处理消息的吞吐量和延迟时间产生负面影响。
要缓解上述问题,请使用订阅者的流控制功能来控制订阅者接收消息的速率。以下示例演示了这些流控制功能:
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](std::string project_id, std::string subscription_id) {
// Change the flow control watermarks, by default the client library uses
// 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
// size watermarks. Recall that the library stops requesting messages if
// any of the high watermarks are reached, and the library resumes
// requesting messages when *both* low watermarks are reached.
auto constexpr kMiB = 1024 * 1024L;
auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
pubsub::SubscriberOptions{}
.set_max_outstanding_messages(1000)
.set_max_outstanding_bytes(8 * kMiB)));
std::mutex mu;
std::condition_variable cv;
int count = 0;
auto constexpr kExpectedMessageCount = 4;
auto handler = [&](pubsub::Message const& m, pubsub::AckHandler h) {
std::move(h).ack();
{
std::lock_guard<std::mutex> lk(mu);
std::cout << "Received message [" << count << "] " << m.data() << "\n";
if (++count < kExpectedMessageCount) return;
}
cv.notify_one();
};
auto session = subscriber.Subscribe(std::move(handler));
{
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [&] { return count >= kExpectedMessageCount; });
}
session.cancel();
auto status = session.get();
std::cout << "Message count: " << count << ", status: " << status << "\n";
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
public class PullMessagesWithFlowControlAsyncSample
{
public async Task<int> PullMessagesWithFlowControlAsync(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
int messageCount = 0;
SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName,
settings: new SubscriberClient.Settings()
{
AckExtensionWindow = TimeSpan.FromSeconds(4),
AckDeadline = TimeSpan.FromSeconds(10),
FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
});
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
string text = Encoding.UTF8.GetString(message.Data.ToArray());
Console.WriteLine($"Message {message.MessageId}: {text}");
Interlocked.Increment(ref messageCount);
return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
});
// Run for 5 seconds.
await Task.Delay(5000);
await subscriber.StopAsync(CancellationToken.None);
// Lets make sure that the start task finished successfully after the call to stop.
await startTask;
return messageCount;
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func pullMsgsSettings(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
sub := client.Subscription(subID)
sub.ReceiveSettings.Synchronous = true
// MaxOutstandingMessages is the maximum number of unprocessed messages the
// client will pull from the server before pausing.
//
// This is only guaranteed when ReceiveSettings.Synchronous is set to true.
// When Synchronous is set to false, the StreamingPull RPC is used which
// can pull a single large batch of messages at once that is greater than
// MaxOustandingMessages before pausing. For more info, see
// https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
sub.ReceiveSettings.MaxOutstandingMessages = 10
// MaxOutstandingBytes is the maximum size of unprocessed messages,
// that the client will pull from the server before pausing. Similar
// to MaxOutstandingMessages, this may be exceeded with a large batch
// of messages since we cannot control the size of a batch of messages
// from the server (even with the synchronous Pull RPC).
sub.ReceiveSettings.MaxOutstandingBytes = 1e10
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeWithFlowControlSettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeWithFlowControlSettingsExample(projectId, subscriptionId);
}
public static void subscribeWithFlowControlSettingsExample(
String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
// The subscriber will pause the message stream and stop receiving more messsages from the
// server if any one of the conditions is met.
FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
// 1,000 outstanding messages. Must be >0. It controls the maximum number of messages
// the subscriber receives before pausing the message stream.
.setMaxOutstandingElementCount(1000L)
// 100 MiB. Must be >0. It controls the maximum size of messages the subscriber
// receives before pausing the message stream.
.setMaxOutstandingRequestBytes(100L * 1024L * 1024L)
.build();
try {
subscriber =
Subscriber.newBuilder(subscriptionName, receiver)
.setFlowControlSettings(flowControlSettings)
.build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const maxInProgress = 5;
// const timeout = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function subscribeWithFlowControlSettings() {
const subscriberOptions = {
flowControl: {
maxMessages: maxInProgress,
},
};
// References an existing subscription.
// Note that flow control settings are not persistent across subscribers.
const subscription = pubSubClient.subscription(
subscriptionName,
subscriberOptions
);
console.log(
`Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
);
const messageHandler = message => {
console.log(`Received message: ${message.id}`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
// "Ack" (acknowledge receipt of) the message
message.ack();
};
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.close();
}, timeout * 1000);
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(f"Received {message.data}.")
message.ack()
# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback, flow_control=flow_control
)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_name
subscriber = subscription.listen inventory: 10 do |received_message|
puts "Received message: #{received_message.data}"
received_message.acknowledge!
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
较为普遍的情况是,如果需要实施流控制,则表明消息的发布速率高于消耗速率。如果此状态长时间存在,而不是在消息量瞬态峰值时出现,请考虑增加订阅者客户端实例的数量。
并发控制
是否支持并发取决于您使用的编程语言。对于支持并行线程的语言实现(如 Java 和 Go),客户端库会默认选择线程数。此选择可能不是您的应用的最佳选择。例如,如果您发现自己的订阅者应用无法应对传入消息量,但并非受限于 CPU,则应增加线程数。对于 CPU 密集型消息处理操作,减少线程数可能是适宜的办法。
以下示例说明如何控制订阅者中的并发:
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](std::string project_id, std::string subscription_id) {
// Create a subscriber with 16 threads handling I/O work, by default the
// library creates `std::thread::hardware_concurrency()` threads.
auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
pubsub::SubscriberOptions{}.set_max_concurrency(8),
pubsub::ConnectionOptions{}.set_background_thread_pool_size(16)));
std::mutex mu;
std::condition_variable cv;
int count = 0;
auto constexpr kExpectedMessageCount = 4;
auto handler = [&](pubsub::Message const& m, pubsub::AckHandler h) {
// This handler executes in the I/O threads, applications could use,
// std::async(), a thread-pool,
// google::cloud::CompletionQueue::RunAsync(), or any other mechanism to
// transfer the execution to other threads.
std::cout << "Received message " << m << "\n";
std::move(h).ack();
{
std::lock_guard<std::mutex> lk(mu);
if (++count < kExpectedMessageCount) return;
}
cv.notify_one();
};
// Create a subscription where up to 8 messages are handled concurrently. By
// default the library uses `std::thread::hardware_concurrency()` as the
// maximum number of concurrent callbacks.
auto session = subscriber.Subscribe(std::move(handler));
{
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [&] { return count >= kExpectedMessageCount; });
}
session.cancel();
auto status = session.get();
std::cout << "Message count: " << count << ", status: " << status << "\n";
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"runtime"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
)
func pullMsgsConcurrenyControl(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
sub := client.Subscription(subID)
// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
// concurrency settings. Otherwise, NumGoroutines will be set to 1.
sub.ReceiveSettings.Synchronous = false
// NumGoroutines is the number of goroutines sub.Receive will spawn to pull
// messages concurrently.
sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()
// Receive messages for 10 seconds.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var counter int32
// Receive blocks until the context is cancelled or an error occurs.
err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
// The message handler passed to Receive may be called concurrently
// so it's okay to process the messages concurrently but make sure
// to synchronize access to shared memory.
atomic.AddInt32(&counter, 1)
msg.Ack()
})
if err != nil {
return fmt.Errorf("pubsub: Receive returned error: %v", err)
}
fmt.Fprintf(w, "Received %d messages\n", counter)
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeWithConcurrencyControlExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeWithConcurrencyControlExample(projectId, subscriptionId);
}
public static void subscribeWithConcurrencyControlExample(
String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
try {
// Provides an executor service for processing messages. The default `executorProvider` used
// by the subscriber has a default thread count of 5.
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
// `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
// to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
// subscriber to process messages. Here, the subscriber is configured to open 2 streams for
// receiving messages, each stream creates a new executor with 4 threads to help process the
// message callbacks. In total 2x4=8 threads are used for message processing.
subscriber =
Subscriber.newBuilder(subscriptionName, receiver)
.setParallelPullCount(2)
.setExecutorProvider(executorProvider)
.build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber = subscription.listen streams: 2, threads: {
callback: 4,
push: 2
} do |received_message|
puts "Received message: #{received_message.data}"
received_message.acknowledge!
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
是否支持并发取决于您使用的编程语言。如需了解更多信息,请参阅 API 参考文档。
StreamingPull
Pub/Sub 服务具有两个用于检索消息的 API:
在可能的情况下,Cloud 客户端库使用 StreamingPull 来最大限度提高吞吐量并缩短延迟时间。尽管您可能永远不会直接使用 StreamingPull API,但了解 StreamingPull 的一些关键属性以及它与传统拉取方法的不同之处却很重要。
拉取方法依赖于请求/响应模型:
- 客户端向服务器发送消息请求。
- 服务器不回复或回复多条消息并关闭连接。
StreamingPull 服务 API 依赖持久双向连接来接收多条消息(当有消息时):
- 客户端向服务器发送建立连接的请求。
- 服务器持续向连接的客户端发送消息。
- 连接最终由客户端或服务器关闭。
您为订阅者提供回调,订阅者将以异步方式运行每条消息的回调。如果订阅者收到具有相同排序键的消息,则客户端库将按顺序运行回调。Pub/Sub 服务会尽最大努力将这些消息发送给同一订阅者。
StreamingPull 的错误率为 100%(这是预期行为)
StreamingPull 流始终以不正常状态关闭。请注意,与常规远程过程调用 (RPC) 不同,此处的状态仅表示流已中断,而不表示请求失败。因此,虽然 StreamingPull API 错误率可能看似高达 100%,但这是特意设计的。
诊断 StreamingPull 错误
由于 StreamingPull 流始终以错误关闭,因此在诊断错误时检查流终止指标并没有什么帮助。不妨关注 StreamingPull 消息操作指标 (subscription/streaming_pull_message_operation_count
)。请查找以下错误:
- 在以下情况下,可能会出现
FAILED_PRECONDITION
错误:- Pub/Sub 尝试使用已停用的 Cloud KMS 密钥解密消息。
- 如果订阅积压输入量中存在已停用的 Cloud KMS 密钥加密的消息,则订阅可能会暂停。
UNAVAILABLE
错误
处理积压的大量简短消息
gRPC StreamingPull 堆栈针对高吞吐量进行了优化,因此会缓冲消息。但如果您试图处理积压的大量简短消息(而不是稳定的新消息流),可能会产生一些不良后果。在这些情况下,消息可能会被多次传送,并且可能无法有效地在客户端之间实现负载平衡。
Pub/Sub 服务和客户端库用户空间之间的缓冲区大约为 10MB。要了解此缓冲区对客户端库行为的影响,请考虑以下示例:
- 某订阅上积压了 1 万条消息,每条大小为 1KB。
- 单线程客户端实例按顺序对其进行处理,每条消息需要 1 秒钟时间。
- 第一个为该订阅建立 StreamingPull 至服务的连接的客户端实例将使用这 1 万条消息填充该缓冲。
- 处理该缓冲需要 1 万秒(将近 3 个小时)时间。
- 在此期间,一些缓冲消息超过确认时限,并被重新发送到同一个客户端,从而导致重复。
- 即使有多个客户端实例正在运行,卡在一个客户端缓冲区中的消息也将无法分流到任何其他实例。
如果消息以稳定的速率送达,而不是以单次大批量送达,则不会发生这种情况。服务不会一下子接收到全部 10MB 消息,因此能够有效地在多个订阅者之间平衡消息负载。
要应对这种情况,请使用推送订阅或拉取 API;目前某些 Cloud 客户端库(请参阅“同步拉取”部分)和所有 API 客户端库都提供拉取 API。如需了解详情,请参阅客户端库文档。
同步拉取
在某些情况下,异步拉取并不非常适合您的应用。例如,应用逻辑可能依赖轮询模式来检索消息,或者需要对客户端在任何给定时间检索的消息数量进行精确限制。为了支持此类应用,该服务支持同步拉取方法。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Linq;
using System.Text;
using System.Threading;
public class PullMessagesSyncSample
{
public int PullMessagesSync(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
int messageCount = 0;
try
{
// Pull messages from server,
// allowing an immediate response if there are no messages.
PullResponse response = subscriberClient.Pull(subscriptionName, returnImmediately: false, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
Interlocked.Increment(ref messageCount);
}
// If acknowledgement required, send to server.
if (acknowledge && messageCount > 0)
{
subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
}
}
catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
{
// UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
}
return messageCount;
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"cloud.google.com/go/pubsub"
)
func pullMsgsSync(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
sub := client.Subscription(subID)
// Turn on synchronous mode. This makes the subscriber use the Pull RPC rather
// than the StreamingPull RPC, which is useful for guaranteeing MaxOutstandingMessages,
// the max number of messages the client will hold in memory at a time.
sub.ReceiveSettings.Synchronous = true
sub.ReceiveSettings.MaxOutstandingMessages = 10
// Receive messages for 5 seconds.
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// Create a channel to handle messages to as they come in.
cm := make(chan *pubsub.Message)
defer close(cm)
// Handle individual messages in a goroutine.
go func() {
for msg := range cm {
fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
msg.Ack()
}
}()
// Receive blocks until the passed in context is done.
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
cm <- msg
})
if err != nil && status.Code(err) != codes.Canceled {
return fmt.Errorf("Receive: %v", err)
}
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SubscribeSyncExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
Integer numOfMessages = 10;
subscribeSyncExample(projectId, subscriptionId, numOfMessages);
}
public static void subscribeSyncExample(
String projectId, String subscriptionId, Integer numOfMessages) throws IOException {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
.build())
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(numOfMessages)
.setSubscription(subscriptionName)
.build();
// Use pullCallable().futureCall to asynchronously perform this operation.
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
// Handle received message
// ...
ackIds.add(message.getAckId());
}
// Acknowledge received messages.
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
// Use acknowledgeCallable().futureCall to asynchronously perform this operation.
subscriber.acknowledgeCallable().call(acknowledgeRequest);
System.out.println(pullResponse.getReceivedMessagesList());
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();
async function synchronousPull() {
const formattedSubscription = subClient.subscriptionPath(
projectId,
subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const request = {
subscription: formattedSubscription,
maxMessages: 10,
};
// The subscriber pulls a specified number of messages.
const [response] = await subClient.pull(request);
// Process the messages.
const ackIds = [];
for (const message of response.receivedMessages) {
console.log(`Received message: ${message.message.data}`);
ackIds.push(message.ackId);
}
// Acknowledge all of the messages. You could also ackknowledge
// these individually, but this is more efficient.
const ackRequest = {
subscription: formattedSubscription,
ackIds: ackIds,
};
await subClient.acknowledge(ackRequest);
console.log('Done.');
}
synchronousPull().catch(console.error);
PHP
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
use Google\Cloud\PubSub\PubSubClient;
/**
* Pulls all Pub/Sub messages for a subscription.
*
* @param string $projectId The Google project ID.
* @param string $subscriptionName The Pub/Sub subscription name.
*/
function pull_messages($projectId, $subscriptionName)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$subscription = $pubsub->subscription($subscriptionName);
foreach ($subscription->pull() as $message) {
printf('Message: %s' . PHP_EOL, $message->data());
// Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
$subscription->acknowledge($message);
}
}
协议
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}
响应:
200 OK
{
"receivedMessages": [{
"ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
"message": {
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "19917247034"
}
}]
}
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from google.api_core import retry
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
NUM_MESSAGES = 3
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
# The subscriber pulls a specific number of messages. The actual
# number of messages pulled may be smaller than max_messages.
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
retry=retry.Retry(deadline=300),
)
ack_ids = []
for received_message in response.received_messages:
print(f"Received: {received_message.message.data}.")
ack_ids.append(received_message.ack_id)
# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)
print(
f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
puts "Message pulled: #{message.data}"
message.acknowledge!
end
Pub/Sub 提供消息列表。如果该列表包含多条消息,则 Pub/Sub 会对排序键相同的消息进行排序。
请注意,要通过同步拉取来实现较短的消息传送延迟时间,同时拥有许多待处理的拉取请求很重要。随着主题吞吐量的增加,需要更多的拉取请求。通常,对于某些不宜延迟的应用,异步拉取更合适。
通过租期管理实现同步拉取
单条消息的处理时间可能会超出预先配置的确认时限(也称为租期)。为了避免重新传送这些消息,客户端库提供了一种重置其确认时限的方法(Go 客户端库除外,它会自动修改轮询消息的确认时限),如以下示例所示:
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
public class PullMessageWithLeaseManagementSample
{
public int PullMessageWithLeaseManagement(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
var ackIds = new List<string>();
try
{
PullResponse response = subscriberClient.Pull(subscriptionName, returnImmediately: false, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
ackIds.Add(msg.AckId);
string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
// Modify the ack deadline of each received message from the default 10 seconds to 30.
// This prevents the server from redelivering the message after the default 10 seconds
// have passed.
subscriberClient.ModifyAckDeadline(subscriptionName, new List<string> { msg.AckId }, 30);
}
// If acknowledgement required, send to server.
if (acknowledge && ackIds.Count > 0)
{
subscriberClient.Acknowledge(subscriptionName, ackIds);
}
}
catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
{
// UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
}
return ackIds.Count;
}
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SubscribeSyncWithLeaseExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
Integer numOfMessages = 10;
projectId = "tz-playground-bigdata";
subscriptionId = "uno";
subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
}
public static void subscribeSyncWithLeaseExample(
String projectId, String subscriptionId, Integer numOfMessages)
throws IOException, InterruptedException {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 << 20) // 20 MB
.build())
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(numOfMessages)
.setSubscription(subscriptionName)
.build();
// Use pullCallable().futureCall to asynchronously perform this operation.
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
ackIds.add(message.getAckId());
// Modify the ack deadline of each received message from the default 10 seconds to 30.
// This prevents the server from redelivering the message after the default 10 seconds
// have passed.
ModifyAckDeadlineRequest modifyAckDeadlineRequest =
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(subscriptionName)
.addAckIds(message.getAckId())
.setAckDeadlineSeconds(30)
.build();
subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
}
// Acknowledge received messages.
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
// Use acknowledgeCallable().futureCall to asynchronously perform this operation.
subscriber.acknowledgeCallable().call(acknowledgeRequest);
System.out.println(pullResponse.getReceivedMessagesList());
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();
async function synchronousPullWithLeaseManagement() {
const formattedSubscription = subClient.subscriptionPath(
projectId,
subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
subscription: formattedSubscription,
maxMessages: maxMessages,
};
let isProcessed = false;
// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
console.log(`Processing "${message.message.data}"...`);
setTimeout(() => {
console.log(`Finished procesing "${message.message.data}".`);
isProcessed = true;
}, 30000);
}
// The subscriber pulls a specified number of messages.
const [response] = await subClient.pull(request);
// Obtain the first message.
const message = response.receivedMessages[0];
// Send the message to the worker function.
worker(message);
let waiting = true;
while (waiting) {
await new Promise(r => setTimeout(r, 10000));
// If the message has been processed..
if (isProcessed) {
const ackRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
};
//..acknowledges the message.
await subClient.acknowledge(ackRequest);
console.log(`Acknowledged: "${message.message.data}".`);
// Exit after the message is acknowledged.
waiting = false;
console.log('Done.');
} else {
// If the message is not yet processed..
const modifyAckRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
ackDeadlineSeconds: newAckDeadlineSeconds,
};
//..reset its ack deadline.
await subClient.modifyAckDeadline(modifyAckRequest);
console.log(
`Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
);
}
}
}
synchronousPullWithLeaseManagement().catch(console.error);
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
import logging
import multiprocessing
import sys
import time
from google.api_core import retry
from google.cloud import pubsub_v1
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
processes = dict()
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": 3},
retry=retry.Retry(deadline=300),
)
# Start a process for each message based on its size modulo 10.
for message in response.received_messages:
process = multiprocessing.Process(
target=time.sleep, args=(sys.getsizeof(message) % 10,)
)
processes[process] = (message.ack_id, message.message.data)
process.start()
while processes:
# Take a break every second.
if processes:
time.sleep(1)
for process in list(processes):
ack_id, msg_data = processes[process]
# If the process is running, reset the ack deadline.
if process.is_alive():
subscriber.modify_ack_deadline(
request={
"subscription": subscription_path,
"ack_ids": [ack_id],
# Must be between 10 and 600.
"ack_deadline_seconds": 15,
}
)
logger.info(f"Reset ack deadline for {msg_data}.")
# If the process is complete, acknowledge the message.
else:
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": [ack_id]}
)
logger.info(f"Acknowledged {msg_data}.")
processes.pop(process)
print(
f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)
# Close the underlying gPRC channel. Alternatively, wrap subscriber in
# a 'with' block to automatically call close() when done.
subscriber.close()
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_name
new_ack_deadline = 30
processed = false
# The subscriber pulls a specified number of messages.
received_messages = subscription.pull max: 1
# Obtain the first message.
message = received_messages.first
# Send the message to a non-blocking worker that starts a long-running process, such as writing
# the message to a table, which may take longer than the default 10-sec acknowledge deadline.
Thread.new do
sleep 15
processed = true
puts "Finished processing \"#{message.data}\"."
end
loop do
sleep 10
if processed
# If the message has been processed, acknowledge the message.
message.acknowledge!
puts "Done."
# Exit after the message is acknowledged.
break
else
# If the message has not yet been processed, reset its ack deadline.
message.modify_ack_deadline! new_ack_deadline
puts "Reset ack deadline for \"#{message.data}\" for #{new_ack_deadline} seconds."
end
end
扩缩
您可能需要为订阅者应用实施扩缩机制来应对消息量。如何做到这一点取决于您的环境,但通常基于通过Google Cloud 操作套件监控服务提供的积压指标。如需详细了解如何为 Compute Engine 执行此操作,请参阅基于 Cloud Monitoring 指标进行扩缩。
请转到 GCP 指标列表的 Pub/Sub 部分页面,了解可以通过编程方式监控哪些指标。
最后,与所有分布式服务一样,每个请求偶尔会重试。
处理重复项并强制重试
如果您在确认时限之前未确认消息,Pub/Sub 会重新发送该消息。因此,Pub/Sub 可能会发送重复的消息。请使用 Google Cloud 的操作套件来监控具有 expired
响应代码的确认操作以检测此情况。如需获取此数据,请选择确认消息操作指标,然后按 response_code
标签对其进行分组或过滤。请注意,response_code
是指标上的系统标签,不是指标。

为了降低重复率,请延长消息时限。
- 客户端库会自动延长时限,但您应注意,可配置的最大延长时限有默认限制。
- 如果要构建自己的客户端库,请使用
modifyAckDeadline
方法延长确认时限。
或者,如需强制 Pub/Sub 重试消息,请将 modifyAckDeadline
设置为 0。