Scrittura di messaggi Cloud Pub/Sub e risposta a tali messaggi

ID regione

REGION_ID è un codice abbreviato assegnato da Google in base alla regione selezionata quando crei l'app. Il codice non corrisponde a un paese o a una provincia, anche se alcuni ID regione possono sembrare simili ai codici di paesi e province di uso comune. Per le app create dopo febbraio 2020, REGION_ID.r è incluso negli URL di App Engine. Per le app esistenti create prima di questa data, l'ID regione è facoltativo nell'URL.

Scopri di più sugli ID regione.

Pub/Sub offre messaggistica asincrona many-to-many affidabile tra le applicazioni. Le applicazioni publisher possono inviare messaggi a un argomento e altre applicazioni possono abbonarsi all'argomento per ricevere i messaggi.

Questo documento descrive come utilizzare la libreria client Cloud per inviare e ricevere messaggi Pub/Sub in un'app Java 8.

Prerequisiti

  • Segui le istruzioni riportate in "Hello, World!" per Java 8 su App Engine per configurare l'ambiente e il progetto e per comprendere la struttura delle app Java 8 di App Engine.
  • Prendi nota e salva l'ID progetto, perché ti servirà per eseguire l'applicazione di esempio descritta in questo documento.

Clonazione dell'app di esempio

Copia le app di esempio sulla tua macchina locale e vai alla directory pubsub:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples
cd java-docs-samples/appengine-java8/pubsub

Creazione di un argomento e una sottoscrizione

Crea un argomento e una sottoscrizione, che include la specifica dell'endpoint a cui deve inviare richieste il server Pub/Sub:

 bv
# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

Sostituisci YOUR_TOKEN con un token casuale segreto. L'endpoint push lo utilizza per verificare le richieste.

Per utilizzare Pub/Sub con l'autenticazione, crea un'altra sottoscrizione:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10
# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

Sostituisci YOUR-SERVICE-ACCOUNT-EMAIL con l'indirizzo email del tuo account di servizio.

Imposta le variabili di ambiente

Modifica il file appengine-web.xml per impostare le variabili di ambiente per l'argomento e il token di verifica:

<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <threadsafe>true</threadsafe>
  <runtime>java8</runtime>

  <env-variables>
    <env-var name="PUBSUB_TOPIC" value="your-topic" />
    <env-var name="PUBSUB_VERIFICATION_TOKEN" value="your-verification-token" />
  </env-variables>
</appengine-web-app>

Revisione del codice

L'app di esempio utilizza la libreria client Cloud.

L'app di esempio utilizza i valori impostati nel file appengine-web.xml per configurare le variabili di ambiente. Il gestore delle richieste push utilizza questi valori per confermare che la richiesta proviene da Pub/Sub e da una fonte attendibile:

String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");

L'app di esempio gestisce un'istanza del database Cloud Datastore per archiviare i messaggi. Il servlet PubSubPush riceve i messaggi push e li aggiunge all'istanza di database messageRepository:

@WebServlet(value = "/pubsub/push")
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    // Do not process message if request token does not match pubsubVerificationToken
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = JsonParser.parseString(requestBody).getAsJsonObject();
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  private final Gson gson = new Gson();
  private MessageRepository messageRepository;

  PubSubPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubPush() {
    this.messageRepository = MessageRepositoryImpl.getInstance();
  }
}

Il servlet PubSubPublish interagisce con l'app web App Engine per pubblicare nuovi messaggi e visualizzare quelli ricevuti:

/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.appengine.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpStatus;

@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
public class PubSubPublish extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    Publisher publisher = this.publisher;
    try {
      String topicId = System.getenv("PUBSUB_TOPIC");
      // create a publisher on the topic
      if (publisher == null) {
        ProjectTopicName topicName =
            ProjectTopicName.newBuilder()
                .setProject(ServiceOptions.getDefaultProjectId())
                .setTopic(topicId)
                .build();
        publisher = Publisher.newBuilder(topicName).build();
      }
      // construct a pubsub message from the payload
      final String payload = req.getParameter("payload");
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

      publisher.publish(pubsubMessage);
      // redirect to home page
      resp.sendRedirect("/");
    } catch (Exception e) {
      resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
    }
  }

  private Publisher publisher;

  public PubSubPublish() {}

  PubSubPublish(Publisher publisher) {
    this.publisher = publisher;
  }
}

Eseguire il sample localmente

Quando esegui l'app localmente, puoi utilizzare Google Cloud CLI per fornire l'autenticazione per l'utilizzo delle API Google Cloud. Supponendo che tu abbia configurato l'ambiente come descritto in Prerequisiti, hai già eseguito il comando gcloud init, che fornisce questa autenticazione.

mvn clean package

Quindi imposta le variabili di ambiente prima di avviare l'applicazione:

export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
mvn appengine:run

Simulazione delle notifiche push

L'applicazione può inviare messaggi localmente, ma non è in grado di ricevere messaggi push localmente. Tuttavia, puoi simulare un messaggio push inviando una richiesta HTTP all'endpoint di notifica push locale. L'esempio include il file sample_message.json.

Puoi utilizzare curl o un client httpie per inviare una richiesta HTTP POST:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]"

Oppure

http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json

Risposta:

HTTP/1.1 200 OK
Date: Wed, 26 Apr 2017 00:03:28 GMT
Content-Length: 0
Server: Jetty(9.3.8.v20160314)

Al termine della richiesta, puoi aggiornare localhost:8080 e visualizzare il messaggio nell'elenco dei messaggi ricevuti.

Esecuzione su App Engine

Per eseguire il deployment dell'app di demo in App Engine utilizzando lo strumento a riga di comando gcloud, esegui il seguente comando dalla directory in cui si trova pom.xml:

mvn package appengine:deploy -Dapp.deploy.projectId=PROJECT_ID

Sostituisci PROJECT_ID con l'ID del tuo progetto Google Cloud. Se il tuo file pom.xml specifica già il tuo ID progetto, non devi includere la proprietà -Dapp.deploy.projectId nel comando eseguito.

Ora puoi accedere all'applicazione all'indirizzo https://PROJECT_ID.REGION_ID.r.appspot.com. Puoi utilizzare il modulo per inviare messaggi, ma non è garantito quale istanza della tua applicazione riceverà la notifica. Puoi inviare più messaggi e aggiornare la pagina per visualizzare il messaggio ricevuto.