Pub/Sub in Spring-Anwendungen verwenden

Auf dieser Seite wird beschrieben, wie Sie Pub/Sub in Java-Anwendungen verwenden, die mit dem Spring Framework erstellt wurden.

Spring Cloud GCP bietet mehrere Module, um mithilfe des Spring Framework Nachrichten an Pub/Sub-Themen zu senden und Nachrichten von Pub/Sub-Abos zu empfangen. Sie können diese Module unabhängig voneinander verwenden oder für verschiedene Anwendungsfälle kombinieren:

HINWEIS: Die Spring Cloud GCP-Bibliothek bietet keinen Zugriff auf AckReplyConsumerWithResponse, ein erforderliches Modul, um das genau einmalige Feature mithilfe der Java-Clientbibliothek zu implementieren.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Richten Sie ein Google Cloud Console-Projekt ein.

    Projekt einrichten

    Führen Sie folgende Schritte per Mausklick aus:

    • Ein Projekt erstellen oder auswählen.
    • Aktivieren Sie die Pub/Sub API für dieses Projekt.
    • Erstellen Sie ein Dienstkonto.
    • Laden Sie einen privaten Schlüssel als JSON-Datei herunter.

    Sie können diese Ressourcen jederzeit in der Google Cloud Console aufrufen und verwalten.

  3. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf den Pfad der JSON-Datei fest, die Ihre Anmeldedaten enthält. Diese Variable gilt nur für Ihre aktuelle Shell-Sitzung. Wenn Sie eine neue Sitzung öffnen, müssen Sie die Variable neu festlegen.

  4. Richten Sie ein Google Cloud Console-Projekt ein.

    Projekt einrichten

    Führen Sie folgende Schritte per Mausklick aus:

    • Ein Projekt erstellen oder auswählen.
    • Aktivieren Sie die Pub/Sub API für dieses Projekt.
    • Erstellen Sie ein Dienstkonto.
    • Laden Sie einen privaten Schlüssel als JSON-Datei herunter.

    Sie können diese Ressourcen jederzeit in der Google Cloud Console aufrufen und verwalten.

  5. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf den Pfad der JSON-Datei fest, die Ihre Anmeldedaten enthält. Diese Variable gilt nur für Ihre aktuelle Shell-Sitzung. Wenn Sie eine neue Sitzung öffnen, müssen Sie die Variable neu festlegen.

  6. Legen Sie die Umgebungsvariable GOOGLE_CLOUD_PROJECT auf Ihre Google Cloud-Projekt-ID fest.

Spring Cloud GCP Pub/Sub Starter verwenden

Das Modul Spring Cloud GCP Pub/Sub Starter installiert die Pub/Sub-Java-Clientbibliothek mithilfe des Moduls Spring Cloud GCP Pub/Sub. Sie können die Pub/Sub API aus Ihrer Spring-Anwendung mit den Klassen aufrufen, die der Spring Cloud GCP Pub/Sub Starter oder die Pub/Sub Java-Clientbibliothek bereitstellt. Wenn Sie die Klassen verwenden, die vom Spring Cloud GCP Pub/Sub Starter bereitgestellt werden, können Sie die standardmäßigen Pub/Sub-Konfigurationen überschreiben.

Modul installieren

Fügen Sie der Datei pom.xml diese Abhängigkeiten hinzu, um das Modul „Spring Cloud GCP Pub/Sub-Starter“ zu installieren:

  1. Die Materialliste von Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Das Artefakt „Spring Cloud GCP Pub/Sub Starter“:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

Unterstützte Vorgänge

Das Modul „Spring Cloud GCP Pub/Sub-Starter“ enthält die folgenden Klassen:

  • PubSubAdmin für Verwaltungsvorgänge:
    • Themen und Abos erstellen.
    • Themen und Abos abrufen.
    • Themen und Abos auflisten.
    • Themen und Abos löschen.
    • Bestätigungsfristen für ein Abo abrufen und festlegen.
  • PubSubTemplate, um Nachrichten zu senden und zu empfangen:
    • Nachrichten in Themen veröffentlichen.
    • Nachrichten synchron von Abos abrufen.
    • Nachrichten asynchron von Abos abrufen.
    • Nachrichten bestätigen.
    • Bestätigungsfristen ändern.
    • Pub/Sub-Nachrichten in Plain Old Java Objects (POJOs) konvertieren.

Spring Integration-Kanaladapter verwenden

Wenn Ihre Spring-Anwendung Spring Integration-Nachrichtenkanäle verwendet, können Sie Nachrichten mithilfe von Kanaladaptern zwischen Ihren Nachrichtenkanälen und Pub/Sub weiterleiten.

  • Ein Eingangskanaladapter leitet Nachrichten von einem Pub/Sub-Abo an einen Nachrichtenkanal weiter.
  • Ein Ausgangskanaladapter veröffentlicht Nachrichten von einem Nachrichtenkanal in einem Pub/Sub-Thema.

Module installieren

Fügen Sie der Datei pom.xml Folgendes hinzu, um Module für Spring Integration-Kanaladapter zu installieren:

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Die Artefakte „Spring Cloud GCP Pub/Sub Starter“ und „Spring Integration Core“:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Nachrichten von Pub/Sub empfangen

