在 Spring 应用中使用 Pub/Sub

本页面介绍了如何在通过 Spring Framework 构建的 Java 应用中使用 Pub/Sub。

Spring Cloud GCP 具有多个模块,用于向 Pub/Sub 主题发送消息,以及使用 Spring Framework 从 Pub/Sub 订阅中接收消息。您可以单独使用这些模块,也可以将它们组合起来用于不同的使用场景:

注意:Spring Cloud GCP 库不提供对 AckReplyConsumerWithResponse 的访问权限,后者是使用 Java 客户端库实现“正好一次”功能所需的模块。


  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 设置 Google Cloud 控制台项目。



    • 创建或选择项目。
    • 为该项目启用 Pub/Sub API。
    • 创建服务账号。
    • 下载 JSON 格式的私钥。

    您可以随时在 Google Cloud 控制台中查看和管理这些资源。

  3. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含凭据的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  6. 将环境变量 GOOGLE_CLOUD_PROJECT 设置为您的 Google Cloud 项目 ID。

使用 Spring Cloud GCP Pub/Sub 入门版

Spring Cloud GCP Pub/Sub 入门版模块使用 Spring Cloud GCP Pub/Sub 模块安装 Pub/Sub Java 客户端库。您可以使用 Spring Cloud GCP Pub/Sub 入门版提供的类或 Pub/Sub Java 客户端库从 Spring 应用调用 Pub/Sub API。如果您使用的是 Spring Cloud GCP Pub/Sub 入门版提供的类,则可以替换默认的 Pub/Sub 配置


如需安装 Spring Cloud GCP Pub/Sub 入门版模块,请将以下依赖项添加到 pom.xml 文件中:

  1. Spring Cloud 物料清单 (BOM)

  2. Spring Cloud GCP Pub/Sub 入门版工件:



Spring Cloud GCP Pub/Sub 入门版模块包括以下类:

  • PubSubAdmin,用于管理操作:
    • 创建主题和订阅。
    • 获取主题和订阅。
    • 列出主题和订阅。
    • 删除主题和订阅。
    • 获取和设置订阅的确认时限。
  • PubSubTemplate,用于发送和接收消息:
    • 向主题发布消息。
    • 从订阅中同步拉取消息。
    • 从订阅中异步拉取消息。
    • 确认消息。
    • 修改确认时限。
    • 将 Pub/Sub 消息转换为普通的旧式 Java 对象 (POJO)。

使用 Spring Integration 渠道适配器

如果您的 Spring 应用使用 Spring Integration 消息通道,您可以使用渠道适配器在消息渠道和 Pub/Sub 之间路由消息。


如需为 Spring Integration 渠道适配器安装模块,请将以下内容添加到 pom.xml 文件中:

  1. Spring Cloud GCP BOM

  2. Spring Cloud GCP Pub/Sub 入门版和 Spring Integration 核心工件:


从 Pub/Sub 接收消息

如需从 Spring 应用中的 Pub/Sub 订阅接收消息,请使用入站渠道适配器。入站渠道适配器将传入的 Pub/Sub 消息转换为 POJO,然后将 POJO 转发到消息渠道。

// Create a message channel for messages arriving from the subscription `sub-one`.
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  return adapter;

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);

上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:

  • 名为 inputMessageChannel 的消息渠道 Bean。
  • 名为 inboundChannelAdapter 的入站渠道适配器 Bean,类型为 PubSubInboundChannelAdapter
  • 名为 sub-one 的 Pub/Sub 订阅 ID。

inboundChannelAdapter 使用 PubSubTemplatesub-one 中异步拉取消息,并将消息发送到 inputMessageChannel

inboundChannelAdapter 会将确认模式设置为 MANUAL,以便应用可以在处理消息后进行确认。PubSubInboundChannelAdapter 类型的默认确认模式是 AUTO

ServiceActivator Bean messageReceiver 会将到达 inputMessageChannel 的每条消息记录到标准输出,然后确认消息。

将消息发布到 Pub/Sub

如需将消息渠道的消息发布到 Pub/Sub 主题,请使用出站渠道适配器。出站渠道适配器将 POJO 转换为 Pub/Sub 消息,然后将这些消息发送到 Pub/Sub 主题。

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;

上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:

  • 名为 inputMessageChannel 的消息渠道 Bean。
  • 名为 messageSender 的出站渠道适配器 Bean,类型为 PubSubMessageHandler
  • 名为 topic-two 的 Pub/Sub 主题 ID。

ServiceActivator Bean 将 messageSender 中的逻辑应用于 inputMessageChannel 中的每条消息。

messageSender 中的 PubSubMessageHandler 使用 PubSubTemplateinputMessageChannel 中发布消息。PubSubMessageHandler 会将消息发布到 Pub/Sub 主题 topic-two

使用 Spring Cloud Stream Binder

如需在 Spring Cloud Stream 应用中调用 Pub/Sub API,请使用 Spring Cloud GCP Pub/Sub Stream Binder 模块。


如需安装 Spring Cloud Stream Binder 模块,请将以下内容添加到 pom.xml 文件中:

  1. Spring Cloud GCP BOM

  2. Spring Cloud Stream Binder 工件:


从 Pub/Sub 接收消息

要将应用用作事件接收器,请通过指定以下内容来配置输入 Binder:

  • 定义消息处理逻辑的 Consumer Bean。例如,以下 Consumer Bean 名为 receiveMessageFromTopicTwo

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
  • 配置文件 application.properties 中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为 topic-two 的 Pub/Sub 主题 ID:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.

示例代码会从 Pub/Sub 接收消息。该示例执行以下操作:

  1. application.properties 的输入绑定目的地中找到 Pub/Sub 主题 ID topic-two
  2. 创建针对 topic-two 的 Pub/Sub 订阅。
  3. 使用绑定名称 receiveMessageFromTopicTwo-in-0 查找名为 receiveMessageFromTopicTwoConsumer Bean。
  4. 将传入的消息输出到标准输出并自动确认。

将消息发布到 Pub/Sub

要将应用用作事件源,请指定以下内容以配置输出 Binder:

  • 一个 Supplier Bean,用于定义消息来自应用中的何处。例如,以下 Supplier Bean 名为 sendMessageToTopicOne

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
                  sink -> {
                    try {
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
  • 配置文件 application.properties 中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为 topic-one 的 Pub/Sub 主题 ID:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.

示例代码会将消息发布到 Pub/Sub。该示例执行以下操作:

  1. application.properties 的输出绑定目的地中找到 Pub/Sub 主题 ID topic-one
  2. 使用绑定名称 sendMessageToTopicOne-out-0 查找名为 sendMessageToTopicOneSupplier Bean。
  3. 每 10 秒向 topic-one 发送一条已编号的消息。