本文档简要介绍了拉取订阅、其工作流以及关联属性。
在拉取订阅中,订阅者客户端从 Pub/Sub 服务器请求消息。
拉取模式可以使用以下两个服务 API 之一:拉取或 StreamingPull。如需运行所选的 API,您可以选择 Google 提供的高层级客户端库,或自动生成的低层级客户端库。您还可以在异步和同步消息处理之间进行选择。
准备工作
在阅读本文档之前,请确保您熟悉以下内容:
Pub/Sub 的工作原理以及不同的 Pub/Sub 术语。
Pub/Sub 支持的不同订阅类型,以及您可能需要使用拉取订阅的原因。
拉取订阅工作流
对于拉取订阅,订阅者客户端向 Pub/Sub 服务器发起请求来检索消息。订阅者客户端使用以下 API 之一:
大多数订阅者客户端不会直接发出这些请求,相反,客户端依赖于 Google Cloud 提供的高级客户端库,该库在内部执行流式拉取请求并异步传送消息。对于需要更好地控制消息拉取方式的订阅者客户端,Pub/Sub 会使用自动生成的低级别 gRPC 库。此库会直接发出拉取请求或流式传输拉取请求。这些请求可以是同步请求,也可以是异步请求。
以下两张图片展示了订阅者客户端与拉取订阅之间的工作流。
拉取工作流
拉取工作流如下所示(参考图 1):
- 订阅者客户端明确调用
pull
方法,该方法会请求要传送的消息。此请求就是PullRequest
,如图所示。 Pub/Sub 服务器会返回零条或多条消息和确认 ID。没有消息或返回错误的响应并不一定表示没有消息可供接收。此响应是
PullResponse
,如图所示。订阅者客户端明确调用
acknowledge
方法。客户端将使用返回的确认 ID 来确认消息已处理,无需再次传送。
对于单个流式拉取请求,订阅者客户端可能会因连接打开而返回多个响应。相比之下,每个拉取请求仅返回一个响应。
拉取订阅的属性
您为拉取订阅配置的属性决定了您将消息写入订阅的方式。如需了解详情,请参阅订阅属性。
Pub/Sub 服务 API
Pub/Sub 拉取订阅可以使用以下两个 API 之一来检索消息:
- 拉取
- StreamingPull
使用这些 API 接收消息时,请使用一元 Acclaim 和 ModifyAckDeadline RPC。以下标签页介绍了这两种 Pub/Sub API。
StreamingPull API
Pub/Sub 客户端库在可能的情况下,使用 StreamingPull 来最大限度提高吞吐量并缩短延迟时间。虽然您可能永远不会直接使用 StreamingPull API,但了解它与 Pull API 之间的区别很重要。
StreamingPull API 依靠永久性双向连接在消息可用时接收多条消息。工作流程如下:
客户端向服务器发送请求以建立连接。 如果超出连接配额,服务器将返回资源耗尽错误。客户端库会自动重试超出配额的错误。
如果没有错误或连接配额再次可用,服务器将持续向连接的客户端发送消息。
如果超出吞吐量配额,服务器将停止发送消息。但是,连接不会中断。只要有足够的吞吐量配额,数据流就会恢复。
客户端或服务器最终关闭连接。
StreamingPull API 使连接保持打开状态。Pub/Sub 服务器会在一段时间后定期关闭连接,以避免长时间运行的粘性连接。客户端库会自动重新打开 StreamingPull 连接。
有消息时,系统会将消息发送到连接。因此,StreamingPull API 可最大限度地缩短延迟时间并提高消息吞吐量。
详细了解 StreamingPull REST 方法:StreamingPullRequest 和 StreamingPullResponse。
详细了解 StreamingPull RPC 方法:StreamingPullRequest 和 StreamingPullResponse。
拉取 API
此 API 是基于请求和响应模型的传统一元 RPC。单个拉取响应对应于单个拉取请求。工作流程如下:
客户端向服务器发送请求以获取消息。 如果超出吞吐量配额,服务器将返回“资源耗尽”错误。
如果没有错误或吞吐量配额再次可用,服务器将回复零条或多条消息和确认 ID。
使用一元 Pull API 时,没有消息或包含错误的响应不一定表示没有消息可供接收。
使用 Pull API 并不能保证低延迟和高消息吞吐量。如需使用 Pull API 实现高吞吐量和低延迟,您必须同时有多个未完成的请求。当旧请求收到响应时,系统会创建新请求。设计此类解决方案容易出错,且难以维护。我们建议您针对此类用例使用 StreamingPull API。
仅当您需要严格控制以下各项时,才使用 Pull API 而不是 StreamingPull API:
- 订阅者客户端可以处理的消息数
- 客户端内存和资源
当您的订阅者是 Pub/Sub 与其他以更拉取方式运行的服务之间的代理时,您也可以使用此 API。
详细了解拉取 REST 方法:方法:projects.subscriptions.pull。
详细了解 Pull RPC 方法:PullRequest 和 PullResponse。
消息处理模式的类型
为您的订阅者客户端选择以下拉取模式之一。
异步拉取模式
异步拉取模式将消息接收与订阅者客户端中的消息处理分离开来。此模式是大多数订阅者客户端的默认模式。异步拉取模式可以使用 StreamingPull API 或一元 Pull API。异步拉取还可以使用高级客户端库或低级自动生成的客户端库。
本文档稍后会详细介绍客户端库。
同步拉取模式
在同步拉取模式下,消息的接收和处理会按顺序进行,并且不会彼此分离。因此,与 StreamingPull 与一元 Pull API 类似,与同步处理相比,异步处理具有更低的延迟和更高的吞吐量。
仅在以下情况下使用同步拉取模式:低延迟和高吞吐量不是一些其他要求的最重要因素。例如,应用可能仅限于使用同步编程模型。或者,具有资源限制的应用可能需要更精确地控制内存、网络或 CPU。在这种情况下,请将同步模式与一元 Pull API 搭配使用。
Pub/Sub 客户端库
Pub/Sub 提供高层级和低层级自动生成的客户端库。
高级 Pub/Sub 客户端库
高级客户端库提供了使用租期管理来控制确认时限的选项。与在订阅级别使用控制台或 CLI 配置确认截止期限相比,这些选项更精细。高级客户端库还实现了对有序传送、正好一次传送和流控制等功能的支持。
我们建议将异步拉取和 StreamingPull API 与高级客户端库搭配使用。并非 Google Cloud 支持的所有语言也支持高级客户端库中的 Pull API。
如需使用高级客户端库,请参阅 Pub/Sub 客户端库。
自动生成的低层级 Pub/Sub 客户端库
在您必须直接使用 Pull API 的情况下,可以使用低级客户端库。您可以将同步或异步处理与低级别的自动生成的客户端库结合使用。使用自动生成的低级别客户端库时,您必须手动编写功能(如有序分发、“正好一次”分发、流控制和租期管理)。
针对所有受支持的语言使用自动生成的低级客户端库时,您可以使用同步处理模型。在直接使用 Pull API 的情况下,您可以使用低层级自动生成的客户端库和同步拉取。例如,您可能有依赖于此模型的现有应用逻辑。
如需直接使用自动生成的低层级客户端库,请参阅 Pub/Sub API 概览。
客户端库代码示例
StreamingPull 和高级客户端库代码示例
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
使用高级客户端库检索自定义属性
以下示例展示了如何异步拉取消息以及如何从元数据中检索自定义属性。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
使用高级客户端库处理错误
以下示例展示了如何处理订阅消息时出现的错误。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
一元拉取代码示例
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
PHP
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
协议
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}
响应:
200 OK
{
"receivedMessages": [{
"ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
"message": {
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "19917247034"
}
}]
}
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Pub/Sub 传送消息列表。如果该列表包含多条消息,Pub/Sub 会使用同一排序键对消息进行排序。以下是一些重要的注意事项:
在请求中设置
max_messages
的值并不能保证会返回max_messages
,即使积压中的消息数量太多也是如此。Pub/Sub Pull API 返回的值可能少于max_messages
,以缩短随时可传送的消息的传送延迟时间。附带 0 条消息的拉取响应不得用作积压中没有任何消息的指示。有时,系统可能会返回不含任何消息的响应,而后续请求可能会返回消息。
为了使用一元拉取模式实现较短的消息传送延迟,必须有许多同时未完成的拉取请求。随着主题吞吐量的增加,需要更多的拉取请求。通常,StreamingPull 模式更适合对延迟敏感的应用。
配额和限制
拉取和 StreamingPull 连接均受配额和限制的约束。 如需了解详情,请参阅 Pub/Sub 配额和限制。
后续步骤
为您的主题创建拉取订阅。
使用 gcloud CLI 创建或修改订阅。
使用 REST API 创建或修改订阅。
使用 RPC API 创建或修改订阅。