Cette page a été traduite par l'API Cloud Translation.
Switch to English

Utiliser Pub/Sub dans les applications Spring

Cette page explique comment utiliser Pub/Sub dans les applications Java créées avec le framework Spring.

Spring Cloud GCP comporte plusieurs modules permettant d'envoyer des messages à des sujets Pub/Sub et de recevoir des messages à partir des abonnements Pub/Sub à l'aide du framework Spring. Vous pouvez utiliser ces modules indépendamment ou les combiner pour différents cas d'utilisation:

Avant de commencer

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Configurez un projet Cloud Console.

    Configurer un projet

    Cliquez pour effectuer les opérations suivantes :

    • Créer ou sélectionner un projet
    • Activez l'API Pub/Sub pour ce projet.
    • Créez un compte de service.
    • Téléchargez une clé privée au format JSON.

    Vous pouvez consulter et gérer ces ressources à tout moment dans Cloud Console.

  3. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

  4. Définissez la variable d'environnement GOOGLE_CLOUD_PROJECT sur l'ID de votre projet Cloud.

Utiliser Spring Cloud Pub/Sub Starter

Le module Spring Cloud GCP Pub/Sub Starter installe la bibliothèque cliente Java pour Pub/Sub à l'aide du module Spring Cloud GCP Pub/Sub. Vous pouvez appeler l'API Pub/Sub à partir de votre application Spring à l'aide des classes fournies par Spring Cloud GCP Pub/Sub Starter ou la bibliothèque cliente Java pour Pub/Sub. Si vous utilisez les classes proposées par Spring Cloud Pub/Sub Starter, vous pouvez remplacer les configurations Pub/Sub par défaut.

Installer le module

Pour installer le module Spring Cloud GCP Pub/Sub Starter, ajoutez les dépendances suivantes à votre fichier pom.xml :

  1. La nomenclature BOM Spring Cloud GCP :

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>1.2.7.RELEASE</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'artefact Spring Cloud GCP Pub/Sub Starter :

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Opérations disponibles

Le module Spring Cloud GCP de Pub/Sub inclut les classes suivantes :

  • PubSubAdmin pour les opérations d'administration :
    • Créer des sujets et des abonnements.
    • Obtenir des sujets et des abonnements.
    • Répertorier les sujets et les abonnements.
    • Supprimer des sujets et des abonnements.
    • Obtenez et définissez des délais de confirmation pour un abonnement.
  • PubSubTemplate pour l'envoi et la réception des messages :
    • Publier des messages sur des sujets.
    • Extraire de manière synchrone les messages des abonnements.
    • Extraire de manière asynchrone les messages des abonnements
    • Confirmer des messages.
    • Modifier les délais de confirmation
    • Convertir les messages Pub/Sub en POJO (Plain Old Java Objects, anciens objets Java standards).

Utiliser les adaptateurs de canaux d'intégration de Spring

Si votre application Spring utilise des canaux de messagerie d'intégration Spring, vous pouvez router les messages entre vos canaux de messagerie et Pub/Sub à l'aide d'adaptateurs de canaux.

Installer les modules

Pour installer des modules pour les adaptateurs de canaux d'intégration Spring, ajoutez les éléments suivants à votre fichier pom.xml :

  1. La nomenclature BOM Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>1.2.7.RELEASE</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Les artefacts Spring Cloud Pub/Sub Starter et Spring Integration Core :

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Recevoir des messages à partir de Pub/Sub

Pour recevoir des messages d'un abonnement Pub/Sub dans votre application Spring, utilisez un adaptateur de canal entrant. L'adaptateur de canal entrant convertit les messages Pub/Sub entrants en POJO, puis les transmet à un canal de messagerie.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

L'exemple ci-dessus utilise les beans Spring et la ressource Pub/Sub suivants :

  • Un bean de canal de messagerie appelé inputMessageChannel.
  • Un bean d'adaptateur de canal entrant, nommé inboundChannelAdapter et de type PubSubInboundChannelAdapter.
  • ID d'abonnement Pub/Sub nommé sub-one.

La fonction inboundChannelAdapter extrait de manière asynchrone les messages de sub-one à l'aide d'un objet PubSubTemplate et les envoie à inputMessageChannel.

