Como usar o Pub/Sub em aplicativos do Spring

Esta página descreve como usar o Pub/Sub em aplicativos Java criados com o Spring Framework.

O Spring Cloud GCP tem vários módulos para enviar mensagens para tópicos do Pub/Sub e receber mensagens de assinaturas do Pub/Sub usando o Spring Framework. Use esses módulos de maneira independente ou combine-os em diferentes casos de uso:

OBSERVAÇÃO: a biblioteca Spring Cloud GCP não oferece acesso a AckReplyConsumerWithResponse, que é um módulo obrigatório para implementar o recurso exatamente uma vez usando a biblioteca de cliente Java.

Antes de começar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. Defina a variável de ambiente GOOGLE_CLOUD_PROJECT como seu ID do projeto do Google Cloud.

Como usar o Spring Cloud GCP Pub/Sub Starter

O módulo Spring Cloud GCP Pub/Sub Starter instala a biblioteca de cliente Java do Pub/Sub usando o módulo Spring Cloud GCP Pub/Sub. É possível chamar a API Pub/Sub no aplicativo Spring usando as classes que o Spring Cloud GCP Pub/Sub Starter fornece ou a biblioteca de cliente Java do Pub/Sub. Se estiver usando as classes que o Spring Cloud GCP Spring Starter fornece, você poderá substituir as configurações padrão do Pub/Sub.

Como instalar o módulo

Para instalar o módulo Spring Cloud GCP Pub/Sub Starter, adicione estas dependências ao seu arquivo pom.xml:

  1. A lista de materiais (BOM) do Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. O artefato Spring Cloud GCP Pub/Sub Starter:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Operações suportadas

O módulo Spring Cloud GCP Pub/Sub Starter inclui as seguintes classes:

  • PubSubAdmin para operações administrativas:
    • Criar tópicos e assinaturas
    • Receber tópicos e assinaturas.
    • Listar tópicos e assinaturas.
    • Excluir tópicos e assinaturas.
    • Receber e definir prazos de confirmação em uma assinatura.
  • PubSubTemplate para enviar e receber mensagens:
    • Publicar mensagens em tópicos.
    • Receber mensagens de assinaturas de maneira síncrona.
    • Receber mensagens de assinaturas de maneira assíncrona.
    • Confirmar mensagens.
    • Modificar prazos de confirmação.
    • Converter mensagens do Pub/Sub em objetos Java Plain Old (POJOs).

Usar adaptadores de canal da Spring Integration

Se seu aplicativo Spring usa canais de mensagens da Spring Integration, é possível rotear mensagens entre seus canais de mensagem e o Pub/Sub usando adaptadores de canal.

Como instalar os módulos

Para instalar módulos para adaptadores do canal Spring Integration, adicione o seguinte ao seu arquivo pom.xml:

  1. O BOM do Spring Cloud GCP (em inglês).

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Os artefatos do Spring Cloud GCP Pub/Sub Starter e da Spring Integration Core:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Como receber mensagens do Pub/Sub

Para receber mensagens de uma assinatura do Pub/Sub no aplicativo Spring, use um adaptador de canal de entrada. O adaptador de canal de entrada converte as mensagens do Pub/Sub recebidas em POJOs e, em seguida, encaminha os POJOs para um canal de mensagens.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

O exemplo acima usa os seguintes beans e recursos do Pub/Sub:

  • Um bean do canal de mensagens chamado inputMessageChannel.
  • Um adaptador de canal de entrada é chamado de inboundChannelAdapter do tipo PubSubInboundChannelAdapter.
  • Um código de assinatura do Pub/Sub chamado sub-one.

O inboundChannelAdapter extrai mensagens de sub-one de forma assíncrona usando um PubSubTemplate e envia as mensagens para inputMessageChannel.

O inboundChannelAdapter define o modo de confirmação como MANUAL para que o aplicativo reconheça as mensagens depois de processá-las. O modo de confirmação padrão dos tipos PubSubInboundChannelAdapter é AUTO.

O messageReceiver do bean ServiceActivator registra cada mensagem que chega em inputMessageChannel na saída padrão e, em seguida, confirma a mensagem.

Como publicar mensagens no Pub/Sub

Para publicar mensagens de um canal de mensagens em um tópico do Pub/Sub, use um adaptador de canal de saída. O adaptador de canal de saída converte os POJOs para mensagens do Pub/Sub e, em seguida, envia as mensagens para um tópico do Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

O exemplo acima usa os seguintes beans e o recurso Pub/Sub:

  • Um bean do canal de mensagens chamado inputMessageChannel.
  • Um bean do adaptador de canal de saída chamado messageSender do tipo PubSubMessageHandler.
  • Um ID de tópico do Pub/Sub chamado topic-two.

O bean ServiceActivator aplica a lógica em messageSender a cada mensagem em inputMessageChannel.

O PubSubMessageHandler em messageSender publica mensagens na inputMessageChannel usando um PubSubTemplate. O PubSubMessageHandler publica mensagens no tópico topic-two do Pub/Sub.

Como usar o Spring Cloud Stream Binder

Para chamar a API Pub/Sub em um aplicativo Spring Cloud Stream, use o módulo Spring Cloud GCP Pub/Sub Stream Binder.

Como instalar o módulo

Para instalar o módulo Springer do Spring Cloud Stream, adicione o seguinte ao arquivo pom.xml:

  1. O BOM do Spring Cloud GCP (em inglês).

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. O artefato Spring Cloud Stream Binder:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Como receber mensagens do Pub/Sub

Para usar seu aplicativo como um coletor de eventos, configure o vinculador de entrada especificando o seguinte:

  • Um bean Consumer que define a lógica de gerenciamento de mensagens. Por exemplo, o bean Consumer a seguir é chamado de receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Um ID do tópico Pub/Sub no arquivo de configuração application.properties. Por exemplo, o arquivo de configuração a seguir usa um ID de tópico Pub/Sub chamado topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

O código de exemplo recebe mensagens do Pub/Sub. O exemplo faz o seguinte:

  1. Encontra o ID do tópico do Pub/Sub topic-two no destino de vinculação de entrada em application.properties.
  2. Cria uma assinatura do Pub/Sub para topic-two.
  3. Usa o nome de vinculação receiveMessageFromTopicTwo-in-0 para encontrar o bean Consumer chamado receiveMessageFromTopicTwo.
  4. Imprime as mensagens recebidas na saída padrão e as confirma automaticamente.

Como publicar mensagens no Pub/Sub

Para usar seu aplicativo como uma origem de evento, configure o vinculador de saída especificando o seguinte:

  • Um bean Supplier que define a origem das mensagens dentro do seu aplicativo. Por exemplo, o bean Supplier a seguir é chamado de sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • Um ID do tópico Pub/Sub no arquivo de configuração application.properties. Por exemplo, o arquivo de configuração a seguir usa um ID de tópico Pub/Sub chamado topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

O código de exemplo publica mensagens no Pub/Sub. O exemplo faz o seguinte:

  1. Encontra o ID do tópico do Pub/Sub topic-one no destino de vinculação de saída em application.properties.
  2. Usa o nome de vinculação sendMessageToTopicOne-out-0 para encontrar o bean Supplier chamado sendMessageToTopicOne.
  3. Envia uma mensagem numerada para topic-one a cada 10 segundos.