本文档简要介绍了拉取订阅、其工作流和关联的属性。
在拉取订阅中,订阅方客户端向 Pub/Sub 服务器请求消息。
拉取模式可以使用 Pull 或 StreamingPull 两个服务 API 之一。 如需运行所选 API,您可以选择 Google 提供的高级客户端库或自动生成的低级别客户端库。您还可以选择异步处理消息或同步消息处理。
准备工作
在阅读本文档之前,请确保您熟悉以下内容:
Pub/Sub 的工作原理和不同的 Pub/Sub 术语。
Pub/Sub 支持的不同订阅类型以及您可能需要使用拉取订阅的原因。
拉取订阅工作流
对于拉取订阅,订阅者客户端向 Pub/Sub 服务器发起请求以检索消息。订阅方客户端使用以下 API 之一:
大多数订阅者客户端不会直接发出这些请求。相反,客户端依赖于 Google Cloud 提供的高级客户端库,该库在内部执行流式拉取请求并异步传送消息。对于需要更好地控制消息拉取方式的订阅者客户端,Pub/Sub 会使用自动生成的低级别 gRPC 库。此库会直接发出拉取请求或流式拉取请求。这些请求可以是同步请求,也可以是异步请求。
以下两张图片展示了订阅者客户端和拉取订阅之间的工作流。
![拉取订阅的消息流](https://cloud.google.com/static/pubsub/images/subscriber_pull.png?authuser=4&hl=zh-cn)
![streamingPull 订阅的消息流](https://cloud.google.com/static/pubsub/images/subscriber_streamingpull.png?authuser=4&hl=zh-cn)
拉取工作流
拉取工作流如下所示(参考图 1):
- 订阅者客户端明确调用
pull
方法,该方法会请求要传送的消息。此请求是图片中显示的PullRequest
。 Pub/Sub 服务器返回零个或零个以上消息和确认 ID 进行响应。零消息或包含错误的响应不一定表示没有消息可接收。此响应是上图所示的
PullResponse
。订阅者客户端明确调用
acknowledge
方法。客户端使用返回的确认 ID 来确认消息已处理,不需要再次传送。
对于单个流式拉取请求,订阅方客户端可能会因连接打开而返回多个响应。相比之下,每个拉取请求仅返回一个响应。
拉取订阅的属性
您为拉取订阅配置的属性决定了向订阅写入消息的方式。如需了解详情,请参阅订阅属性。
Pub/Sub 服务 API
Pub/Sub 拉取订阅可以使用以下两个 API 之一来检索消息:
- 拉取
- StreamingPull
使用这些 API 接收消息时,请使用一元 Acknowledge 和 ModifyAckDeadline RPC。以下标签页介绍了这两个 Pub/Sub API。
StreamingPull API
在可能的情况下,Pub/Sub 客户端库使用 StreamingPull 来最大限度地提高吞吐量并缩短延迟时间。尽管您可能永远不会直接使用 StreamingPull API,但了解它与 Pull API 之间的区别非常重要。
StreamingPull API 依赖持久双向连接来接收多条消息。工作流程如下:
客户端向服务器发送请求以建立连接。 如果超出连接配额,服务器将返回“资源已用尽”错误。客户端库会自动重试超出配额的错误。
如果没有错误或连接配额再次可用,服务器会持续向连接的客户端发送消息。
如果超出吞吐量配额,服务器将停止发送消息。不过,连接并未中断。只要再次有足够的吞吐量配额,数据流就会恢复。
客户端或服务器最终关闭连接。
StreamingPull API 会保持打开连接。Pub/Sub 服务器在一段时间后定期关闭连接,以避免长时间运行的粘性连接。客户端库会自动重新打开 StreamingPull 连接。
消息可用时,会发送到相应连接。因此,StreamingPull API 可最大限度地缩短延迟时间并提高消息吞吐量。
详细了解 StreamingPull REST 方法:StreamingPullRequest 和 StreamingPullResponse。
详细了解 StreamingPull RPC 方法:StreamingPullRequest 和 StreamingPullResponse。
拉取 API
此 API 是一种基于请求和响应模型的传统一元 RPC。单个拉取响应对应于单个拉取请求。 工作流程如下:
客户端向服务器发送消息请求。 如果超出吞吐量配额,服务器将返回“资源已用尽”错误。
如果没有错误或吞吐量配额再次可用,服务器将回复零个或零个以上消息和确认 ID。
使用一元拉取 API 时,不含任何消息或包含错误的响应并不一定表示没有可接收的消息。
使用 Pull API 不能保证低延迟和高吞吐量的消息。如需使用 Pull API 实现高吞吐量和低延迟,您必须同时有多个未完成的请求。当旧请求收到响应时,系统会创建新请求。设计这样的解决方案不仅容易出错,而且难以维护。对于此类用例,我们建议您使用 StreamingPull API。
只有在需要严格控制以下方面时,才使用 Pull API,而不是 StreamingPull API:
- 订阅者客户端可以处理的消息数量
- 客户端内存和资源
如果您的订阅者是 Pub/Sub 和另一项以更注重拉取的方式运行的服务之间的代理,您也可以使用此 API。
详细了解拉取 REST 方法:方法:projects.subscriptions.pull。
详细了解拉取 RPC 方法:PullRequest 和 PullResponse。
消息处理模式的类型
为订阅者客户端选择以下拉取模式之一。
异步拉取模式
异步拉取模式将消息接收与订阅客户端中消息的处理分离开来。此模式是大多数订阅者客户端的默认模式。异步拉取模式可以使用 StreamingPull API 或一元拉取 API。异步拉取也可以使用高级客户端库或自动生成的低级客户端库。
您可以在本文档的后面部分了解有关客户端库的更多信息。
同步拉取模式
在同步拉取模式下,消息的接收和处理按顺序发生,并且不会彼此分离。因此,与 StreamingPull 与一元拉取 API 类似,与同步处理相比,异步处理的延迟时间更短,吞吐量更高。
请仅在与某些其他要求相比,低延迟和高吞吐量不是最重要因素的应用使用同步拉取模式。例如,应用可能仅限于使用同步编程模型。或者,具有资源限制的应用可能需要更精确地控制内存、网络或 CPU。在这种情况下,请将同步模式与一元拉取 API 搭配使用。
Pub/Sub 客户端库
Pub/Sub 提供自动生成的高级别和低级别的客户端库。
高级 Pub/Sub 客户端库
高级客户端库提供了使用租期管理来控制确认时限的选项。与使用控制台或 CLI 在订阅级别配置确认时限相比,这些选项会更精细。高级客户端库还可实现对有序传送、正好一次传送和流控制等功能的支持。
我们建议将异步拉取和 StreamingPull API 与高级客户端库结合使用。并非所有支持 Google Cloud 的语言也支持高级客户端库中的 Pull API。
如需使用高级客户端库,请参阅 Pub/Sub 客户端库。
自动生成的低级 Pub/Sub 客户端库
对于必须直接使用拉取 API 的情况,您可以使用低层级客户端库。您可以将同步或异步处理与自动生成的低级别客户端库结合使用。使用自动生成的低级别客户端库时,您必须手动编写功能代码,例如有序交付、正好一次交付、流控制和租赁管理。
如果您为所有受支持的语言使用自动生成的低级别客户端库,则可以使用同步处理模型。如果直接使用 Pull API,您可以使用低级别自动生成的客户端库和同步拉取。例如,您可能已有依赖于此模型的应用逻辑。
如需直接使用自动生成的低级别客户端库,请参阅 Pub/Sub API 概览。
客户端库代码示例
StreamingPull 及高级客户端库代码示例
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
使用高级客户端库检索自定义属性
以下示例展示了如何异步拉取消息以及从元数据中检索自定义属性。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
使用高级客户端库处理错误
以下示例展示了如何处理订阅消息时出现的错误。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
一元拉取代码示例
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
PHP
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
协议
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}
响应:
200 OK
{
"receivedMessages": [{
"ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
"message": {
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "19917247034"
}
}]
}
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Pub/Sub 传送消息列表。如果列表包含多条消息,Pub/Sub 会使用相同的排序键对这些消息进行排序。以下是一些重要注意事项:
在请求中为
max_messages
设置值不能保证返回max_messages
,即使积压消息中有这么多消息也是如此。Pub/Sub Pull API 返回的内容可能少于max_messages
,以缩短准备好传送的消息的传送延迟时间。不得将附带 0 条消息的拉取响应用作指示积压中没有消息的指示。有可能出现 0 条消息的响应,但随后有一个返回消息的请求。
如需使用一元拉取模式实现较低的消息传送延迟时间,必须同时具有许多未完成的拉取请求。随着主题吞吐量的增加,需要更多的拉取请求。通常,StreamingPull 模式更适合对延迟敏感的应用。
配额和限制
Pull 和 StreamingPull 连接均受配额和限制的约束。 如需了解详情,请参阅 Pub/Sub 配额和限制。
后续步骤
使用 gcloud CLI 创建或修改订阅。
使用 REST API 创建或修改订阅。
使用 RPC API 创建或修改订阅。