本页面介绍了如何接收精简版订阅的消息。您可以使用 Java 版 Pub/Sub Lite 客户端库接收消息。
精简版订阅将精简版主题连接到订阅者应用;订阅者从精简版订阅接收消息。订阅者会收到发布商应用向精简版主题发送的所有消息,包括发布者在创建精简版订阅之前发送的消息。
在接收精简版订阅的消息之前,请创建精简版主题,针对精简版主题创建精简版订阅,并发布消息。
接收消息
要接收精简版订阅的消息,请向精简版订阅请求消息。客户端库会自动连接到附加到精简版订阅的精简版主题中的分区。 如果多个订阅者客户端进行实例化,则消息将在所有客户端上分发。主题中的分区数量决定了可同时连接到一个订阅的订阅者客户端数上限。
订阅者最长可能需要一分钟才能完成初始化并开始接收消息。初始化后,系统会以最短的延迟时间接收消息。
以下示例展示了如何接收精简版订阅的消息:
gcloud
此命令需要使用 Python 3.6 或更高版本,并且需要安装 grpcio Python 软件包。对于 MacOS、Linux 和 Cloud Shell 用户,请运行以下命令:
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
如需接收消息,请使用 gcloud pubsub lite-subscriptions subscribe 命令:
gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
--location=LITE_LOCATION \
--auto-ack
请替换以下内容:
- SUBSCRIPTION_ID:精简版订阅的 ID
- LITE_LOCATION:精简版订阅的位置
Go
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Go 设置说明进行操作。
Java
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。
Python
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Python 设置说明进行操作。
客户端库会建立与精简版主题中每个分区的双向流式传输连接。
订阅者请求连接到分区。
Pub/Sub 精简版服务将消息传递给订阅者。
订阅者处理消息后,订阅者必须确认消息。客户端库对回调中的消息进行异步处理和确认。要限制订阅者可以存储在内存中的未确认消息数量,请配置流控制设置。
如果多个订阅者从同一精简版订阅接收消息,Pub/Sub 精简版服务会将每个订阅者连接到相同比例的分区。例如,如果两个订阅者使用一个相同的精简版订阅,并且该精简版订阅附加到具有两个分区的精简版主题,则每个订阅者会收到来自其中一个分区的消息。
确认消息
要确认消息,请向精简版订阅发送确认消息。
Go
如需发送确认,请使用 Message.Ack()
方法。
Java
如需发送确认,请使用 AckReplyConsumer.ack()
方法。
Python
如需发送确认,请使用 Message.ack()
方法。
订阅者必须确认每条消息。订阅者会先收到最早的未确认消息,然后接收每个后续消息。如果订阅者跳过一条消息,确认后续消息,然后重新连接,则订阅者会收到未确认的消息以及每条已确认的后续消息。
精简版订阅没有确认时限,并且 Pub/Sub Lite 服务不会通过开放的串流连接重新确认未确认的消息。
使用流控制
Pub/Sub Lite 服务向订阅者传送消息后,订阅者会将未确认的消息存储在内存中。您可以使用流控制设置限制订阅者可在内存中存储的未完成消息数量。流控制设置适用于订阅者接收消息的每个分区。
您可以配置以下流控制设置:
- 待处理的消息大小。待处理的消息的大小上限(以字节为单位)。最大大小必须大于最大消息的大小。
- 消息数量待处理消息的最大数量。
消息的大小位于 size_bytes
字段中。您可以使用客户端库配置流控制设置。
Go
如需配置流控制设置,请在调用 pscompat.NewSubscriberClientWithSettings
时传入 ReceiveSettings
。您可以在 ReceiveSettings
中设置以下参数:
MaxOutstandingMessages
MaxOutstandingBytes
如需查看示例,请参阅此流控制示例。
Java
如需配置流控制设置,请在 FlowControlRequest.Builder
类中使用以下方法:
Python
如需配置流控制设置,请在 FlowControlSettings
类中设置以下参数:
bytes_outstanding
messages_outstanding
例如,如果消息数量上限为 100,并且订阅者连接到 10 个分区,则订阅者无法从这 10 个分区收到超过 100 条消息。未完成消息的总数可能超过 100,但订阅者无法从每个分区存储超过 100 条消息。