Pub/Sub in Spring-Anwendungen verwenden

Auf dieser Seite wird beschrieben, wie Sie Pub/Sub in Java-Anwendungen verwenden, die mit dem Spring Framework erstellt wurden.

Spring Cloud GCP bietet mehrere Module, um mithilfe des Spring Framework Nachrichten an Pub/Sub-Themen zu senden und Nachrichten von Pub/Sub-Abos zu empfangen. Sie können diese Module unabhängig voneinander verwenden oder für verschiedene Anwendungsfälle kombinieren:

HINWEIS: Die Spring Cloud GCP-Bibliothek bietet keinen Zugriff auf AckReplyConsumerWithResponse. Dieses Modul ist jedoch erforderlich, um die Funktion „Einmal genau“ mit der Java-Clientbibliothek zu implementieren.

Hinweise

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. Legen Sie die Umgebungsvariable GOOGLE_CLOUD_PROJECT auf Ihre Google Cloud-Projekt-ID fest.

Spring Cloud GCP Pub/Sub Starter verwenden

Das Modul Spring Cloud GCP Pub/Sub Starter installiert die Pub/Sub-Java-Clientbibliothek mithilfe des Moduls Spring Cloud GCP Pub/Sub. Sie können die Pub/Sub API aus Ihrer Spring-Anwendung mit den Klassen aufrufen, die der Spring Cloud GCP Pub/Sub Starter oder die Pub/Sub Java-Clientbibliothek bereitstellt. Wenn Sie die Klassen verwenden, die vom Spring Cloud GCP Pub/Sub Starter bereitgestellt werden, können Sie die standardmäßigen Pub/Sub-Konfigurationen überschreiben.

Modul installieren

