Utilizzo di Pub/Sub nelle applicazioni Spring

Questa pagina descrive come utilizzare Pub/Sub nelle applicazioni Java create con il framework di primavera.

Spring Cloud GCP dispone di diversi moduli per l'invio agli argomenti Pub/Sub e alla ricezione di messaggi Sottoscrizioni Pub/Sub con framework Spring. Puoi utilizzare la modalità questi moduli singolarmente o combinarli per diversi casi d'uso:

NOTA: la libreria Spring Cloud GCP non fornisce l'accesso a AckReplyConsumerWithResponse, che è un modulo obbligatorio per implementare la funzionalità esattamente una volta utilizzando la libreria client Java.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. Imposta la variabile di ambiente GOOGLE_CLOUD_PROJECT sul tuo ID progetto Google Cloud.

Utilizzo di Spring Cloud Google Cloud Pub/Sub Starter

Il modulo Spring Cloud GCP Pub/Sub Starter installa la libreria client Java Pub/Sub utilizzando il modulo Spring Cloud GCP Pub/Sub. Puoi chiamare l'API Pub/Sub dalla tua applicazione Spring utilizzando le classi fornite da Pub/Sub Starter di Spring Cloud Cloud libreria client Java di Pub/Sub. Se utilizzi le classi fornite da Spring Cloud GCP Pub/Sub Starter, puoi sostituire le configurazioni Pub/Sub predefinite.

Installazione del modulo

Per installare il modulo Spring Cloud GCP Pub/Sub Starter, aggiungi queste dipendenze al file pom.xml:

  1. La BOM (Bill of Materials) di Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'artefatto Pub/Sub Starter di Cloud Google Cloud Spring:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Operazioni supportate

Il modulo Pub/Sub Starter Spring Cloud Google Cloud include le seguenti classi:

  • PubSubAdmin per le operazioni amministrative:
    • Crea argomenti e sottoscrizioni.
    • Visualizza argomenti e abbonamenti.
    • Elenca argomenti e sottoscrizioni.
    • Eliminare argomenti e sottoscrizioni.
    • Ricevi e imposta scadenze per la conferma di un abbonamento.
  • PubSubTemplate per inviare e ricevere messaggi:
    • Pubblicare messaggi negli argomenti.
    • Esegui il pull dei messaggi dalle sottoscrizioni in modo sincrono.
    • Esegui il pull dei messaggi dalle sottoscrizioni in modo asincrono.
    • Conferma i messaggi.
    • Modificare le scadenze per l'invio della conferma.
    • Converti i messaggi Pub/Sub in POJO (Plain Old Java Object).

Utilizzo degli adattatori del canale Spring Integration

Se la tua applicazione Spring utilizza i canali di messaggistica di Spring Integration, puoi instradare i messaggi tra i canali di messaggistica e Pub/Sub utilizzando adattatori di canale.

Installazione dei moduli

Per installare i moduli per gli adattatori di canale Spring Integration, aggiungi quanto segue al file pom.xml:

  1. La distinta base di Spring Cloud Google Cloud.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Gli elementi Spring Cloud GCP Pub/Sub Starter e Spring Integration Core:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Ricezione di messaggi da Pub/Sub

Per ricevere messaggi da una sottoscrizione Pub/Sub nell'applicazione Spring, usa un adattatore del canale in entrata. L'adattatore del canale in entrata converte i messaggi Pub/Sub in entrata in POJO e poi li inoltra a un canale di messaggi.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

L'esempio precedente utilizza i seguenti fagioli primaverili e la risorsa Pub/Sub:

  • Un bean di canale di messaggi denominato inputMessageChannel.
  • Un bean dell'adattatore del canale in entrata denominato inboundChannelAdapter di tipo PubSubInboundChannelAdapter.
  • Un ID sottoscrizione Pub/Sub denominato sub-one.

inboundChannelAdapter esegue il pull dei messaggi in modo asincrono da sub-one utilizzando un PubSubTemplate e invia i messaggi a inputMessageChannel.

inboundChannelAdapter imposta la modalità di conferma su MANUAL in modo che l'applicazione possa confermare i messaggi dopo averli elaborati. La modalità di conferma predefinita dei tipi PubSubInboundChannelAdapter è AUTO.

La ServiceActivator bean messageReceiver registra ogni messaggio in arrivo inputMessageChannel all'output standard e quindi conferma il messaggio.

Pubblicare messaggi in Pub/Sub

Per pubblicare messaggi da un canale di messaggi in un argomento Pub/Sub, un adattatore di canale in uscita. L'adattatore del canale in uscita converte i POJO ai messaggi Pub/Sub e li invia a un Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

L'esempio precedente utilizza i seguenti bean Spring e la risorsa Pub/Sub:

  • Un bean del canale di messaggi denominato inputMessageChannel.
  • Un bean di adattatori del canale in uscita denominato messageSender di tipo PubSubMessageHandler
  • Un ID argomento Pub/Sub denominato topic-two.

Il bean ServiceActivator applica la logica in messageSender a ogni messaggio in inputMessageChannel.

PubSubMessageHandler in messageSender pubblica messaggi in inputMessageChannel utilizzando un PubSubTemplate. PubSubMessageHandler pubblica messaggi nell'argomento Pub/Sub topic-two.

Utilizzo del binder Spring Cloud Stream

Per chiamare l'API Pub/Sub in un l'applicazione Spring Cloud Stream, usa il modulo Spring Cloud Google Cloud Pub/Sub Stream Binder.

Installazione del modulo

Per installare il modulo Spring Cloud Stream Binder, aggiungi quanto segue al file pom.xml:

  1. Il BOM di Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'artifact Spring Cloud Stream Binder:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Ricezione di messaggi da Pub/Sub

Per utilizzare l'applicazione come sink di eventi, configura il binder di input specificando quanto segue:

  • Un bean Consumer che definisce la logica di gestione dei messaggi. Ad esempio: il seguente fagiolo Consumer si chiama receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Un ID argomento Pub/Sub nel file di configurazione application.properties. Ad esempio, il seguente file di configurazione utilizza un ID argomento Pub/Sub denominato topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

Il codice di esempio riceve messaggi da Pub/Sub. L'esempio esegue le seguenti operazioni:

  1. Trova l'ID argomento Pub/Sub topic-two nell'associazione di input destinazione in application.properties.
  2. Crea una sottoscrizione Pub/Sub a topic-two.
  3. Utilizza il nome del vincolo receiveMessageFromTopicTwo-in-0 per trovare il bean Consumer denominato receiveMessageFromTopicTwo.
  4. Stampa i messaggi in arrivo sull'output standard e automaticamente e li riconosce.

Pubblicazione di messaggi in Pub/Sub

Per utilizzare la tua applicazione come origine evento, configura binder di output specificando quanto segue:

  • Un bean Supplier che definisce la provenienza dei messaggi all'interno un'applicazione. Ad esempio, il seguente bean Supplier è denominato sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • Un ID argomento Pub/Sub nel file di configurazione application.properties. Ad esempio, il seguente file di configurazione utilizza un ID argomento Pub/Sub denominato topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

Il codice di esempio pubblica messaggi in Pub/Sub. Nell'esempio:

  1. Trova l'ID argomento Pub/Sub topic-one nella destinazione della associazione di output in application.properties.
  2. Utilizza il nome del vincolo sendMessageToTopicOne-out-0 per trovare il bean Supplier denominato sendMessageToTopicOne.
  3. Invia un messaggio numerato a topic-one ogni 10 secondi.