La méthode inboundChannelAdapter définit le mode de confirmation sur MANUAL, afin que l'application puisse accuser réception des messages après leur traitement. Le mode de confirmation par défaut pour les types PubSubInboundChannelAdapter est AUTO.

Le bean ServiceActivator messageReceiver enregistre chaque message reçu dans inputMessageChannel à la sortie standard puis accuse réception du message.

Publier des messages dans Pub/Sub

Pour publier des messages dans un sujet Pub/Sub à partir d'un canal, utilisez un adaptateur du canal sortant. L'adaptateur de canal sortant convertit les POJOs en messages Pub/Sub, puis les envoie à un sujet Pub/Sub.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setPublishCallback(
      new ListenableFutureCallback<String>() {
        @Override
        public void onFailure(Throwable throwable) {
          LOGGER.info("There was an error sending the message.");
        }

        @Override
        public void onSuccess(String result) {
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!");
        }
      });
  return adapter;
}

L'exemple ci-dessus utilise les beans Spring et la ressource Pub/Sub suivants :

  • Un bean de canal de messagerie appelé inputMessageChannel.
  • Un adaptateur de canal sortant nommé messageSender du type PubSubMessageHandler.
  • ID de sujet Pub/Sub nommé topic-two.

Le bean ServiceActivator applique la logique de messageSender à chaque message dans inputMessageChannel.

Le PubSubMessageHandler dans messageSender publie des messages dans le fichier inputMessageChannel à l'aide d'un objet PubSubTemplate. PubSubMessageHandler publie des messages dans le sujet Pub/Sub topic-two.

Utiliser le composant Spring Cloud Stream

Pour appeler l'API Pub/Sub dans une application Spring Cloud Stream, utilisez le module Spring Cloud GCP Pub/Sub Stream Binder.

Installer le module

Pour installer le module Spring Cloud Stream Binder, ajoutez les éléments suivants à votre fichier pom.xml :

  1. La nomenclature BOM Spring Cloud GCP.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>1.2.7.RELEASE</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. L'artefact Spring Cloud Stream Binder :

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Recevoir des messages à partir de Pub/Sub

Pour utiliser votre application en tant que récepteur d'événements, configurez le binder d'entrée en spécifiant les éléments suivants :

  • Un bean Consumer qui définit la logique de traitement des messages. Par exemple, le bean Consumer suivant s'appelle receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Un ID de sujet Pub/Sub dans le fichier de configuration application.properties. Par exemple, le fichier de configuration suivant utilise un ID de sujet Pub/Sub nommé topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

L'exemple de code reçoit les messages de Pub/Sub. L'exemple effectue les opérations suivantes:

  1. Il recherche l'ID du sujet Pub/Sub topic-two dans la destination de la liaison d'entrée dans application.properties.
  2. Crée un abonnement Pub/Sub pour topic-two.
  3. Il utilise le nom de liaison receiveMessageFromTopicTwo-in-0 pour rechercher le bean Consumer nommé receiveMessageFromTopicTwo.
  4. Il affiche les messages entrants dans la sortie standard et les confirme automatiquement.

Publier des messages dans Pub/Sub

Pour utiliser votre application en tant que récepteur d'événements, configurez le binder de sortie en spécifiant les éléments suivants :

  • Un bean Supplier qui définit la provenance des messages dans votre application. Par exemple, le bean Supplier suivant s'appelle sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
              sink -> {
                try {
                  Thread.sleep(10000);
                } catch (InterruptedException e) {
                  // stop sleep earlier.
                }
    
                Message<String> message =
                    MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                LOGGER.info(
                    "Sending a message via the output binder to topic-one! Payload: "
                        + message.getPayload());
                sink.next(message);
              })
              .subscribeOn(Schedulers.elastic());
    }
  • Un ID de sujet Pub/Sub dans le fichier de configuration application.properties. Par exemple, le fichier de configuration suivant utilise un ID de sujet Pub/Sub nommé topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

L'exemple de code publie des messages dans Pub/Sub. L'exemple effectue les opérations suivantes:

  1. Il recherche l'ID du sujet Pub/Sub topic-one dans la destination de la liaison de sortie dans application.properties.
  2. Il utilise le nom de liaison sendMessageToTopicOne-out-0 pour rechercher le bean Supplier nommé sendMessageToTopicOne.
  3. Il envoie un message numéroté à topic-one toutes les 10 secondes.