Fügen Sie der Datei pom.xml diese Abhängigkeiten hinzu, um das Modul „Spring Cloud GCP Pub/Sub-Starter“ zu installieren:

  1. Die Spring Cloud Bill of Materials (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Das Artefakt „Spring Cloud GCP Pub/Sub Starter“:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Unterstützte Vorgänge

Das Modul „Spring Cloud GCP Pub/Sub-Starter“ enthält die folgenden Klassen:

  • PubSubAdmin für Verwaltungsvorgänge:
    • Themen und Abos erstellen.
    • Themen und Abos abrufen.
    • Themen und Abos auflisten.
    • Themen und Abos löschen.
    • Bestätigungsfristen für ein Abo abrufen und festlegen.
  • PubSubTemplate, um Nachrichten zu senden und zu empfangen:
    • Nachrichten in Themen veröffentlichen.
    • Nachrichten synchron von Abos abrufen.
    • Nachrichten asynchron von Abos abrufen.
    • Nachrichten bestätigen.
    • Bestätigungsfristen ändern.
    • Pub/Sub-Nachrichten in Plain Old Java Objects (POJOs) konvertieren.

Spring Integration-Kanaladapter verwenden

Wenn Ihre Spring-Anwendung Spring Integration-Nachrichtenkanäle verwendet, können Sie Nachrichten mithilfe von Kanaladaptern zwischen Ihren Nachrichtenkanälen und Pub/Sub weiterleiten.

  • Ein Eingangskanaladapter leitet Nachrichten von einem Pub/Sub-Abo an einen Nachrichtenkanal weiter.
  • Ein Ausgangskanaladapter veröffentlicht Nachrichten von einem Nachrichtenkanal in einem Pub/Sub-Thema.

Module installieren

Fügen Sie der Datei pom.xml Folgendes hinzu, um Module für Spring Integration-Kanaladapter zu installieren:

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Die Artefakte „Spring Cloud GCP Pub/Sub Starter“ und „Spring Integration Core“:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Nachrichten von Pub/Sub empfangen

Verwenden Sie einen Eingangskanaladapter, um Nachrichten aus einem Pub/Sub-Abo in Ihrer Spring-Anwendung zu empfangen. Der Eingangskanaladapter konvertiert eingehende Pub/Sub-Nachrichten in POJOs und leitet dann die POJOs an einen Nachrichtenkanal weiter.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

Im obigen Beispiel werden die folgenden Spring Beans und Pub/Sub-Ressourcen verwendet:

  • Eine Nachrichtenkanal-Bean namens inputMessageChannel.
  • Eine Eingangskanaladapter-Bean namens inboundChannelAdapter vom Typ PubSubInboundChannelAdapter.
  • Pub/Sub-Abo-ID namens sub-one.

Der inboundChannelAdapter ruft Nachrichten mithilfe eines PubSubTemplate asynchron aus sub-one ab und sendet die Nachrichten an inputMessageChannel.

Das inboundChannelAdapter setzt den Bestätigungsmodus auf MANUAL, damit die Anwendung Nachrichten bestätigen kann, nachdem sie sie verarbeitet hat. Der Standardbestätigungsmodus von PubSubInboundChannelAdapter-Typen ist AUTO.

Die ServiceActivator-Bean messageReceiver protokolliert jede in inputMessageChannel ankommende Nachricht in der Standardausgabe und bestätigt dann die Nachricht.

Nachrichten in Pub/Sub veröffentlichen

Verwenden Sie einen Ausgangskanaladapter, um Nachrichten von einem Nachrichtenkanal in einem Pub/Sub-Thema zu veröffentlichen. Der ausgehende Kanaladapter konvertiert POJOs in Pub/Sub-Nachrichten und sendet die Nachrichten dann an ein Pub/Sub-Thema.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

Im obigen Beispiel werden die folgenden Spring-Beans und Pub/Sub-Ressourcen verwendet:

  • Eine Nachrichtenkanal-Bean namens inputMessageChannel.
  • Eine Ausgangskanaladapter-Bean namens messageSender vom Typ PubSubMessageHandler.
  • Eine Pub/Sub-Themen-ID namens topic-two.

Die ServiceActivator-Bean wendet die Logik in messageSender auf jede Nachricht in inputMessageChannel an.

Der PubSubMessageHandler in messageSender veröffentlicht Nachrichten im inputMessageChannel mithilfe eines PubSubTemplate. Der PubSubMessageHandler veröffentlicht Nachrichten für das Pub/Sub-Thema topic-two.

Spring Cloud Stream Binder verwenden

Verwenden Sie das Modul Spring Cloud GCP Pub/Sub Stream Binder, um die Pub/Sub API in einer Spring Cloud Stream-Anwendung aufzurufen.

Modul installieren

Fügen Sie der Datei pom.xml Folgendes hinzu, um das Modul „Spring Cloud Stream Bender“ zu installieren:

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Das Artefakt „Spring Cloud Stream Bender“:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Nachrichten von Pub/Sub empfangen

Um Ihre Anwendung als Ereignissenke zu verwenden, konfigurieren Sie den Eingabebinder. Geben Sie dazu Folgendes an:

  • Eine Bean vom Typ Consumer, die die Logik der Nachrichtenverarbeitung definiert. Die folgende Bean vom Typ Consumer hat beispielsweise den Namen receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Eine Pub/Sub-Themen-ID in der Konfigurationsdatei application.properties. Die folgende Konfigurationsdatei verwendet beispielsweise eine Pub/Sub-Themen-ID namens topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

Der Beispielcode empfängt Nachrichten von Pub/Sub. Das Beispiel führt Folgendes aus:

  1. Ermittelt die Pub/Sub-Themen-ID topic-two im Eingabebindungsziel in application.properties.
  2. Erstellt ein Pub/Sub-Abo für topic-two.
  3. Verwendet den Bindungsnamen receiveMessageFromTopicTwo-in-0, um die Bean vom Typ Consumer namens receiveMessageFromTopicTwo zu finden.
  4. Gibt eingehende Nachrichten in der Standardausgabe aus und bestätigt sie automatisch.

Nachrichten in Pub/Sub veröffentlichen

Wenn Sie Ihre Anwendung als Ereignisquelle verwenden möchten, konfigurieren Sie den Ausgabebinder. Geben Sie dazu Folgendes an:

  • Eine Bean vom Typ Supplier, die definiert, woher Nachrichten aus Ihrer Anwendung stammen. Die folgende Bean vom Typ Supplier hat beispielsweise den Namen sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • Eine Pub/Sub-Themen-ID in der Konfigurationsdatei application.properties. Die folgende Konfigurationsdatei verwendet beispielsweise eine Pub/Sub-Themen-ID namens topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

Der Beispielcode veröffentlicht Nachrichten an Pub/Sub. Das Beispiel führt Folgendes aus:

  1. Findet die Pub/Sub-Themen-ID topic-one im Ausgabebindungsziel in application.properties.
  2. Verwendet den Bindungsnamen sendMessageToTopicOne-out-0, um die Bean vom Typ Supplier namens sendMessageToTopicOne zu finden.
  3. Sendet alle 10 Sekunden eine nummerierte Nachricht an topic-one.