本页面介绍了如何在通过 Spring Framework 构建的 Java 应用中使用 Pub/Sub。
Spring Cloud GCP 具有多个模块,用于向 Pub/Sub 主题发送消息,以及使用 Spring Framework 从 Pub/Sub 订阅中接收消息。您可以单独使用这些模块,也可以将它们组合起来用于不同的使用场景:
- 借助 Spring Cloud GCP Pub/Sub 入门版,您可以使用辅助类发送和接收消息,并且可以调用 Pub/Sub Java 客户端库来支持更高级的场景。
- 借助适用于 Pub/Sub 的 Spring Integration 渠道适配器,您可以将 Spring Integration 消息渠道与 Pub/Sub 关联起来。
- 借助 适用于 Pub/Sub 的 Spring Cloud Stream Binder,您可以在 Spring Cloud Stream 应用中使用 Pub/Sub 作为消息传递中间件。
注意:Spring Cloud GCP 库不提供对 AckReplyConsumerWithResponse 的访问权限,而该模块是使用 Java 客户端库实现“恰好一次”功能的必需模块。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Set up a Google Cloud console project.
Click to:
- Create or select a project.
- Enable the Pub/Sub API for that project.
- Create a service account.
- Download a private key as JSON.
You can view and manage these resources at any time in the Google Cloud console.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. -
Set up a Google Cloud console project.
Click to:
- Create or select a project.
- Enable the Pub/Sub API for that project.
- Create a service account.
- Download a private key as JSON.
You can view and manage these resources at any time in the Google Cloud console.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. - 将
GOOGLE_CLOUD_PROJECT
环境变量设置为您的 Google Cloud 项目 ID。
使用 Spring Cloud GCP Pub/Sub 入门版
Spring Cloud GCP Pub/Sub 入门版模块使用 Spring Cloud GCP Pub/Sub 模块安装 Pub/Sub Java 客户端库。您可以使用 Spring Cloud GCP Pub/Sub 入门版提供的类或 Pub/Sub Java 客户端库从 Spring 应用调用 Pub/Sub API。如果您使用的是 Spring Cloud GCP Pub/Sub 入门版提供的类,则可以替换默认的 Pub/Sub 配置。
安装模块
如需安装 Spring Cloud GCP Pub/Sub 入门版模块,请将以下依赖项添加到 pom.xml
文件中:
Spring Cloud GCP Pub/Sub 入门版工件:
支持的操作
Spring Cloud GCP Pub/Sub 入门版模块包括以下类:
PubSubAdmin
,用于管理操作:- 创建主题和订阅。
- 获取主题和订阅。
- 列出主题和订阅。
- 删除主题和订阅。
- 获取和设置订阅的确认时限。
PubSubTemplate
,用于发送和接收消息:- 向主题发布消息。
- 从订阅中同步拉取消息。
- 从订阅中异步拉取消息。
- 确认消息。
- 修改确认时限。
- 将 Pub/Sub 消息转换为普通的旧式 Java 对象 (POJO)。
使用 Spring Integration 渠道适配器
如果您的 Spring 应用使用 Spring Integration 消息通道,您可以使用渠道适配器在消息渠道和 Pub/Sub 之间路由消息。
安装模块
如需为 Spring Integration 渠道适配器安装模块,请将以下内容添加到 pom.xml
文件中:
Spring Cloud GCP Pub/Sub 入门版和 Spring Integration 核心工件:
从 Pub/Sub 接收消息
如需从 Spring 应用中的 Pub/Sub 订阅接收消息,请使用入站渠道适配器。入站渠道适配器将传入的 Pub/Sub 消息转换为 POJO,然后将 POJO 转发到消息渠道。
上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:
- 名为
inputMessageChannel
的消息渠道 Bean。 - 名为
inboundChannelAdapter
的入站渠道适配器 Bean,类型为PubSubInboundChannelAdapter
。 - 名为
sub-one
的 Pub/Sub 订阅 ID。
inboundChannelAdapter
使用 PubSubTemplate
从 sub-one
中异步拉取消息,并将消息发送到 inputMessageChannel
。
inboundChannelAdapter
会将确认模式设置为 MANUAL
,以便应用可以在处理消息后进行确认。PubSubInboundChannelAdapter
类型的默认确认模式是 AUTO
。
ServiceActivator
Bean messageReceiver
会将到达 inputMessageChannel
的每条消息记录到标准输出,然后确认消息。
将消息发布到 Pub/Sub
如需将消息渠道的消息发布到 Pub/Sub 主题,请使用出站渠道适配器。出站渠道适配器将 POJO 转换为 Pub/Sub 消息,然后将这些消息发送到 Pub/Sub 主题。
上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:
- 名为
inputMessageChannel
的消息渠道 Bean。 - 名为
messageSender
的出站渠道适配器 Bean,类型为PubSubMessageHandler
。 - 名为
topic-two
的 Pub/Sub 主题 ID。
ServiceActivator
Bean 将 messageSender
中的逻辑应用于 inputMessageChannel
中的每条消息。
messageSender
中的 PubSubMessageHandler
使用 PubSubTemplate
在 inputMessageChannel
中发布消息。PubSubMessageHandler
会将消息发布到 Pub/Sub 主题 topic-two
。
使用 Spring Cloud Stream Binder
如需在 Spring Cloud Stream 应用中调用 Pub/Sub API,请使用 Spring Cloud GCP Pub/Sub Stream Binder 模块。
安装模块
如需安装 Spring Cloud Stream Binder 模块,请将以下内容添加到 pom.xml
文件中:
Spring Cloud Stream Binder 工件:
从 Pub/Sub 接收消息
要将应用用作事件接收器,请通过指定以下内容来配置输入 Binder:
定义消息处理逻辑的
Consumer
Bean。例如,以下Consumer
Bean 名为receiveMessageFromTopicTwo
:配置文件
application.properties
中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为topic-two
的 Pub/Sub 主题 ID:
示例代码会从 Pub/Sub 接收消息。该示例执行以下操作:
- 在
application.properties
的输入绑定目的地中找到 Pub/Sub 主题 IDtopic-two
。 - 创建针对
topic-two
的 Pub/Sub 订阅。 - 使用绑定名称
receiveMessageFromTopicTwo-in-0
查找名为receiveMessageFromTopicTwo
的Consumer
Bean。 - 将传入的消息输出到标准输出并自动确认。
将消息发布到 Pub/Sub
要将应用用作事件源,请指定以下内容以配置输出 Binder:
一个
Supplier
Bean,用于定义消息来自应用中的何处。例如,以下Supplier
Bean 名为sendMessageToTopicOne
:配置文件
application.properties
中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为topic-one
的 Pub/Sub 主题 ID:
示例代码会将消息发布到 Pub/Sub。该示例执行以下操作:
- 在
application.properties
的输出绑定目的地中找到 Pub/Sub 主题 IDtopic-one
。 - 使用绑定名称
sendMessageToTopicOne-out-0
查找名为sendMessageToTopicOne
的Supplier
Bean。 - 每 10 秒向
topic-one
发送一条已编号的消息。