当消息传送给拉取订阅方时,订阅方必须在确认时限内处理和确认 (确认) 消息。否则,订阅者必须通过调用修改确认时限来延长该时限。
Pub/Sub 高级客户端库提供租期管理这一功能,可自动延长尚未确认的消息的时限。默认情况下,客户端库可以通过定期发出 modifyAckDeadline 请求将截止时间延长到一小时。Python、Go、Java 和 .Net 的高阶客户端库使用确认延迟的第 99 百分位来确定每个扩展程序的长度。
与配置订阅级属性相比,租期管理可让您更精细地控制消息的确认期限。如果您只使用订阅级确认截止期限,则必须在低值和高值之间进行权衡。值越小,出现重复消息的可能性就越大,值越大,消息越有可能重新传送失败的消息。确定合适的值可能很困难,尤其是不同消息的预期处理时间差异很大时。
如需详细了解订阅的属性(包括确认截止期限),请参阅订阅属性。
租期管理配置
您可以在高级客户端库中配置以下属性来控制租期管理。
确认延期最长期限。您可以使用
modify acknowledgment deadline
请求延长消息的确认截止期限的最长时间。此属性可让您确定您希望订阅者客户端处理消息的时长。每次确认延期的最长持续时间。每个
modify acknowledgment deadline
请求的确认截止期限的最长延长时间。此属性可让您定义 Pub/Sub 重新提交消息所用的时间。当处理消息的第一个订阅者崩溃或运行状况不佳且无法再发送modify acknowledgment deadline
请求时,就会发生重新传送。每次确认延期的最短时长。每个
modify acknowledgment deadline
请求的确认截止期限的最短延长时间。通过此属性,您可以指定在重新传送消息之前必须经过的最短时间。
除非您启用正好一次传送,否则无法保证遵循确认时限。
手动管理确认时限
为避免在使用一元拉取或低级别客户端库时消息过期并重新传送,请使用 modify acknowledgment deadline
请求延长其确认期限。Go 和 C++ 高级客户端库属于例外情况,它们在使用一元拉取时提供租期管理。如需了解使用租期管理的一元拉取,请参阅以下示例:
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Collections.Generic;
public class PullMessageWithLeaseManagementSample
{
public int PullMessageWithLeaseManagement(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
var ackIds = new List<string>();
try
{
PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
ackIds.Add(msg.AckId);
string text = msg.Message.Data.ToStringUtf8();
Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
// Modify the ack deadline of each received message from the default 10 seconds to 30.
// This prevents the server from redelivering the message after the default 10 seconds
// have passed.
subscriberClient.ModifyAckDeadline(subscriptionName, new List<string> { msg.AckId }, 30);
}
// If acknowledgement required, send to server.
if (acknowledge && ackIds.Count > 0)
{
subscriberClient.Acknowledge(subscriptionName, ackIds);
}
}
catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
{
// UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
}
return ackIds.Count;
}
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SubscribeSyncWithLeaseExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
Integer numOfMessages = 10;
subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
}
public static void subscribeSyncWithLeaseExample(
String projectId, String subscriptionId, Integer numOfMessages)
throws IOException, InterruptedException {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 << 20) // 20 MB
.build())
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(numOfMessages)
.setSubscription(subscriptionName)
.build();
// Use pullCallable().futureCall to asynchronously perform this operation.
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
// Stop the program if the pull response is empty to avoid acknowledging
// an empty list of ack IDs.
if (pullResponse.getReceivedMessagesList().isEmpty()) {
System.out.println("No message was pulled. Exiting.");
return;
}
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
ackIds.add(message.getAckId());
// Modify the ack deadline of each received message from the default 10 seconds to 30.
// This prevents the server from redelivering the message after the default 10 seconds
// have passed.
ModifyAckDeadlineRequest modifyAckDeadlineRequest =
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(subscriptionName)
.addAckIds(message.getAckId())
.setAckDeadlineSeconds(30)
.build();
subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
}
// Acknowledge received messages.
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
// Use acknowledgeCallable().futureCall to asynchronously perform this operation.
subscriber.acknowledgeCallable().call(acknowledgeRequest);
System.out.println(pullResponse.getReceivedMessagesList());
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();
async function synchronousPullWithLeaseManagement() {
// The low level API client requires a name only.
const formattedSubscription =
subscriptionNameOrId.indexOf('/') >= 0
? subscriptionNameOrId
: subClient.subscriptionPath(projectId, subscriptionNameOrId);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
subscription: formattedSubscription,
maxMessages: maxMessages,
allowExcessMessages: false,
};
let isProcessed = false;
// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
console.log(`Processing "${message.message.data}"...`);
setTimeout(() => {
console.log(`Finished procesing "${message.message.data}".`);
isProcessed = true;
}, 30000);
}
// The subscriber pulls a specified number of messages.
const [response] = await subClient.pull(request);
// Obtain the first message.
const message = response.receivedMessages[0];
// Send the message to the worker function.
worker(message);
let waiting = true;
while (waiting) {
await new Promise(r => setTimeout(r, 10000));
// If the message has been processed..
if (isProcessed) {
const ackRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
};
//..acknowledges the message.
await subClient.acknowledge(ackRequest);
console.log(`Acknowledged: "${message.message.data}".`);
// Exit after the message is acknowledged.
waiting = false;
console.log('Done.');
} else {
// If the message is not yet processed..
const modifyAckRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
ackDeadlineSeconds: newAckDeadlineSeconds,
};
//..reset its ack deadline.
await subClient.modifyAckDeadline(modifyAckRequest);
console.log(
`Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
);
}
}
}
synchronousPullWithLeaseManagement().catch(console.error);
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
import logging
import multiprocessing
import sys
import time
from google.api_core import retry
from google.cloud import pubsub_v1
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
processes = dict()
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": 3},
retry=retry.Retry(deadline=300),
)
if len(response.received_messages) == 0:
return
# Start a process for each message based on its size modulo 10.
for message in response.received_messages:
process = multiprocessing.Process(
target=time.sleep, args=(sys.getsizeof(message) % 10,)
)
processes[process] = (message.ack_id, message.message.data)
process.start()
while processes:
# Take a break every second.
if processes:
time.sleep(1)
for process in list(processes):
ack_id, msg_data = processes[process]
# If the process is running, reset the ack deadline.
if process.is_alive():
subscriber.modify_ack_deadline(
request={
"subscription": subscription_path,
"ack_ids": [ack_id],
# Must be between 10 and 600.
"ack_deadline_seconds": 15,
}
)
logger.debug(f"Reset ack deadline for {msg_data}.")
# If the process is complete, acknowledge the message.
else:
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": [ack_id]}
)
logger.debug(f"Acknowledged {msg_data}.")
processes.pop(process)
print(
f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)
# Close the underlying gPRC channel. Alternatively, wrap subscriber in
# a 'with' block to automatically call close() when done.
subscriber.close()
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
# subscription_id = "your-subscription-id"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_id
new_ack_deadline = 30
processed = false
# The subscriber pulls a specified number of messages.
received_messages = subscription.pull immediate: false, max: 1
# Obtain the first message.
message = received_messages.first
# Send the message to a non-blocking worker that starts a long-running process, such as writing
# the message to a table, which may take longer than the default 10-sec acknowledge deadline.
Thread.new do
sleep 15
processed = true
puts "Finished processing \"#{message.data}\"."
end
loop do
sleep 1
if processed
# If the message has been processed, acknowledge the message.
message.acknowledge!
puts "Done."
# Exit after the message is acknowledged.
break
else
# If the message has not yet been processed, reset its ack deadline.
message.modify_ack_deadline! new_ack_deadline
puts "Reset ack deadline for \"#{message.data}\" for #{new_ack_deadline} seconds."
end
end
后续步骤
了解您可为订阅配置的其他传送选项: