Escribir y responder mensajes de Pub/Sub

ID de región

REGION_ID es un código abreviado que Google asigna en función de la región que eliges cuando creas la app. El código no corresponde a un país ni a una provincia, aunque algunos ID de región puedan parecer similares a los códigos de país y provincia que se suelen usar. En el caso de las apps creadas después de febrero de 2020, REGION_ID.r se incluye en las URL de App Engine. En el caso de las apps existentes creadas antes de esta fecha, el ID de región es opcional en la URL.

Obtén más información acerca de los ID de región.

Pub/Sub proporciona mensajería asíncrona, confiable y de varios a varios entre aplicaciones. Las aplicaciones de publicador pueden enviar mensajes a un tema, y otras aplicaciones pueden suscribirse a ese tema para recibir los mensajes.

En este documento, se describe cómo usar la biblioteca cliente de Cloud para enviar y recibir mensajes de Pub/Sub en una app de Java 8.

Requisitos previos

  • Sigue las instrucciones que se proporcionan en “Hello, World!” para Java 8 en App Engine a fin de configurar el entorno y el proyecto, y comprender cómo se estructuran las apps de Java 8 en App Engine.
  • Anota y guarda el ID del proyecto porque lo necesitarás para ejecutar la aplicación de muestra que se describe en este documento.

Clona la app de muestra

Copia las apps de muestra en tu máquina local y navega hasta el directorio pubsub:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples
cd java-docs-samples/appengine-java8/pubsub

Crea un tema y una suscripción

Crea un tema y una suscripción, lo que incluye especificar el extremo al que el servidor Pub/Sub debe enviar solicitudes:

 bv
# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

Reemplaza YOUR_TOKEN con un token secreto aleatorio. El extremo de envío lo utiliza para verificar las solicitudes.

Para usar Pub/Sub con autenticación, crea otra suscripción:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10
# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

Reemplaza YOUR-SERVICE-ACCOUNT-EMAIL por el correo electrónico de tu cuenta de servicio.

Configura variables de entorno

Edita el archivo appengine-web.xml a fin de establecer las variables de entorno para el tema y el token de verificación:

<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <threadsafe>true</threadsafe>
  <runtime>java8</runtime>

  <env-variables>
    <env-var name="PUBSUB_TOPIC" value="your-topic" />
    <env-var name="PUBSUB_VERIFICATION_TOKEN" value="your-verification-token" />
  </env-variables>
</appengine-web-app>

Revisión de código

La app de ejemplo usa la biblioteca cliente de Cloud.

La app de muestra usa los valores que estableces en el archivo appengine-web.xml para configurar las variables de entorno. El controlador de las solicitudes push utiliza estos valores para confirmar que la solicitud provenga de Pub/Sub y que se origine en una fuente confiable:

String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");

La app de muestra mantiene una instancia de base de datos de Cloud Datastore para almacenar mensajes. El servlet PubSubPush recibe los mensajes enviados y los agrega a la instancia de base de datos messageRepository:

@WebServlet(value = "/pubsub/push")
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    // Do not process message if request token does not match pubsubVerificationToken
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = JsonParser.parseString(requestBody).getAsJsonObject();
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  private final Gson gson = new Gson();
  private MessageRepository messageRepository;

  PubSubPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubPush() {
    this.messageRepository = MessageRepositoryImpl.getInstance();
  }
}

El servlet PubSubPublish interactúa con la app web de App Engine para publicar mensajes nuevos y mostrar los recibidos:

/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.appengine.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpStatus;

@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
public class PubSubPublish extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    Publisher publisher = this.publisher;
    try {
      String topicId = System.getenv("PUBSUB_TOPIC");
      // create a publisher on the topic
      if (publisher == null) {
        ProjectTopicName topicName =
            ProjectTopicName.newBuilder()
                .setProject(ServiceOptions.getDefaultProjectId())
                .setTopic(topicId)
                .build();
        publisher = Publisher.newBuilder(topicName).build();
      }
      // construct a pubsub message from the payload
      final String payload = req.getParameter("payload");
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

      publisher.publish(pubsubMessage);
      // redirect to home page
      resp.sendRedirect("/");
    } catch (Exception e) {
      resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
    }
  }

  private Publisher publisher;

  public PubSubPublish() {}

  PubSubPublish(Publisher publisher) {
    this.publisher = publisher;
  }
}

Ejecuta la muestra de forma local

En la ejecución local, puedes usar Google Cloud CLI para proporcionar autenticación a fin de emplear las API de Google Cloud. Si configuraste tu entorno como se describe en Requisitos, ya ejecutaste el comando gcloud init que proporciona esta autenticación.

mvn clean package

A continuación, configura las variables de entorno antes de iniciar la aplicación:

export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
mvn appengine:run

Simular notificaciones push

La aplicación puede enviar mensajes de forma local, pero no puede recibir mensajes push localmente. Sin embargo, puedes simular un mensaje push; para ello, envía una solicitud HTTP al extremo de notificación push local. La muestra incluye el archivo sample_message.json.

Puedes usar curl o un cliente httpie para enviar una solicitud HTTP POST:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]"

O

http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json

Respuesta:

HTTP/1.1 200 OK
Date: Wed, 26 Apr 2017 00:03:28 GMT
Content-Length: 0
Server: Jetty(9.3.8.v20160314)

Una vez completada la solicitud, puedes actualizar localhost:8080 y ver el mensaje en la lista de mensajes recibidos.

Ejecuta en App Engine

Para implementar la app de demostración en App Engine mediante la herramienta de línea de comandos de gcloud, debes ejecutar el siguiente comando desde el directorio en el que se encuentra pom.xml:

mvn package appengine:deploy -Dapp.deploy.projectId=PROJECT_ID

Reemplaza PROJECT_ID por el ID del proyecto de Google Cloud. Si tu archivo pom.xml ya especifica tu ID del proyecto, no necesitas incluir la propiedad -Dapp.deploy.projectId en el comando que ejecutas.

Ahora puedes acceder a la aplicación en https://PROJECT_ID.REGION_ID.r.appspot.com. Puedes usar el formulario para enviar mensajes, pero no hay forma de garantizar qué instancia de tu aplicación recibirá la notificación. Puedes enviar varios mensajes y actualizar la página para ver el mensaje recibido.