Usa Pub/Sub en aplicaciones de Spring

En esta página, se describe cómo usar Pub/Sub en aplicaciones Java compiladas con el framework Spring.

Spring Cloud GCP tiene varios módulos para enviar mensajes a temas de Pub/Sub y recibir mensajes de suscripciones de Pub/Sub mediante el framework Spring. Puedes usar estos módulos por separado o combinarlos en diferentes casos de uso:

NOTA: La biblioteca de Spring Cloud GCP no proporciona acceso a AckReplyConsumerWithResponse, que es un módulo obligatorio para implementar la función de exactamente una vez con la biblioteca cliente de Java.

Antes de comenzar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. Establece la variable de entorno GOOGLE_CLOUD_PROJECT en el ID de tu proyecto de Google Cloud.

Usa Spring Cloud GCP Pub/Sub Starter

El inicio de Pub/Sub de Spring Cloud GCP instala la biblioteca cliente de Java de Pub/Sub con el Pub/Sub de Spring Cloud GCP módulo. Puedes llamar a la API de Pub/Sub desde tu aplicación de Spring con las clases que proporciona Spring Cloud Pub/Sub Starter o la biblioteca cliente de Java para Pub/Sub. Si usas las clases que proporciona Spring Cloud GCP Pub/Sub Starter, puedes anular las configuraciones de Pub/Sub predeterminadas.

Instala el módulo

Para instalar el módulo de Spring Cloud GCP Pub/Sub Starter, agrega estas dependencias a tu archivo pom.xml:

  1. La lista de materiales (BoM) de Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. El artefacto de Spring Cloud GCP Pub/Sub Starter:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Operaciones admitidas

El módulo de Spring Cloud GCP Pub/Sub Starter incluye las siguientes clases:

  • PubSubAdmin para operaciones administrativas:
    • Crea temas y suscripciones.
    • Obtén temas y suscripciones.
    • Muestra una lista de temas y suscripciones.
    • Borra temas y suscripciones.
    • Obtén y establece plazos de confirmación en una suscripción.
  • PubSubTemplate para enviar y recibir mensajes:
    • Publica mensajes en temas.
    • Extrae mensajes de las suscripciones de forma síncrona.
    • Extrae mensajes de las suscripciones de forma asíncrona.
    • Confirma recepción de mensajes.
    • Modifica los plazos de confirmación.
    • Convierte mensajes de Pub/Sub en objetos antiguos y sin formato basados en Java (POJO).

Usa adaptadores de canal de integración de Spring

Si tu aplicación de Spring usa canales de mensaje de integración de Spring, puedes enrutar los mensajes entre tus canales de mensajes y Pub/Sub con adaptadores de canal.

Instala los módulos

A fin de instalar módulos para adaptadores de canal de integración de Spring, agrega lo siguiente a tu archivo pom.xml:

  1. La BOM de Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Los artefactos de Spring Cloud GCP Pub/Sub Starter y Spring Integration Core:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Recibe mensajes de Pub/Sub

Para recibir mensajes de una suscripción de Pub/Sub en tu aplicación de Spring, usa un adaptador de canal entrante. El adaptador de canal entrante convierte los mensajes de Pub/Sub entrantes en POJO y, luego, reenvía los POJO a un canal de mensajes.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

En el ejemplo anterior, se usan los siguientes beans de Spring. y recurso de Pub/Sub:

  • El bean del canal de mensajes llamado inputMessageChannel.
  • Un bean adaptador de canal entrante llamado inboundChannelAdapter de tipo PubSubInboundChannelAdapter.
  • Un ID de suscripción a Pub/Sub con el nombre sub-one.

El inboundChannelAdapter extrae de forma asíncrona los mensajes de sub-one mediante un PubSubTemplate y los envía a inputMessageChannel.

inboundChannelAdapter establece el modo de confirmación en MANUAL para que la aplicación pueda confirmar los mensajes después de procesarlos. El modo de confirmación predeterminado de los tipos PubSubInboundChannelAdapter es AUTO.

El ServiceActivator bean messageReceiver registra cada mensaje que llega a inputMessageChannel al resultado estándar y, luego, confirma el mensaje.

Publica mensajes en Pub/Sub

Para publicar mensajes de un canal de mensajes en un tema de Pub/Sub, usa un adaptador de canal saliente. El adaptador de canal saliente convierte los POJO en mensajes de Pub/Sub y, luego, envía los mensajes a un tema de Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

En el ejemplo anterior, se usan los siguientes beans de Spring y el siguiente recurso de Pub/Sub:

  • El bean del canal de mensajes llamado inputMessageChannel.
  • Un adaptador de canal saliente llamado messageSender, de tipo PubSubMessageHandler,
  • Un ID de tema de Pub/Sub llamado topic-two.

El bean ServiceActivator aplica la lógica en messageSender a cada mensaje en inputMessageChannel.

El PubSubMessageHandler en messageSender publica mensajes en inputMessageChannel con un PubSubTemplate. El PubSubMessageHandler publica mensajes en el tema de Pub/Sub topic-two.

Usa Spring Cloud Stream Binder

Para llamar a la API de Pub/Sub en una aplicación de Spring Cloud Stream, usa el módulo Spring Cloud Pub/Sub Stream Binder.

Instala el módulo

Para instalar el módulo de Spring Cloud Stream Binder, agrega lo siguiente al archivo pom.xml:

  1. La BOM de Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. El artefacto de Spring Cloud Stream Binder:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Recibe mensajes de Pub/Sub

Para usar tu aplicación como un receptor de eventos, configura el vinculador de entrada especificando lo siguiente:

  • Un bean Consumer que define la lógica del manejo de mensajes. Por ejemplo, el siguiente bean Consumer se llama receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Un ID de tema de Pub/Sub en el archivo de configuración application.properties. Por ejemplo, el siguiente archivo de configuración usa un ID de tema de Pub/Sub llamado topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

En el código de ejemplo, se reciben mensajes de Pub/Sub. En el ejemplo, se realizan las acciones siguientes:

  1. Buscar el ID de tema de Pub/Sub topic-two en el destino de vinculación de entrada en application.properties.
  2. Crear una suscripción a Pub/Sub para topic-two.
  3. Usar el nombre de vinculación receiveMessageFromTopicTwo-in-0 para encontrar el bean Consumer llamado receiveMessageFromTopicTwo.
  4. Imprimir los mensajes entrantes en la salida estándar y reconocerlos automáticamente.

Publica mensajes en Pub/Sub

Si quieres usar tu aplicación como fuente de eventos, configura el vinculador de salida mediante la especificación siguiente:

  • Un bean Supplier que define de dónde provienen los mensajes dentro de tu aplicación. Por ejemplo, el siguiente bean Supplier se llama sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • Un ID de tema de Pub/Sub en el archivo de configuración application.properties. Por ejemplo, el siguiente archivo de configuración usa un ID de tema de Pub/Sub llamado topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

El código de ejemplo publica mensajes en Pub/Sub. En el ejemplo, se realizan las acciones siguientes:

  1. Busca el ID de tema de Pub/Sub topic-one en el destino de vinculación de salida en application.properties.
  2. Usa el nombre de vinculación sendMessageToTopicOne-out-0 para encontrar el bean Supplier llamado sendMessageToTopicOne.
  3. Envía un mensaje numerado a topic-one cada 10 segundos.