Utilizzo di Pub/Sub nelle applicazioni Spring

Questa pagina descrive come utilizzare Pub/Sub nelle applicazioni Java create con il framework Spring.

Spring Cloud GCP dispone di diversi moduli per inviare messaggi ad argomenti Pub/Sub e ricevere messaggi da sottoscrizioni Pub/Sub utilizzando il framework Spring. Puoi utilizzare questi moduli in modo indipendente o combinarli per casi d'uso diversi:

NOTA: la libreria Spring Cloud GCP non fornisce l'accesso ad AckReplyConsumerWithResponse, che è un modulo obbligatorio per implementare la funzionalità "exactly-once" utilizzando la libreria client Java.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Configura un progetto nella console.

    Configurare un progetto

    Fai clic per:

    • Crea o seleziona un progetto.
    • Abilita l'API Pub/Sub per quel progetto.
    • Creare un account di servizio.
    • Scarica una chiave privata come JSON.

    Puoi visualizzare e gestire queste risorse in qualsiasi momento nella console.

  3. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene le tue credenziali. Questa variabile si applica solo alla sessione di shell attuale. Pertanto, se apri una nuova sessione, imposta di nuovo la variabile.

  4. Configura un progetto nella console.

    Configurare un progetto

    Fai clic per:

    • Crea o seleziona un progetto.
    • Abilita l'API Pub/Sub per quel progetto.
    • Creare un account di servizio.
    • Scarica una chiave privata come JSON.

    Puoi visualizzare e gestire queste risorse in qualsiasi momento nella console.

  5. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene le tue credenziali. Questa variabile si applica solo alla sessione di shell attuale. Pertanto, se apri una nuova sessione, imposta di nuovo la variabile.

  6. Imposta la variabile di ambiente GOOGLE_CLOUD_PROJECT sul tuo ID progetto Google Cloud.

Utilizzo di Spring Cloud GCP Pub/Sub Starter

Il modulo Spring Cloud GCP Pub/Sub Starter installa la libreria client Java di Pub/Sub utilizzando il modulo Spring Cloud GCP Pub/Sub. Puoi chiamare l'API Pub/Sub dall'applicazione Spring utilizzando le classi fornite da Spring Cloud Google Cloud Pub/Sub Starter o la libreria client Java di Pub/Sub. Se utilizzi le classi fornite da Spring Cloud GCP Pub/Sub Starter, puoi eseguire l'override delle configurazioni Pub/Sub predefinite.

Installazione del modulo

