Pub/Sub-Nachrichten schreiben und beantworten

Regions-ID

REGION_ID ist ein abgekürzter Code, den Google anhand der Region zuweist, die Sie beim Erstellen Ihrer Anwendung ausgewählt haben. Der Code bezieht sich nicht auf ein Land oder eine Provinz, auch wenn einige Regions-IDs häufig verwendeten Länder- und Provinzcodes ähneln können. Bei Anwendungen, die nach Februar 2020 erstellt wurden, ist REGION_ID.r in den App Engine-URLs enthalten. Bei Anwendungen, die vor diesem Datum erstellt wurden, ist die Regions-ID in der URL optional.

Hier finden Sie weitere Informationen zu Regions-IDs.

Pub/Sub bietet zuverlässiges und asynchrones m:n-Messaging zwischen Anwendungen. Publisher-Anwendungen können Nachrichten an ein bestimmtes Thema senden. Andere Anwendungen haben die Möglichkeit, dieses Thema zu abonnieren, um Nachrichten dazu zu erhalten.

In diesem Dokument wird beschrieben, wie Sie mit der Cloud-Clientbibliothek Pub/Sub-Nachrichten in einer Java 8-Anwendung senden und empfangen.

Vorbereitung

  • Folgen Sie der Anleitung unter "Hello, World!" für Java 8 in App Engine, um eine Umgebung und ein Projekt einzurichten. Darin erfahren Sie auch mehr über die Strukturierung von Java 8-Anwendungen in App Engine.
  • Notieren Sie sich Ihre Projekt-ID und bewahren Sie sie auf. Sie benötigen die ID zur Ausführung der in diesem Dokument beschriebenen Beispielanwendung.

Beispielanwendung klonen

Kopieren Sie die Beispielanwendungen auf Ihren lokalen Computer und rufen Sie das Verzeichnis pubsub auf:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples
cd java-docs-samples/appengine-java8/pubsub

Thema und Abo erstellen

Erstellen Sie ein Thema und ein Abo, einschließlich des Endpunkts, an den der Pub/Sub-Server Anfragen senden soll:

 bv
# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

Ersetzen Sie YOUR_TOKEN durch ein geheimes, zufälliges Token. Der Push-Endpunkt verwendet dieses zum Verifizieren von Anfragen.

Erstellen Sie ein weiteres Abo, um Pub/Sub mit Authentifizierung zu verwenden:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10
# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

Ersetzen Sie YOUR-SERVICE-ACCOUNT-EMAIL durch die E-Mail-Adresse Ihres Dienstkontos.

Umgebungsvariablen festlegen

Bearbeiten Sie die appengine-web.xml -Datei, um die Umgebungsvariablen für Ihr Thema und das Verifizierungstoken festzulegen:

<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <threadsafe>true</threadsafe>
  <runtime>java8</runtime>

  <env-variables>
    <env-var name="PUBSUB_TOPIC" value="your-topic" />
    <env-var name="PUBSUB_VERIFICATION_TOKEN" value="your-verification-token" />
  </env-variables>
</appengine-web-app>

Code Review

In der Beispielanwendung kommt die Cloud-Clientbibliothek zum Einsatz.

Die Beispielanwendung verwendet die in der Datei appengine-web.xml festgelegten Werte zum Konfigurieren von Umgebungsvariablen. Auf der Grundlage dieser Werte bestätigt der Push-Anfrage-Handler, dass die Anfrage von Pub/Sub kommt und aus einer vertrauenswürdigen Quelle stammt:

String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");

Die Beispielanwendung verwaltet eine Cloud Datastore-Datenbankinstanz zum Speichern von Nachrichten. Das Servlet PubSubPush empfängt Push-Nachrichten und fügt sie der Datenbankinstanz messageRepository hinzu:

