Como usar o Pub/Sub em aplicativos do Spring

Esta página descreve como usar o Pub/Sub em aplicativos Java criados com o Spring Framework.

O Spring Cloud GCP tem vários módulos para enviar mensagens para tópicos do Pub/Sub e receber mensagens de assinaturas do Pub/Sub usando o Spring Framework. Use esses módulos de maneira independente ou combine-os em diferentes casos de uso:

OBSERVAÇÃO: a biblioteca Spring Cloud GCP não fornece acesso a AckReplyConsumerWithResponse, que é um módulo necessário para implementar o recurso "exatamente uma vez" usando a biblioteca de cliente Java.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Configurar um projeto do Google Cloud.

    Configurar um projeto

    Clique para:

    • Crie ou selecione um projeto.
    • ativar a API Pub/Sub para esse projeto;
    • criar uma conta de serviço;
    • fazer o download de uma chave privada como JSON.

    É possível visualizar e gerenciar esses recursos a qualquer momento no Console do Google Cloud.

  3. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém suas credenciais. Essa variável só se aplica à sessão de shell atual. Assim, se você abrir uma nova sessão, precisará definir a variável novamente.

  4. Configurar um projeto do Google Cloud.

    Configurar um projeto

    Clique para:

    • Crie ou selecione um projeto.
    • ativar a API Pub/Sub para esse projeto;
    • criar uma conta de serviço;
    • fazer o download de uma chave privada como JSON.

    É possível visualizar e gerenciar esses recursos a qualquer momento no Console do Google Cloud.

  5. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém suas credenciais. Essa variável só se aplica à sessão de shell atual. Assim, se você abrir uma nova sessão, precisará definir a variável novamente.

  6. Defina a variável de ambiente GOOGLE_CLOUD_PROJECT como o ID do projeto do Google Cloud.

Como usar o Spring Cloud GCP Pub/Sub Starter

O módulo Spring Cloud GCP Pub/Sub Starter (em inglês) instala a biblioteca de cliente Java do Pub/Sub usando o módulo Spring Cloud GCP Pub/Sub. É possível chamar a API Pub/Sub no aplicativo Spring usando as classes que o Spring Cloud GCP Pub/Sub Starter fornece ou a biblioteca de cliente Java do Pub/Sub. Se estiver usando as classes que o Spring Cloud GCP Spring Starter fornece, você poderá substituir as configurações padrão do Pub/Sub.

Como instalar o módulo

Para instalar o módulo Spring Cloud GCP Pub/Sub Starter, adicione estas dependências ao seu arquivo pom.xml:

  1. Lista de materiais (BoM) da Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. O artefato Spring Cloud GCP Pub/Sub Starter:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Operações suportadas

O módulo Spring Cloud GCP Pub/Sub Starter inclui as seguintes classes:

  • PubSubAdmin para operações administrativas:
    • Criar tópicos e assinaturas
    • Receber tópicos e assinaturas.
    • Listar tópicos e assinaturas.
    • Excluir tópicos e assinaturas.
    • Receber e definir prazos de confirmação em uma assinatura.
  • PubSubTemplate para enviar e receber mensagens:
    • Publicar mensagens em tópicos.
    • Receber mensagens de assinaturas de maneira síncrona.
    • Receber mensagens de assinaturas de maneira assíncrona.
    • Confirmar mensagens.
    • Modificar prazos de confirmação.
    • Converter mensagens do Pub/Sub em objetos Java Plain Old (POJOs).

Usar adaptadores de canal da Spring Integration

Se seu aplicativo Spring usa canais de mensagens da Spring Integration, é possível rotear mensagens entre seus canais de mensagem e o Pub/Sub usando adaptadores de canal.

Como instalar os módulos

Para instalar módulos para adaptadores do canal Spring Integration, adicione o seguinte ao seu arquivo pom.xml:

  1. O BOM do Spring Cloud GCP (em inglês).

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Os artefatos do Spring Cloud GCP Pub/Sub Starter e da Spring Integration Core:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Como receber mensagens do Pub/Sub

Para receber mensagens de uma assinatura do Pub/Sub no aplicativo Spring, use um adaptador de canal de entrada. O adaptador de canal de entrada converte as mensagens do Pub/Sub recebidas em POJOs e, em seguida, encaminha os POJOs para um canal de mensagens.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

