Utilizzo di Pub/Sub nelle applicazioni Spring

Questa pagina descrive come utilizzare Pub/Sub nelle applicazioni Java create con il Spring Framework.

Spring Cloud GCP dispone di diversi moduli per inviare messaggi agli argomenti Pub/Sub e ricevere messaggi dalle sottoscrizioni Pub/Sub utilizzando Spring Framework. Puoi usare questi moduli indipendentemente o combinarli per diversi casi d'uso:

NOTA: la libreria Spring Cloud GCP non fornisce l'accesso a AckReplyConsumerWithResponse, che è un modulo obbligatorio per implementare la funzionalità esattamente una volta utilizzando la libreria client Java.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. Imposta la variabile di ambiente GOOGLE_CLOUD_PROJECT sul tuo ID progetto Google Cloud.

Utilizzo di Spring Cloud GCP Pub/Sub Starter

Il modulo Spring Cloud GCP Pub/Sub Starter installa la libreria client Java Pub/Sub utilizzando il modulo Spring Cloud GCP Pub/Sub. Puoi chiamare l'API Pub/Sub dalla tua applicazione Spring utilizzando le classi fornite da Spring Cloud GCP Pub/Sub Starter o la libreria client Java Pub/Sub. Se utilizzi le classi fornite da Spring Cloud GCP Pub/Sub Starter, puoi sostituire le configurazioni Pub/Sub predefinite.

Installazione del modulo

Per installare il modulo Spring Cloud GCP Pub/Sub Starter, aggiungi queste dipendenze al file pom.xml:

  1. La BOM (Bill of Materials) di Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'elemento Spring Cloud GCP Pub/Sub Starter:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Operazioni supportate

Il modulo Spring Cloud GCP Pub/Sub Starter include le seguenti classi:

  • PubSubAdmin per le operazioni amministrative:
    • Crea argomenti e sottoscrizioni.
    • Ricevi argomenti e sottoscrizioni.
    • Elenca argomenti e sottoscrizioni.
    • Eliminare argomenti e sottoscrizioni.
    • Ottenere e impostare le scadenze per le conferme su una sottoscrizione.
  • PubSubTemplate per l'invio e la ricezione di messaggi:
    • Pubblicare messaggi negli argomenti.
    • Esegui il pull dei messaggi dalle sottoscrizioni in modo sincrono.
    • Esegui il pull dei messaggi dalle sottoscrizioni in modo asincrono.
    • Conferma i messaggi.
    • Modificare le scadenze per l'invio della conferma.
    • Converti i messaggi Pub/Sub in POJO (Plain Old Java Object).

Utilizzo degli adattatori del canale Spring Integration

Se la tua applicazione Spring utilizza i canali di messaggistica di Spring Integration, puoi instradare i messaggi tra i canali di messaggistica e Pub/Sub utilizzando adattatori di canale.

Installazione dei moduli

Per installare i moduli per gli adattatori di canale Spring Integration, aggiungi quanto segue al file pom.xml:

  1. Il BOM di Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Gli elementi Spring Cloud GCP Pub/Sub Starter e Spring Integration Core:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Ricezione di messaggi da Pub/Sub

Per ricevere messaggi da un abbonamento Pub/Sub nella tua applicazione Spring, utilizza un adattatore del canale in entrata. L'adattatore del canale in entrata converte i messaggi Pub/Sub in entrata in POJO e poi li inoltra a un canale di messaggi.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

L'esempio riportato sopra utilizza i seguenti bean Spring e la risorsa Pub/Sub:

  • Un bean del canale di messaggi denominato inputMessageChannel.
  • Un bean dell'adattatore del canale in entrata denominato inboundChannelAdapter di tipo PubSubInboundChannelAdapter.
  • Un ID sottoscrizione Pub/Sub denominato sub-one.

inboundChannelAdapter estrae in modo asincrono i messaggi da sub-one utilizzando un PubSubTemplate e li invia a inputMessageChannel.

inboundChannelAdapter imposta la modalità di conferma su MANUAL in modo che l'applicazione possa confermare i messaggi dopo averli elaborati. La modalità di conferma predefinita dei tipi PubSubInboundChannelAdapter è AUTO.

Il bean messageReceiver ServiceActivator registra ogni messaggio in arrivo in inputMessageChannel nell'output standard e poi lo conferma.

Pubblicazione di messaggi in Pub/Sub

Per pubblicare messaggi da un canale di messaggi a un argomento Pub/Sub, utilizza un adattatore del canale in uscita. L'adattatore del canale in uscita converte i POJO in messaggi Pub/Sub e poi li invia a un argomento Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

L'esempio riportato sopra utilizza i seguenti bean Spring e la risorsa Pub/Sub:

  • Un bean del canale di messaggi denominato inputMessageChannel.
  • Un bean di adattatore di canale in uscita denominato messageSender di tipo PubSubMessageHandler.
  • Un ID argomento Pub/Sub denominato topic-two.

Il bean ServiceActivator applica la logica in messageSender a ogni messaggio in inputMessageChannel.

PubSubMessageHandler in messageSender pubblica messaggi in inputMessageChannel utilizzando un PubSubTemplate. PubSubMessageHandler pubblica messaggi nell'argomento Pub/Sub topic-two.

Utilizzo del binder Spring Cloud Stream

Per chiamare l'API Pub/Sub in un'applicazione Spring Cloud Stream, utilizza il modulo Spring Cloud GCP Pub/Sub Stream Binder.

Installazione del modulo

Per installare il modulo Spring Cloud Stream Binder, aggiungi quanto segue al file pom.xml:

  1. Il BOM di Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'artifact Spring Cloud Stream Binder:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Ricezione di messaggi da Pub/Sub

Per utilizzare l'applicazione come destinazione eventi, configura il binder di input specificando quanto segue:

  • Un bean Consumer che definisce la logica di gestione dei messaggi. Ad esempio, il seguente bean Consumer è denominato receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Un ID argomento Pub/Sub nel file di configurazioneapplication.properties. Ad esempio, il seguente file di configurazione utilizza un ID argomento Pub/Sub denominato topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

Il codice di esempio riceve messaggi da Pub/Sub. L'esempio esegue le seguenti operazioni:

  1. Trova l'ID argomento Pub/Sub topic-two nella destinazione della associazione di input in application.properties.
  2. Crea una sottoscrizione Pub/Sub a topic-two.
  3. Utilizza il nome del vincolo receiveMessageFromTopicTwo-in-0 per trovare il bean Consumer denominato receiveMessageFromTopicTwo.
  4. Stampa i messaggi in arrivo nell'output standard e li conferma automaticamente.

Pubblicazione di messaggi in Pub/Sub

Per utilizzare l'applicazione come origine evento, configura il binder di output specificando quanto segue:

  • Un bean Supplier che definisce la provenienza dei messaggi all'interno della tua applicazione. Ad esempio, il seguente bean Supplier è denominato sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • Un ID argomento Pub/Sub nel file di configurazione application.properties. Ad esempio, il seguente file di configurazione utilizza un ID argomento Pub/Sub denominato topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

Il codice di esempio pubblica messaggi in Pub/Sub. L'esempio esegue le seguenti operazioni:

  1. Trova l'ID argomento Pub/Sub topic-one nella destinazione della associazione di output in application.properties.
  2. Utilizza il nome del vincolo sendMessageToTopicOne-out-0 per trovare il bean Supplier denominato sendMessageToTopicOne.
  3. Invia un messaggio numerato a topic-one ogni 10 secondi.