在 Spring 应用中使用 Pub/Sub

本页面介绍了如何在通过 Spring Framework 构建的 Java 应用中使用 Pub/Sub。

Spring Cloud GCP 具有多个模块,用于向 Pub/Sub 主题发送消息,以及使用 Spring Framework 从 Pub/Sub 订阅中接收消息。您可以单独使用这些模块,也可以将它们组合起来用于不同的使用场景:

注意:Spring Cloud GCP 库不提供对 AckReplyConsumerWithResponse 的访问权限,后者是使用 Java 客户端库实现“正好一次”功能所需的模块。

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 设置 Google Cloud 控制台项目。

    设置项目

    点击即可执行以下操作:

    • 创建或选择项目。
    • 为该项目启用 Pub/Sub API。
    • 创建服务账号。
    • 下载 JSON 格式的私钥。

    您可以随时在 Google Cloud 控制台中查看和管理这些资源。

  3. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含凭据的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  4. 设置 Google Cloud 控制台项目。

    设置项目

    点击即可执行以下操作:

    • 创建或选择项目。
    • 为该项目启用 Pub/Sub API。
    • 创建服务账号。
    • 下载 JSON 格式的私钥。

    您可以随时在 Google Cloud 控制台中查看和管理这些资源。

  5. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含凭据的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  6. 将环境变量 GOOGLE_CLOUD_PROJECT 设置为您的 Google Cloud 项目 ID。

使用 Spring Cloud GCP Pub/Sub 入门版

Spring Cloud GCP Pub/Sub 入门版模块使用 Spring Cloud GCP Pub/Sub 模块安装 Pub/Sub Java 客户端库。您可以使用 Spring Cloud GCP Pub/Sub 入门版提供的类或 Pub/Sub Java 客户端库从 Spring 应用调用 Pub/Sub API。如果您使用的是 Spring Cloud GCP Pub/Sub 入门版提供的类,则可以替换默认的 Pub/Sub 配置

安装模块

如需安装 Spring Cloud GCP Pub/Sub 入门版模块,请将以下依赖项添加到 pom.xml 文件中:

  1. Spring Cloud 物料清单 (BOM)

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud GCP Pub/Sub 入门版工件:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

支持的操作

Spring Cloud GCP Pub/Sub 入门版模块包括以下类:

  • PubSubAdmin,用于管理操作:
    • 创建主题和订阅。
    • 获取主题和订阅。
    • 列出主题和订阅。
    • 删除主题和订阅。
    • 获取和设置订阅的确认时限。
  • PubSubTemplate,用于发送和接收消息:
    • 向主题发布消息。
    • 从订阅中同步拉取消息。
    • 从订阅中异步拉取消息。
    • 确认消息。
    • 修改确认时限。
    • 将 Pub/Sub 消息转换为普通的旧式 Java 对象 (POJO)。

使用 Spring Integration 渠道适配器

如果您的 Spring 应用使用 Spring Integration 消息通道,您可以使用渠道适配器在消息渠道和 Pub/Sub 之间路由消息。

安装模块

如需为 Spring Integration 渠道适配器安装模块,请将以下内容添加到 pom.xml 文件中:

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud GCP Pub/Sub 入门版和 Spring Integration 核心工件:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

从 Pub/Sub 接收消息

如需从 Spring 应用中的 Pub/Sub 订阅接收消息,请使用入站渠道适配器。入站渠道适配器将传入的 Pub/Sub 消息转换为 POJO,然后将 POJO 转发到消息渠道。

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:

  • 名为 inputMessageChannel 的消息渠道 Bean。
  • 名为 inboundChannelAdapter 的入站渠道适配器 Bean,类型为 PubSubInboundChannelAdapter
  • 名为 sub-one 的 Pub/Sub 订阅 ID。

inboundChannelAdapter 使用 PubSubTemplatesub-one 中异步拉取消息,并将消息发送到 inputMessageChannel

inboundChannelAdapter 会将确认模式设置为 MANUAL,以便应用可以在处理消息后进行确认。PubSubInboundChannelAdapter 类型的默认确认模式是 AUTO

ServiceActivator Bean messageReceiver 会将到达 inputMessageChannel 的每条消息记录到标准输出,然后确认消息。

将消息发布到 Pub/Sub

如需将消息渠道的消息发布到 Pub/Sub 主题,请使用出站渠道适配器。出站渠道适配器将 POJO 转换为 Pub/Sub 消息,然后将这些消息发送到 Pub/Sub 主题。

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:

  • 名为 inputMessageChannel 的消息渠道 Bean。
  • 名为 messageSender 的出站渠道适配器 Bean,类型为 PubSubMessageHandler
  • 名为 topic-two 的 Pub/Sub 主题 ID。

ServiceActivator Bean 将 messageSender 中的逻辑应用于 inputMessageChannel 中的每条消息。

messageSender 中的 PubSubMessageHandler 使用 PubSubTemplateinputMessageChannel 中发布消息。PubSubMessageHandler 会将消息发布到 Pub/Sub 主题 topic-two

使用 Spring Cloud Stream Binder

如需在 Spring Cloud Stream 应用中调用 Pub/Sub API,请使用 Spring Cloud GCP Pub/Sub Stream Binder 模块。

安装模块

如需安装 Spring Cloud Stream Binder 模块,请将以下内容添加到 pom.xml 文件中:

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud Stream Binder 工件:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

从 Pub/Sub 接收消息

要将应用用作事件接收器,请通过指定以下内容来配置输入 Binder:

  • 定义消息处理逻辑的 Consumer Bean。例如,以下 Consumer Bean 名为 receiveMessageFromTopicTwo

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • 配置文件 application.properties 中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为 topic-two 的 Pub/Sub 主题 ID:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

示例代码会从 Pub/Sub 接收消息。该示例执行以下操作:

  1. application.properties 的输入绑定目的地中找到 Pub/Sub 主题 ID topic-two
  2. 创建针对 topic-two 的 Pub/Sub 订阅。
  3. 使用绑定名称 receiveMessageFromTopicTwo-in-0 查找名为 receiveMessageFromTopicTwoConsumer Bean。
  4. 将传入的消息输出到标准输出并自动确认。

将消息发布到 Pub/Sub

要将应用用作事件源,请指定以下内容以配置输出 Binder:

  • 一个 Supplier Bean,用于定义消息来自应用中的何处。例如,以下 Supplier Bean 名为 sendMessageToTopicOne

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • 配置文件 application.properties 中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为 topic-one 的 Pub/Sub 主题 ID:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

示例代码会将消息发布到 Pub/Sub。该示例执行以下操作:

  1. application.properties 的输出绑定目的地中找到 Pub/Sub 主题 ID topic-one
  2. 使用绑定名称 sendMessageToTopicOne-out-0 查找名为 sendMessageToTopicOneSupplier Bean。
  3. 每 10 秒向 topic-one 发送一条已编号的消息。