Per installare il modulo Spring Cloud Google Cloud Pub/Sub Starter, aggiungi queste dipendenze al file pom.xml:

  1. La distinta base di Spring Cloud (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'artefatto Spring Cloud GCP Pub/Sub Starter:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Operazioni supportate

Il modulo Spring Cloud Google Cloud Pub/Sub Starter include le seguenti classi:

  • PubSubAdmin per le operazioni amministrative:
    • Creare argomenti e sottoscrizioni.
    • Visualizza argomenti e sottoscrizioni.
    • Elenca argomenti e sottoscrizioni.
    • Eliminare argomenti e sottoscrizioni.
    • Stabilisci e imposta scadenze per la conferma di un abbonamento.
  • PubSubTemplate per inviare e ricevere messaggi:
    • Pubblicare messaggi negli argomenti.
    • Esegui il pull dei messaggi in modo sincrono dalle sottoscrizioni.
    • Esegui il pull in modo asincrono dei messaggi dalle sottoscrizioni.
    • Conferma i messaggi.
    • Modificare le scadenze di conferma.
    • Converti i messaggi Pub/Sub in oggetti POJO (normali vecchi oggetti Java).

Utilizzo degli adattatori del canale di integrazione di Spring

Se la tua applicazione Spring utilizza canali di messaggi di integrazione Spring, puoi instradare i messaggi tra i tuoi canali di messaggistica e Pub/Sub utilizzando gli adattatori del canale.

Installazione dei moduli

Per installare i moduli per gli adattatori del canale di integrazione di Spring, aggiungi quanto segue al file pom.xml:

  1. La distinta base di Spring Cloud Google Cloud.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Gli artefatti Pub/Sub Starter e Spring Integration Core di Spring Cloud GCP:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Ricezione di messaggi da Pub/Sub

Per ricevere messaggi da una sottoscrizione Pub/Sub nell'applicazione Spring, utilizza un adattatore per il canale in entrata. L'adattatore per il canale in entrata converte i messaggi Pub/Sub in arrivo in POJO e quindi inoltra i POJO a un canale di messaggi.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

L'esempio riportato sopra utilizza la seguente risorsa bean e Pub/Sub Spring:

  • Un channel bean denominato inputMessageChannel.
  • Un bean adattatore di canale in entrata denominato inboundChannelAdapter di tipo PubSubInboundChannelAdapter.
  • Un ID sottoscrizione Pub/Sub denominato sub-one.

L'inboundChannelAdapter estrae in modo asincrono i messaggi da sub-one utilizzando un PubSubTemplate e li invia a inputMessageChannel.

inboundChannelAdapter imposta la modalità di accettazione su MANUAL in modo che l'applicazione possa confermare i messaggi dopo averli elaborati. La modalità di conferma predefinita dei tipi di PubSubInboundChannelAdapter è AUTO.

Il bean ServiceActivator messageReceiver registra ogni messaggio che arriva in inputMessageChannel nell'output standard e quindi conferma il messaggio.

Pubblicazione di messaggi in Pub/Sub

Per pubblicare messaggi da un canale di messaggi in un argomento Pub/Sub, utilizza un adattatore per il canale in uscita. L'adattatore del canale in uscita converte i POJO in messaggi Pub/Sub e quindi li invia a un argomento Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

L'esempio precedente utilizza i seguenti bean Spring e la seguente risorsa Pub/Sub:

  • Un channel bean denominato inputMessageChannel.
  • Un adattatore di canale in uscita denominato messageSender di tipo PubSubMessageHandler.
  • Un ID argomento Pub/Sub denominato topic-two.

Il bean ServiceActivator applica la logica in messageSender a ogni messaggio in inputMessageChannel.

PubSubMessageHandler in messageSender pubblica messaggi in inputMessageChannel utilizzando un PubSubTemplate. PubSubMessageHandler pubblica messaggi nell'argomento Pub/Sub topic-two.

Utilizzo di Spring Cloud Stream Binder

Per chiamare l'API Pub/Sub in un'applicazione Spring Cloud Stream, utilizza il modulo Binnder stream Pub/Sub di Cloud GCP.

Installazione del modulo

Per installare il modulo Spring Cloud Stream Binder, aggiungi quanto segue al file pom.xml:

  1. La distinta base di Spring Cloud Google Cloud.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'artefatto di Spring Cloud Stream Binder:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Ricezione di messaggi da Pub/Sub

Per utilizzare la tua applicazione come sink di eventi, configura il binder di input specificando quanto segue:

  • Un bean Consumer che definisce la logica di gestione dei messaggi. Ad esempio, il seguente bean Consumer è denominato receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Un ID argomento Pub/Sub nel file di configurazione application.properties. Ad esempio, il seguente file di configurazione utilizza un ID argomento Pub/Sub denominato topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

Il codice di esempio riceve messaggi da Pub/Sub. Nell'esempio:

  1. Trova l'ID argomento Pub/Sub topic-two nella destinazione dell'associazione di input in application.properties.
  2. Crea una sottoscrizione Pub/Sub per topic-two.
  3. Utilizza il nome dell'associazione receiveMessageFromTopicTwo-in-0 per trovare il bean Consumer denominato receiveMessageFromTopicTwo.
  4. Stampa i messaggi in arrivo nell'output standard e li conferma automaticamente.

Pubblicazione di messaggi in Pub/Sub

Per utilizzare l'applicazione come origine evento, configura il binder di output specificando quanto segue:

  • Un bean Supplier che definisce la provenienza dei messaggi all'interno dell'applicazione. Ad esempio, il seguente bean Supplier è denominato sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • Un ID argomento Pub/Sub nel file di configurazione application.properties. Ad esempio, il seguente file di configurazione utilizza un ID argomento Pub/Sub denominato topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

Il codice di esempio pubblica messaggi in Pub/Sub. Nell'esempio:

  1. Trova l'ID argomento Pub/Sub topic-one nella destinazione dell'associazione di output in application.properties.
  2. Utilizza il nome dell'associazione sendMessageToTopicOne-out-0 per trovare il bean Supplier denominato sendMessageToTopicOne.
  3. Invia un messaggio numerato a topic-one ogni 10 secondi.