Verwenden Sie einen Eingangskanaladapter, um Nachrichten aus einem Pub/Sub-Abo in Ihrer Spring-Anwendung zu empfangen. Der Eingangskanaladapter konvertiert eingehende Pub/Sub-Nachrichten in POJOs und leitet dann die POJOs an einen Nachrichtenkanal weiter.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

Im obigen Beispiel werden die folgenden Spring Beans und Pub/Sub-Ressourcen verwendet:

  • Eine Nachrichtenkanal-Bean namens inputMessageChannel.
  • Eine Eingangskanaladapter-Bean namens inboundChannelAdapter vom Typ PubSubInboundChannelAdapter.
  • Pub/Sub-Abo-ID namens sub-one.

Der inboundChannelAdapter ruft Nachrichten mithilfe eines PubSubTemplate asynchron aus sub-one ab und sendet die Nachrichten an inputMessageChannel.

Das inboundChannelAdapter setzt den Bestätigungsmodus auf MANUAL, damit die Anwendung Nachrichten bestätigen kann, nachdem sie sie verarbeitet hat. Der Standardbestätigungsmodus von PubSubInboundChannelAdapter-Typen ist AUTO.

Die ServiceActivator-Bean messageReceiver protokolliert jede in inputMessageChannel ankommende Nachricht in der Standardausgabe und bestätigt dann die Nachricht.

Nachrichten in Pub/Sub veröffentlichen

Verwenden Sie einen Ausgangskanaladapter, um Nachrichten von einem Nachrichtenkanal in einem Pub/Sub-Thema zu veröffentlichen. Der ausgehende Kanaladapter konvertiert POJOs in Pub/Sub-Nachrichten und sendet die Nachrichten dann an ein Pub/Sub-Thema.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

Im obigen Beispiel werden die folgenden Spring-Beans und Pub/Sub-Ressourcen verwendet:

  • Eine Nachrichtenkanal-Bean namens inputMessageChannel.
  • Eine Ausgangskanaladapter-Bean namens messageSender vom Typ PubSubMessageHandler.
  • Eine Pub/Sub-Themen-ID namens topic-two.

Die ServiceActivator-Bean wendet die Logik in messageSender auf jede Nachricht in inputMessageChannel an.

Der PubSubMessageHandler in messageSender veröffentlicht Nachrichten im inputMessageChannel mithilfe eines PubSubTemplate. Der PubSubMessageHandler veröffentlicht Nachrichten für das Pub/Sub-Thema topic-two.

Spring Cloud Stream Binder verwenden

Verwenden Sie das Modul Spring Cloud GCP Pub/Sub Stream Binder, um die Pub/Sub API in einer Spring Cloud Stream-Anwendung aufzurufen.

Modul installieren

Fügen Sie der Datei pom.xml Folgendes hinzu, um das Modul „Spring Cloud Stream Bender“ zu installieren:

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Das Artefakt „Spring Cloud Stream Bender“:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Nachrichten von Pub/Sub empfangen

Um Ihre Anwendung als Ereignissenke zu verwenden, konfigurieren Sie den Eingabebinder. Geben Sie dazu Folgendes an:

  • Eine Bean vom Typ Consumer, die die Logik der Nachrichtenverarbeitung definiert. Die folgende Bean vom Typ Consumer hat beispielsweise den Namen receiveMessageFromTopicTwo:

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • Eine Pub/Sub-Themen-ID in der Konfigurationsdatei application.properties. Die folgende Konfigurationsdatei verwendet beispielsweise eine Pub/Sub-Themen-ID namens topic-two:

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

Der Beispielcode empfängt Nachrichten von Pub/Sub. Das Beispiel führt Folgendes aus:

  1. Ermittelt die Pub/Sub-Themen-ID topic-two im Eingabebindungsziel in application.properties.
  2. Erstellt ein Pub/Sub-Abo für topic-two.
  3. Verwendet den Bindungsnamen receiveMessageFromTopicTwo-in-0, um die Bean vom Typ Consumer namens receiveMessageFromTopicTwo zu finden.
  4. Gibt eingehende Nachrichten in der Standardausgabe aus und bestätigt sie automatisch.

Nachrichten in Pub/Sub veröffentlichen

Wenn Sie Ihre Anwendung als Ereignisquelle verwenden möchten, konfigurieren Sie den Ausgabebinder. Geben Sie dazu Folgendes an:

  • Eine Bean vom Typ Supplier, die definiert, woher Nachrichten aus Ihrer Anwendung stammen. Die folgende Bean vom Typ Supplier hat beispielsweise den Namen sendMessageToTopicOne:

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • Eine Pub/Sub-Themen-ID in der Konfigurationsdatei application.properties. Die folgende Konfigurationsdatei verwendet beispielsweise eine Pub/Sub-Themen-ID namens topic-one:

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

Der Beispielcode veröffentlicht Nachrichten an Pub/Sub. Das Beispiel führt Folgendes aus:

  1. Findet die Pub/Sub-Themen-ID topic-one im Ausgabebindungsziel in application.properties.
  2. Verwendet den Bindungsnamen sendMessageToTopicOne-out-0, um die Bean vom Typ Supplier namens sendMessageToTopicOne zu finden.
  3. Sendet alle 10 Sekunden eine nummerierte Nachricht an topic-one.