Using Pub/Sub in Spring applications

This page describes how to use Pub/Sub in Java applications built with the Spring Framework.

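Spring Cloud GCP has several modules for sending messages to Pub/Sub topics and receiving messages from Pub/Sub subscriptions using the Spring Framework. You can use these modules independently or combine them for different use cases. This page covers the Spring Cloud GCP Pub/Sub Starter, the Spring Integration channel adapters, and the Spring Cloud Stream Binder.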

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Cloud Console project:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Cloud Console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.

  4. Set the environment variable GOOGLE_CLOUD_PROJECT to your Cloud project ID.
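
Because the environment variables from steps 3 and 4 are lost when you open a new shell session, a quick check at startup can save some debugging. The following is a minimal sketch that uses only the Java standard library. The class name PubSubEnvCheck and its methods are hypothetical helpers for illustration and are not part of Spring Cloud GCP or the Pub/Sub client library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spring Cloud GCP or the Pub/Sub client library.
final class PubSubEnvCheck {
  // The two environment variables named in the steps above.
  static final List<String> REQUIRED =
      Arrays.asList("GOOGLE_APPLICATION_CREDENTIALS", "GOOGLE_CLOUD_PROJECT");

  // Returns the required variables that are unset or blank in the given environment.
  static List<String> missingVariables(Map<String, String> env) {
    List<String> missing = new ArrayList<>();
    for (String name : REQUIRED) {
      String value = env.get(name);
      if (value == null || value.trim().isEmpty()) {
        missing.add(name);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    List<String> missing = missingVariables(System.getenv());
    if (missing.isEmpty()) {
      System.out.println("Pub/Sub environment variables are set.");
    } else {
      System.out.println("Missing environment variables: " + missing);
    }
  }
}
```

The check only confirms that the variables are present. It does not verify that the key file exists or that the project ID is valid.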

Using Spring Cloud GCP Pub/Sub Starter

The Spring Cloud GCP Pub/Sub Starter module installs the Pub/Sub Java client library using the Spring Cloud GCP Pub/Sub module. You can call the Pub/Sub API from your Spring application using the classes that the Spring Cloud GCP Pub/Sub Starter provides or the Pub/Sub Java client library. If you're using the classes that the Spring Cloud GCP Pub/Sub Starter provides, you can override the default Pub/Sub configurations.

Installing the module

To install the Spring Cloud GCP Pub/Sub Starter module, add these dependencies to your pom.xml file:

  1. The Spring Cloud GCP Bill of Materials (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>1.2.7.RELEASE</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. The Spring Cloud GCP Pub/Sub Starter artifact:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Supported operations

The Spring Cloud GCP Pub/Sub Starter module includes the following classes:

  • PubSubAdmin for administrative operations:
    • Create topics and subscriptions.
    • Get topics and subscriptions.
    • List topics and subscriptions.
    • Delete topics and subscriptions.
    • Get and set acknowledgement deadlines on a subscription.
  • PubSubTemplate for sending and receiving messages:
    • Publish messages to topics.
    • Synchronously pull messages from subscriptions.
    • Asynchronously pull messages from subscriptions.
    • Acknowledge messages.
    • Modify acknowledgement deadlines.
    • Convert Pub/Sub messages into Plain Old Java Objects (POJOs).

Using Spring Integration channel adapters

If your Spring application uses Spring Integration message channels, you can route messages between your message channels and Pub/Sub using channel adapters.

Installing the modules

To install modules for Spring Integration channel adapters, add the following to your pom.xml file:

  1. The Spring Cloud GCP BOM:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>1.2.7.RELEASE</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. The Spring Cloud GCP Pub/Sub Starter and Spring Integration Core artifacts:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Receiving messages from Pub/Sub

To receive messages from a Pub/Sub subscription in your Spring application, use an inbound channel adapter. The inbound channel adapter converts incoming Pub/Sub messages to POJOs and then forwards the POJOs to a message channel.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

The example above uses the following Spring beans and Pub/Sub resource:

  • A message channel bean named inputMessageChannel.
  • An inbound channel adapter bean named inboundChannelAdapter of type PubSubInboundChannelAdapter.
  • A Pub/Sub subscription ID named sub-one.

The inboundChannelAdapter asynchronously pulls messages from sub-one using a PubSubTemplate and sends the messages to inputMessageChannel.

The inboundChannelAdapter sets the acknowledgement mode to MANUAL so that the application can acknowledge messages after it processes them. The default acknowledgement mode of a PubSubInboundChannelAdapter is AUTO.

The ServiceActivator bean messageReceiver logs each message arriving in inputMessageChannel to the standard output and then acknowledges the message.
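
The difference between the MANUAL and AUTO acknowledgement modes can be sketched with a toy model. The example below uses only the Java standard library, and its AckMode and Outcome enums are hypothetical stand-ins, not the Spring Cloud GCP classes. It assumes that in AUTO mode the adapter acknowledges a message when the handler returns normally and negatively acknowledges it when the handler throws. Check the Spring Cloud GCP reference documentation for the exact behavior of your version.

```java
import java.util.function.Consumer;

// Toy model only: these types are hypothetical stand-ins, not the Spring Cloud GCP classes.
final class AckModeSketch {
  enum AckMode { AUTO, MANUAL }

  enum Outcome { ACKED_BY_ADAPTER, NACKED_BY_ADAPTER, LEFT_TO_APPLICATION }

  // Models what the adapter itself does after handing one message to the handler.
  static Outcome deliver(AckMode mode, String payload, Consumer<String> handler) {
    boolean handlerSucceeded;
    try {
      handler.accept(payload);
      handlerSucceeded = true;
    } catch (RuntimeException e) {
      handlerSucceeded = false;
    }
    if (mode == AckMode.MANUAL) {
      // The adapter never acks; the handler must call ack() or nack() on the original message.
      return Outcome.LEFT_TO_APPLICATION;
    }
    // Assumed AUTO behavior: ack on success, nack when the handler throws.
    return handlerSucceeded ? Outcome.ACKED_BY_ADAPTER : Outcome.NACKED_BY_ADAPTER;
  }

  public static void main(String[] args) {
    Consumer<String> ok = payload -> { };
    Consumer<String> failing =
        payload -> {
          throw new IllegalStateException("processing failed");
        };
    System.out.println(deliver(AckMode.AUTO, "hello", ok));
    System.out.println(deliver(AckMode.AUTO, "hello", failing));
    System.out.println(deliver(AckMode.MANUAL, "hello", ok));
  }
}
```

In MANUAL mode, a message that the application never acknowledges is redelivered after its acknowledgement deadline expires, which is why messageReceiver calls message.ack() after logging.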

Publishing messages to Pub/Sub

To publish messages from a message channel to a Pub/Sub topic, use an outbound channel adapter. The outbound channel adapter converts POJOs to Pub/Sub messages and then sends the messages to a Pub/Sub topic.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setPublishCallback(
      new ListenableFutureCallback<String>() {
        @Override
        public void onFailure(Throwable throwable) {
          LOGGER.error("There was an error sending the message.", throwable);
        }

        @Override
        public void onSuccess(String result) {
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!");
        }
      });
  return adapter;
}

The example above uses the following Spring beans and Pub/Sub resource:

  • A message channel bean named inputMessageChannel.
  • An outbound channel adapter bean named messageSender of type PubSubMessageHandler.
  • A Pub/Sub topic ID named topic-two.

The ServiceActivator bean applies the logic in messageSender to each message in inputMessageChannel.

The PubSubMessageHandler in messageSender uses a PubSubTemplate to publish each message from inputMessageChannel to the Pub/Sub topic topic-two. The publish callback logs whether each publish succeeded or failed.
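
Publishing is asynchronous, so the callback runs only after the publish request settles. The following sketch illustrates that success and failure branching with the Java standard library alone. It swaps in CompletableFuture for the ListenableFuture that the real handler uses, and the class name PublishCallbackSketch is hypothetical. It also assumes that the String passed to onSuccess is the ID that Pub/Sub assigned to the published message.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: CompletableFuture replaces the ListenableFuture that the real handler uses.
final class PublishCallbackSketch {
  // Builds the log line a callback would write once the publish future settles.
  static String describe(CompletableFuture<String> publishFuture) {
    return publishFuture
        .handle(
            (messageId, error) ->
                error == null
                    ? "Published to topic-two with message ID " + messageId
                    : "Publish to topic-two failed: " + error.getMessage())
        .join();
  }

  public static void main(String[] args) {
    CompletableFuture<String> success = CompletableFuture.completedFuture("12345");
    CompletableFuture<String> failure = new CompletableFuture<>();
    failure.completeExceptionally(new IllegalStateException("topic not found"));
    System.out.println(describe(success));
    System.out.println(describe(failure));
  }
}
```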

Using Spring Cloud Stream Binder

To call the Pub/Sub API in a Spring Cloud Stream application, use the Spring Cloud GCP Pub/Sub Stream Binder module.

Installing the module

To install the Spring Cloud Stream Binder module, add the following to your pom.xml file:

  1. The Spring Cloud GCP BOM:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>1.2.7.RELEASE</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. The Spring Cloud Stream Binder artifact:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Receiving messages from Pub/Sub

To use your application as an event sink, configure the input binder by specifying the following:

  • A Consumer bean that defines message handling logic. For example, the following Consumer bean is named receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • A Pub/Sub topic ID in the configuration file application.properties. For example, the following configuration file uses a Pub/Sub topic ID named topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

The example receives messages from Pub/Sub as follows:

  1. Finds the Pub/Sub topic ID topic-two in the input binding destination in application.properties.
  2. Creates a Pub/Sub subscription to topic-two.
  3. Uses the binding name receiveMessageFromTopicTwo-in-0 to find the Consumer bean named receiveMessageFromTopicTwo.
  4. Prints incoming messages to the standard output and automatically acknowledges them.
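
The binding name in step 3 follows the Spring Cloud Stream convention for functional beans: the bean name, then -in- or -out-, then the index of the input or output. The following sketch spells out that convention with plain string handling. The class BindingNames and its methods are hypothetical helpers for illustration and are not part of Spring Cloud Stream.

```java
// Hypothetical helper that spells out the functional binding naming convention.
final class BindingNames {
  // Input bindings are named <functionName>-in-<index>.
  static String input(String functionName, int index) {
    return functionName + "-in-" + index;
  }

  // Output bindings are named <functionName>-out-<index>.
  static String output(String functionName, int index) {
    return functionName + "-out-" + index;
  }

  // Property key that maps a binding to its destination (here, a Pub/Sub topic ID).
  static String destinationKey(String bindingName) {
    return "spring.cloud.stream.bindings." + bindingName + ".destination";
  }

  public static void main(String[] args) {
    // Reproduces the property line from the application.properties example above.
    System.out.println(destinationKey(input("receiveMessageFromTopicTwo", 0)) + "=topic-two");
  }
}
```

If you rename the Consumer bean, the property key must change with it. Otherwise the binding name no longer matches any bean.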

Publishing messages to Pub/Sub

To use your application as an event source, configure the output binder by specifying the following:

  • A Supplier bean that defines where messages come from within your application. For example, the following Supplier bean is named sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
              sink -> {
                try {
                  Thread.sleep(10000);
                } catch (InterruptedException e) {
                  // Interrupted: stop sleeping early and send the next message now.
                }

                Message<String> message =
                    MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                LOGGER.info(
                    "Sending a message via the output binder to topic-one! Payload: "
                        + message.getPayload());
                sink.next(message);
              })
              .subscribeOn(Schedulers.elastic());
    }
  • A Pub/Sub topic ID in the configuration file application.properties. For example, the following configuration file uses a Pub/Sub topic ID named topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

The example publishes messages to Pub/Sub as follows:

  1. Finds the Pub/Sub topic ID topic-one in the output binding destination in application.properties.
  2. Uses the binding name sendMessageToTopicOne-out-0 to find the Supplier bean named sendMessageToTopicOne.
  3. Sends a message with a random numeric suffix, such as message-42, to topic-one every 10 seconds.
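
The payload logic can be tried in isolation, without Spring or Pub/Sub. The following sketch mirrors the expression "message-" + rand.nextInt(1000) from the Supplier bean. The class name PayloadSketch is hypothetical, and the sketch only generates strings and does not publish anything.

```java
import java.util.Random;
import java.util.function.Supplier;

// Isolated sketch of the payload logic; it does not publish anything.
final class PayloadSketch {
  // Returns a supplier that mirrors "message-" + rand.nextInt(1000) from the example.
  static Supplier<String> payloads(Random rand) {
    return () -> "message-" + rand.nextInt(1000);
  }

  public static void main(String[] args) {
    Supplier<String> supplier = payloads(new Random());
    for (int i = 0; i < 3; i++) {
      System.out.println(supplier.get());
    }
  }
}
```

Because the suffix is a random integer from 0 to 999, payloads are not sequential and can repeat. Don't rely on the payload to identify a message uniquely.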