@WebServlet(value = "/pubsub/push")
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    // Do not process message if request token does not match pubsubVerificationToken
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = JsonParser.parseString(requestBody).getAsJsonObject();
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  private final Gson gson = new Gson();
  private MessageRepository messageRepository;

  PubSubPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubPush() {
    this.messageRepository = MessageRepositoryImpl.getInstance();
  }
}

Das PubSubPublish-Servlet interagiert mit der App Engine-Webanwendung, um neue Nachrichten zu veröffentlichen und empfangene Nachrichten anzuzeigen:

/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.appengine.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpStatus;

@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
public class PubSubPublish extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    Publisher publisher = this.publisher;
    try {
      String topicId = System.getenv("PUBSUB_TOPIC");
      // create a publisher on the topic
      if (publisher == null) {
        ProjectTopicName topicName =
            ProjectTopicName.newBuilder()
                .setProject(ServiceOptions.getDefaultProjectId())
                .setTopic(topicId)
                .build();
        publisher = Publisher.newBuilder(topicName).build();
      }
      // construct a pubsub message from the payload
      final String payload = req.getParameter("payload");
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

      publisher.publish(pubsubMessage);
      // redirect to home page
      resp.sendRedirect("/");
    } catch (Exception e) {
      resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
    }
  }

  private Publisher publisher;

  public PubSubPublish() {}

  PubSubPublish(Publisher publisher) {
    this.publisher = publisher;
  }
}

Beispiel lokal ausführen

Bei einer lokalen Ausführung können Sie mit der Google Cloud CLI eine Authentifizierung für die Nutzung von Google Cloud APIs bereitstellen. Wenn Sie Ihre Umgebung wie unter Voraussetzungen beschrieben eingerichtet haben, wurde der gcloud init-Befehl zur Bereitstellung dieser Authentifizierung bereits ausgeführt.

mvn clean package

Legen Sie vor dem Starten Ihrer Anwendung Umgebungsvariablen fest:

export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
mvn appengine:run

Push-Benachrichtigungen simulieren

Die Anwendung kann Nachrichten lokal senden, aber keine Push-Nachrichten lokal empfangen. Sie können jedoch eine Push-Nachricht simulieren, wenn Sie eine HTTP-Anfrage an den lokalen Push-Benachrichtigungsendpunkt senden. Das Beispiel enthält die Datei sample_message.json.

Sie verwenden curl oder einen httpie-Client, um eine HTTP-Anfrage vom Typ POST zu senden:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]"

Oder

http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json

Antwort:

HTTP/1.1 200 OK
Date: Wed, 26 Apr 2017 00:03:28 GMT
Content-Length: 0
Server: Jetty(9.3.8.v20160314)

Nachdem die Anfrage abgeschlossen ist, können Sie localhost:8080 aktualisieren und die Nachricht in der Liste der empfangenen Nachrichten sehen.

In App Engine ausführen

Zum Bereitstellen der Demo-Anwendung in App Engine mithilfe des gcloud-Befehlszeilentools führen Sie den folgenden Befehl in dem Verzeichnis aus, in dem sich die Datei pom.xml befindet:

mvn package appengine:deploy -Dapp.deploy.projectId=PROJECT_ID

Ersetzen Sie PROJECT_ID durch die ID Ihres Google Cloud-Projekts. Wenn in der Datei pom.xml bereits Ihre Projekt-ID angegeben ist, müssen Sie das Attribut -Dapp.deploy.projectId nicht in dem von Ihnen ausgeführten Befehl einfügen.

Sie können jetzt unter https://PROJECT_ID.REGION_ID.r.appspot.com auf die Anwendung zugreifen. Sie können das Formular zum Senden von Nachrichten verwenden, dabei ist jedoch nicht sicher, welche Instanz Ihrer Anwendung die Benachrichtigung erhält. Außerdem haben Sie die Möglichkeit, mehrere Nachrichten zu senden und die Seite zu aktualisieren, damit die empfangene Nachricht angezeigt wird.