O exemplo acima usa os seguintes recursos Springs e Pub/Sub:

  • Um canal de mensagens deve ser chamado de inputMessageChannel.
  • Um adaptador de canal de entrada é chamado de inboundChannelAdapter do tipo PubSubInboundChannelAdapter.
  • Um código de assinatura do Pub/Sub chamado sub-one.

O inboundChannelAdapter extrai mensagens de sub-one de forma assíncrona usando um PubSubTemplate e envia as mensagens para inputMessageChannel.

O inboundChannelAdapter define o modo de confirmação como MANUAL para que o aplicativo reconheça as mensagens depois de processá-las. O modo de confirmação padrão dos tipos PubSubInboundChannelAdapter é AUTO.

O ServiceActivator messageReceiver registra cada mensagem que chega em inputMessageChannel à saída padrão e, em seguida, confirma a mensagem.

Como publicar mensagens no Pub/Sub

Para publicar mensagens de um canal de mensagens em um tópico do Pub/Sub, use um adaptador de canal de saída. O adaptador de canal de saída converte os POJOs para mensagens do Pub/Sub e, em seguida, envia as mensagens para um tópico do Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

O exemplo acima usa os seguintes beans e o recurso Pub/Sub:

  • Um bean do canal de mensagens chamado inputMessageChannel.
  • Um bean do adaptador de canal de saída chamado messageSender do tipo PubSubMessageHandler.
  • Um ID de tópico do Pub/Sub chamado topic-two.

O bean ServiceActivator aplica a lógica em messageSender a cada mensagem em inputMessageChannel.

O PubSubMessageHandler em messageSender publica mensagens na inputMessageChannel usando um PubSubTemplate. O PubSubMessageHandler publica mensagens no tópico topic-two do Pub/Sub.

Como usar o Spring Cloud Stream Binder

Para chamar a API Pub/Sub em um aplicativo Spring Cloud Stream, use o módulo Spring Cloud GCP Pub/Sub Stream Binder.

Como instalar o módulo

Para instalar o módulo Springer do Spring Cloud Stream, adicione o seguinte ao arquivo pom.xml:

  1. O BOM do Spring Cloud GCP (em inglês).

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. O artefato Spring Cloud Stream Binder:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Como receber mensagens do Pub/Sub

Para usar seu aplicativo como um coletor de eventos, configure o vinculador de entrada especificando o seguinte:

  • Um bean Consumer que define a lógica de gerenciamento de mensagens. Por exemplo, o bean Consumer a seguir é chamado de receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Um ID do tópico Pub/Sub no arquivo de configuração application.properties. Por exemplo, o arquivo de configuração a seguir usa um ID de tópico Pub/Sub chamado topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

O código de exemplo recebe mensagens do Pub/Sub. O exemplo faz o seguinte:

  1. Encontra o ID do tópico do Pub/Sub topic-two no destino de vinculação de entrada em application.properties.
  2. Cria uma assinatura do Pub/Sub para topic-two.
  3. Usa o nome de vinculação receiveMessageFromTopicTwo-in-0 para encontrar o bean Consumer chamado receiveMessageFromTopicTwo.
  4. Imprime as mensagens recebidas na saída padrão e as confirma automaticamente.

Como publicar mensagens no Pub/Sub

Para usar seu aplicativo como uma origem de evento, configure o vinculador de saída especificando o seguinte:

  • Um bean Supplier que define a origem das mensagens dentro do seu aplicativo. Por exemplo, o bean Supplier a seguir é chamado de sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • Um ID do tópico Pub/Sub no arquivo de configuração application.properties. Por exemplo, o arquivo de configuração a seguir usa um ID de tópico Pub/Sub chamado topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

O código de exemplo publica mensagens no Pub/Sub. O exemplo faz o seguinte:

  1. Encontra o ID do tópico do Pub/Sub topic-one no destino de vinculação de saída em application.properties.
  2. Usa o nome de vinculação sendMessageToTopicOne-out-0 para encontrar o bean Supplier chamado sendMessageToTopicOne.
  3. Envia uma mensagem numerada para topic-one a cada 10 segundos.