在 Spring 应用中使用 Pub/Sub

本页面介绍了如何在通过 Spring Framework 构建的 Java 应用中使用 Pub/Sub。

Spring Cloud GCP 具有多个模块,用于向 Pub/Sub 主题发送消息,以及使用 Spring Framework 从 Pub/Sub 订阅中接收消息。您可以单独使用这些模块,也可以将它们组合起来用于不同的使用场景:

注意:Spring Cloud GCP 库不提供对 AckReplyConsumerWithResponse, 这是一个必需的模块,使用 Java 客户端库

准备工作

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. 将环境变量 GOOGLE_CLOUD_PROJECT 设置为您的 Google Cloud 项目 ID。

使用 Spring Cloud GCP Pub/Sub 入门版

Spring Cloud GCP Pub/Sub 入门版模块使用 Spring Cloud GCP Pub/Sub 模块安装 Pub/Sub Java 客户端库。您可以使用 Spring Cloud GCP Pub/Sub 入门版提供的类或 Pub/Sub Java 客户端库从 Spring 应用调用 Pub/Sub API。如果您使用的是 Spring Cloud GCP Pub/Sub 入门版提供的类,则可以替换默认的 Pub/Sub 配置

安装模块

如需安装 Spring Cloud GCP Pub/Sub 入门版模块,请将以下依赖项添加到 pom.xml 文件中:

  1. Spring Cloud 物料清单 (BOM)

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud GCP Pub/Sub 入门版工件:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

支持的操作

Spring Cloud GCP Pub/Sub 入门版模块包括以下类:

  • PubSubAdmin,用于管理操作:
    • 创建主题和订阅。
    • 获取主题和订阅。
    • 列出主题和订阅。
    • 删除主题和订阅。
    • 获取和设置订阅的确认时限。
  • PubSubTemplate,用于发送和接收消息:
    • 向主题发布消息。
    • 从订阅中同步拉取消息。
    • 从订阅中异步拉取消息。
    • 确认消息。
    • 修改确认时限。
    • 将 Pub/Sub 消息转换为普通的旧式 Java 对象 (POJO)。

使用 Spring Integration 渠道适配器

如果您的 Spring 应用使用 Spring Integration 消息通道,您可以使用渠道适配器在消息渠道和 Pub/Sub 之间路由消息。

  • 入站通道适配器 将消息从 Pub/Sub 订阅转发到消息通道。
  • 出站渠道适配器将消息渠道的消息发布到 Pub/Sub 主题。

安装模块

如需为 Spring Integration 渠道适配器安装模块,请将以下内容添加到 pom.xml 文件中:

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud GCP Pub/Sub 入门版和 Spring Integration 核心工件:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

从 Pub/Sub 接收消息

如需从 Spring 应用中的 Pub/Sub 订阅接收消息,请使用入站渠道适配器。入站渠道适配器将传入的 Pub/Sub 消息转换为 POJO,然后将 POJO 转发到消息渠道。

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:

  • 名为 inputMessageChannel 的消息渠道 Bean。
  • 名为 inboundChannelAdapter 的入站渠道适配器 Bean,类型为 PubSubInboundChannelAdapter
  • 名为 sub-one 的 Pub/Sub 订阅 ID。

inboundChannelAdapter 使用 PubSubTemplatesub-one 中异步拉取消息,并将消息发送到 inputMessageChannel

inboundChannelAdapter 会将确认模式设置为 MANUAL,以便应用可以在处理消息后进行确认。PubSubInboundChannelAdapter 类型的默认确认模式是 AUTO

ServiceActivator Bean messageReceiver 会将到达 inputMessageChannel 的每条消息记录到标准输出,然后确认消息。

将消息发布到 Pub/Sub

如需将消息渠道的消息发布到 Pub/Sub 主题,请使用出站渠道适配器。出站渠道适配器将 POJO 转换为 Pub/Sub 消息,然后将这些消息发送到 Pub/Sub 主题。

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:

  • 名为 inputMessageChannel 的消息渠道 Bean。
  • 名为 messageSender 的出站渠道适配器 Bean,类型为 PubSubMessageHandler
  • 名为 topic-two 的 Pub/Sub 主题 ID。

ServiceActivator Bean 将 messageSender 中的逻辑应用于 inputMessageChannel 中的每条消息。

messageSender 中的 PubSubMessageHandler 使用 PubSubTemplateinputMessageChannel 中发布消息。PubSubMessageHandler 会将消息发布到 Pub/Sub 主题 topic-two

使用 Spring Cloud Stream Binder

如需在 Spring Cloud Stream 应用中调用 Pub/Sub API,请使用 Spring Cloud GCP Pub/Sub Stream Binder 模块。

安装模块

如需安装 Spring Cloud Stream Binder 模块,请将以下内容添加到 pom.xml 文件中:

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud Stream Binder 工件:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

从 Pub/Sub 接收消息

要将应用用作事件接收器,请通过指定以下内容来配置输入 Binder:

  • 定义消息处理逻辑的 Consumer Bean。例如,以下 Consumer Bean 名为 receiveMessageFromTopicTwo

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • 配置文件 application.properties 中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为 topic-two 的 Pub/Sub 主题 ID:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

示例代码会从 Pub/Sub 接收消息。该示例执行以下操作:

  1. application.properties 的输入绑定目的地中找到 Pub/Sub 主题 ID topic-two
  2. 创建针对 topic-two 的 Pub/Sub 订阅。
  3. 使用绑定名称 receiveMessageFromTopicTwo-in-0 查找名为 receiveMessageFromTopicTwoConsumer Bean。
  4. 将传入的消息输出到标准输出并自动确认。

将消息发布到 Pub/Sub

要将应用用作事件源,请指定以下内容以配置输出 Binder:

  • 一个 Supplier Bean,用于定义消息来自应用中的何处。例如,以下 Supplier Bean 名为 sendMessageToTopicOne

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • 配置文件 application.properties 中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为 topic-one 的 Pub/Sub 主题 ID:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

示例代码会将消息发布到 Pub/Sub。该示例执行以下操作:

  1. application.properties 的输出绑定目的地中找到 Pub/Sub 主题 ID topic-one
  2. 使用绑定名称 sendMessageToTopicOne-out-0 查找名为 sendMessageToTopicOneSupplier Bean。
  3. 每 10 秒向 topic-one 发送一条已编号的消息。