Escribir y responder a mensajes de Pub/Sub

ID de región

El REGION_ID es un código abreviado que Google asigna en función de la región que selecciones al crear tu aplicación. El código no corresponde a un país o provincia, aunque algunos IDs de región pueden parecerse a los códigos de país y provincia que se usan habitualmente. En las aplicaciones creadas después de febrero del 2020, REGION_ID.r se incluye en las URLs de App Engine. En las aplicaciones creadas antes de esa fecha, el ID de región es opcional en la URL.

Más información sobre los IDs de región

Pub/Sub ofrece mensajes asíncronos y fiables, de muchos a muchos, entre aplicaciones. Las aplicaciones de los editores pueden enviar mensajes a un tema y otras aplicaciones se pueden suscribir a ese tema para recibir los mensajes.

En este documento se describe cómo usar la biblioteca de cliente de Cloud para enviar y recibir mensajes de Pub/Sub en una aplicación Java 8.

Requisitos previos

  • Sigue las instrucciones de "Hello, World!" para Java 8 en App Engine para configurar tu entorno y tu proyecto, y para entender cómo se estructuran las aplicaciones de Java 8 de App Engine.
  • Anota y guarda el ID de tu proyecto, ya que lo necesitarás para ejecutar la aplicación de ejemplo que se describe en este documento.

Clonar la aplicación de muestra

Copia las aplicaciones de ejemplo en tu máquina local y ve al directorio pubsub:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples
cd java-docs-samples/appengine-java8/pubsub

Crear un tema y una suscripción

Crea un tema y una suscripción, lo que incluye especificar el endpoint al que el servidor de Pub/Sub debe enviar las solicitudes:

 bv
# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

Sustituye YOUR_TOKEN por un token aleatorio secreto. El endpoint de envío usa este valor para verificar las solicitudes.

Para usar Pub/Sub con autenticación, crea otra suscripción:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10
# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

Sustituye YOUR-SERVICE-ACCOUNT-EMAIL por el correo de tu cuenta de servicio.

Configurar variables de entorno

Edita el archivo appengine-web.xml para definir las variables de entorno de tu tema y tu token de verificación:

<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <threadsafe>true</threadsafe>
  <runtime>java8</runtime>

  <env-variables>
    <env-var name="PUBSUB_TOPIC" value="your-topic" />
    <env-var name="PUBSUB_VERIFICATION_TOKEN" value="your-verification-token" />
  </env-variables>
</appengine-web-app>

Revisión de código

La aplicación de ejemplo usa la biblioteca de cliente de Cloud.

La aplicación de ejemplo usa los valores que has definido en el archivo appengine-web.xml para configurar las variables de entorno. El controlador de solicitudes push usa estos valores para confirmar que la solicitud procede de Pub/Sub y de una fuente de confianza:

String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");

La aplicación de ejemplo mantiene una instancia de base de datos de Cloud Datastore para almacenar mensajes. El servlet PubSubPush recibe los mensajes insertados y los añade a la instancia de la base de datos messageRepository:

@WebServlet(value = "/pubsub/push")
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    // Do not process message if request token does not match pubsubVerificationToken
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = JsonParser.parseString(requestBody).getAsJsonObject();
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  private final Gson gson = new Gson();
  private MessageRepository messageRepository;

  PubSubPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubPush() {
    this.messageRepository = MessageRepositoryImpl.getInstance();
  }
}

El servlet PubSubPublish interactúa con la aplicación web de App Engine para publicar mensajes nuevos y mostrar los mensajes recibidos:

/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.appengine.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpStatus;

@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
public class PubSubPublish extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    Publisher publisher = this.publisher;
    try {
      String topicId = System.getenv("PUBSUB_TOPIC");
      // create a publisher on the topic
      if (publisher == null) {
        ProjectTopicName topicName =
            ProjectTopicName.newBuilder()
                .setProject(ServiceOptions.getDefaultProjectId())
                .setTopic(topicId)
                .build();
        publisher = Publisher.newBuilder(topicName).build();
      }
      // construct a pubsub message from the payload
      final String payload = req.getParameter("payload");
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

      publisher.publish(pubsubMessage);
      // redirect to home page
      resp.sendRedirect("/");
    } catch (Exception e) {
      resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
    }
  }

  private Publisher publisher;

  public PubSubPublish() {}

  PubSubPublish(Publisher publisher) {
    this.publisher = publisher;
  }
}

Ejecutar la muestra de forma local

Cuando se ejecuta de forma local, puedes usar la CLI de Google Cloud para proporcionar autenticación y usar las APIs de Google Cloud. Si has configurado tu entorno tal como se describe en Requisitos previos, ya habrás ejecutado el comando gcloud init, que proporciona esta autenticación.

mvn clean package

A continuación, define las variables de entorno antes de iniciar la aplicación:

export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
mvn appengine:run

Simular notificaciones push

La aplicación puede enviar mensajes de forma local, pero no puede recibir mensajes push de forma local. Sin embargo, puedes simular un mensaje push haciendo una solicitud HTTP al endpoint de notificaciones push local. El ejemplo incluye el archivo sample_message.json.

Puedes usar curl o un cliente httpie para enviar una solicitud HTTP POST:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]"

O

http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json

Respuesta:

HTTP/1.1 200 OK
Date: Wed, 26 Apr 2017 00:03:28 GMT
Content-Length: 0
Server: Jetty(9.3.8.v20160314)

Una vez que se haya completado la solicitud, puedes actualizar localhost:8080 y ver el mensaje en la lista de mensajes recibidos.

Ejecutar en App Engine

Para desplegar la aplicación de demostración en App Engine mediante la herramienta de línea de comandos gcloud, ejecuta el siguiente comando desde el directorio donde se encuentra pom.xml:

mvn package appengine:deploy -Dapp.deploy.projectId=PROJECT_ID

Sustituye PROJECT_ID por el ID de tu Google Cloud proyecto. Si tu archivo pom.xml ya especifica tu ID de proyecto, no es necesario que incluyas la propiedad -Dapp.deploy.projectId en el comando que ejecutes.

Ahora puedes acceder a la aplicación en https://PROJECT_ID.REGION_ID.r.appspot.com. Puedes usar el formulario para enviar mensajes, pero no hay ninguna garantía de qué instancia de tu aplicación recibirá la notificación. Puedes enviar varios mensajes y actualizar la página para ver el mensaje recibido.