Utiliser Pub/Sub dans les+ applications Spring

Cette page explique comment utiliser Pub/Sub dans des applications Java créées avec le framework Spring.

Spring Cloud GCP comporte plusieurs modules pour envoyer des messages aux sujets Pub/Sub et recevoir des messages depuis des abonnements Pub/Sub à l'aide du framework Spring. Vous pouvez utiliser ces modules indépendamment ou les combiner pour différents cas d'utilisation :

REMARQUE: La bibliothèque Spring Cloud GCP n'offre pas d'accès à AckReplyConsumerWithResponse, qui est un module requis pour implémenter la fonctionnalité "exactly-once" à l'aide de la bibliothèque cliente Java.

Avant de commencer

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. Définissez la variable d'environnement GOOGLE_CLOUD_PROJECT sur l'ID de votre projet Google Cloud.

Utiliser Spring Cloud GCP Pub/Sub Starter

Le module Spring Cloud GCP Pub/Sub Starter installe la bibliothèque cliente Java pour Pub/Sub à l'aide du module Spring Cloud GCP Pub/Sub. Vous pouvez appeler l'API Pub/Sub à partir de votre application Spring à l'aide des classes fournies par Spring Cloud GCP Pub/Sub Starter, ou de la bibliothèque cliente Java pour Pub/Sub. Si vous utilisez les classes fournies par le module Spring Cloud GCP Pub/Sub Starter, vous pouvez remplacer les configurations Pub/Sub par défaut.

Installer le module

Pour installer le module Spring Cloud GCP Pub/Sub Starter, ajoutez les dépendances suivantes à votre fichier pom.xml :

  1. La nomenclature BOM Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'artefact Spring Cloud GCP Pub/Sub Starter :

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Opérations compatibles

Le module Spring Cloud GCP de Pub/Sub inclut les classes suivantes :

  • PubSubAdmin pour les opérations d'administration :
    • Créer des sujets et des abonnements.
    • Obtenir des sujets et des abonnements.
    • Répertorier les sujets et les abonnements.
    • Supprimer des sujets et des abonnements.
    • Obtenir et définir les délais de confirmation d'un abonnement.
  • PubSubTemplate pour l'envoi et la réception des messages :
    • Publier des messages sur des sujets.
    • Extraire de manière synchrone les messages des abonnements.
    • Extraire de manière asynchrone les messages des abonnements.
    • Confirmer des messages.
    • Modifier les délais de confirmation.
    • Convertir les messages Pub/Sub en POJO (Plain Old Java Objects, anciens objets Java standards).

Utiliser les adaptateurs de canaux d'intégration Spring

Si votre application Spring utilise des canaux de message d'intégration Spring, vous pouvez acheminer les messages entre vos canaux de message et Pub/Sub à l'aide d'adaptateurs de canaux.

Installer les modules

Pour installer des modules pour les adaptateurs de canaux d'intégration Spring, ajoutez les éléments suivants à votre fichier pom.xml :

  1. La nomenclature BOM Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Les artefacts Spring Cloud Pub/Sub Starter et Spring Integration Core :

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Recevoir des messages à partir de Pub/Sub

Pour recevoir des messages d'un abonnement Pub/Sub dans votre application Spring, utilisez un adaptateur de canal entrant. L'adaptateur de canal entrant convertit les messages Pub/Sub entrants en POJO, puis transfère ces POJO à un canal de message.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

L'exemple ci-dessus utilise les beans Spring et la ressource Pub/Sub suivants:

  • Un bean de canal de messagerie appelé inputMessageChannel.
  • Un bean d'adaptateur de canal entrant, nommé inboundChannelAdapter et de type PubSubInboundChannelAdapter.
  • Un ID d'abonnement Pub/Sub nommé sub-one.

Le bean inboundChannelAdapter récupère de manière asynchrone les messages de sub-one à l'aide d'un modèle PubSubTemplate et les envoie à inputMessageChannel.

inboundChannelAdapter définit le mode de confirmation sur MANUAL afin que l'application puisse confirmer la réception des messages après leur traitement. Le mode d'accusé de réception par défaut des types PubSubInboundChannelAdapter est AUTO.

Le bean ServiceActivator messageReceiver enregistre chaque message reçu dans inputMessageChannel à la sortie standard puis accuse réception du message.

Publier des messages dans Pub/Sub

Pour publier des messages depuis un canal de message vers un sujet Pub/Sub, utilisez un adaptateur de canal sortant. L'adaptateur de canal sortant convertit les POJO en messages Pub/Sub, puis envoie les messages à un sujet Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

L'exemple ci-dessus utilise les beans Spring et la ressource Pub/Sub suivants :

  • Un bean de canal de messagerie appelé inputMessageChannel.
  • Un adaptateur de canal sortant nommé messageSender du type PubSubMessageHandler.
  • Un ID de sujet Pub/Sub nommé topic-two.

Le bean ServiceActivator applique la logique dans messageSender à chaque message du bean inputMessageChannel.

Le PubSubMessageHandler dans messageSender publie les messages dans le bean inputMessageChannel à l'aide d'un modèle PubSubTemplate. Le PubSubMessageHandler publie les messages sur le sujet Pub/Sub topic-two.

Utiliser Spring Cloud Stream Binder

Pour appeler l'API Pub/Sub dans une application Spring Cloud Stream, utilisez le module Spring Cloud GCP Pub/Sub Stream Binder.

Installer le module

Pour installer le module Spring Cloud Stream Binder, ajoutez les éléments suivants à votre fichier pom.xml :

  1. La nomenclature BOM Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'artefact Spring Cloud Stream Binder :

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Recevoir des messages à partir de Pub/Sub

Pour utiliser votre application en tant que récepteur d'événements, configurez le binder d'entrée en spécifiant les éléments suivants :

  • Un bean Consumer qui définit la logique de traitement des messages. Par exemple, le bean Consumer suivant est nommé receiveMessageFromTopicTwo :

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Un ID de sujet Pub/Sub dans le fichier de configuration application.properties. Par exemple, le fichier de configuration suivant utilise un ID de sujet Pub/Sub nommé topic-two :

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

L'exemple de code reçoit les messages de Pub/Sub en effectuant les opérations suivantes :

  1. Il recherche l'ID du sujet Pub/Sub topic-two dans la destination de la liaison d'entrée dans application.properties.
  2. Il crée un abonnement Pub/Sub sur topic-two.
  3. Il utilise le nom de liaison receiveMessageFromTopicTwo-in-0 pour rechercher le bean Consumer nommé receiveMessageFromTopicTwo.
  4. Il affiche les messages entrants dans la sortie standard et les confirme automatiquement.

Publier des messages dans Pub/Sub

Pour utiliser votre application en tant que récepteur d'événements, configurez le binder de sortie en spécifiant les éléments suivants :

  • Un bean Supplier qui définit l'origine des messages au sein de votre application. Par exemple, le bean Supplier suivant est nommé sendMessageToTopicOne :

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • Un ID de sujet Pub/Sub dans le fichier de configuration application.properties. Par exemple, le fichier de configuration suivant utilise un ID de sujet Pub/Sub nommé topic-one :

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

L'exemple de code publie des messages dans Pub/Sub en effectuant les opérations suivantes :

  1. Il recherche l'ID du sujet Pub/Sub topic-one dans la destination de la liaison de sortie dans application.properties.
  2. Il utilise le nom de liaison sendMessageToTopicOne-out-0 pour rechercher le bean Supplier nommé sendMessageToTopicOne.
  3. Il envoie un message numéroté à topic-one toutes les 10 secondes.