Writing and Responding to Pub/Sub Messages

Cloud Pub/Sub provides reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a topic, and other applications can subscribe to that topic to receive the messages.

This document describes how to use the Google Cloud Client Library to send and receive Cloud Pub/Sub messages in a Java 8 app.

Prerequisites

  • Follow the instructions in "Hello, World!" for Java 8 on App Engine to set up your environment and project, and to understand how App Engine Java 8 apps are structured.
  • Write down and save your project ID, because you will need it to run the sample application described in this document.

Cloning the sample app

Copy the sample apps to your local machine, and navigate to the pubsub directory:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples
cd java-docs-samples/appengine-java8/pubsub

Creating a topic and subscription

Create a topic and a push subscription. The subscription specifies the endpoint to which the Pub/Sub server sends requests:

gcloud pubsub topics create YOUR_TOPIC_NAME
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic YOUR_TOPIC_NAME \
    --push-endpoint \
    "https://YOUR_PROJECT_ID.appspot.com/pubsub/push?token=YOUR_TOKEN" \
    --ack-deadline 10

Replace YOUR_TOKEN with a secret random token. The push endpoint uses this to verify requests.
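
One way to produce a suitable value for YOUR_TOKEN is to generate it from a cryptographically strong random source. The following is a minimal sketch, not part of the sample app; the class name TokenGenerator and the choice of 32 bytes are assumptions made for illustration.

```java
import java.security.SecureRandom;
import java.util.Base64;

/** Hypothetical helper (not part of the sample app) that generates a verification token. */
final class TokenGenerator {
  private static final SecureRandom RANDOM = new SecureRandom();

  /** Returns numBytes of random data, base64url-encoded without padding. */
  static String generate(int numBytes) {
    byte[] bytes = new byte[numBytes];
    RANDOM.nextBytes(bytes);
    // The URL-safe alphabet avoids characters that need escaping in a query string.
    return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
  }

  public static void main(String[] args) {
    // 32 bytes is an assumed size; use the printed value as YOUR_TOKEN.
    System.out.println(generate(32));
  }
}
```

Because the token travels in the push endpoint's query string, the sketch uses the URL-safe base64 alphabet so that the value can be pasted into the URL unchanged.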

Setting environment variables

Edit the appengine-web.xml file to set the environment variables for your topic and verification token:

<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <threadsafe>true</threadsafe>
  <runtime>java8</runtime>

  <env-variables>
    <env-var name="PUBSUB_TOPIC" value="your-topic" />
    <env-var name="PUBSUB_VERIFICATION_TOKEN" value="your-verification-token" />
  </env-variables>
</appengine-web-app>
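
At run time, the servlets read these values with System.getenv. If a variable is missing, System.getenv returns null, which can surface later as a confusing error. The following sketch shows one way to fail fast instead; the EnvConfig class and its require method are hypothetical names that do not appear in the sample app.

```java
import java.util.Map;

/** Hypothetical helper (not part of the sample app) for reading required settings. */
final class EnvConfig {

  /** Returns the value of name from env, or throws if it is missing or empty. */
  static String require(Map<String, String> env, String name) {
    String value = env.get(name);
    if (value == null || value.isEmpty()) {
      throw new IllegalStateException("Missing environment variable: " + name);
    }
    return value;
  }

  public static void main(String[] args) {
    // System.getenv() with no arguments returns the process environment as a map.
    System.out.println(require(System.getenv(), "PUBSUB_TOPIC"));
  }
}
```

Passing the environment in as a map, rather than calling System.getenv inside the method, keeps the lookup easy to exercise in a unit test.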

Code review

The sample app uses the Google Cloud Client Library.

The sample app reads the values you set in the appengine-web.xml file as environment variables. The push request handler compares the verification token with the token query parameter on each request to confirm that the request came from Pub/Sub and originated from a trusted source:

String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
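
When comparing a secret such as this token, it can help to treat a missing value as a mismatch and to use a comparison whose running time does not depend on where the two values first differ. The following is a sketch under those assumptions; TokenCheck is a hypothetical class, not part of the sample app, and it relies on the standard MessageDigest.isEqual method.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

/** Hypothetical helper (not part of the sample app) for comparing verification tokens. */
final class TokenCheck {

  /** Returns true only if both tokens are present and byte-for-byte identical. */
  static boolean matches(String expected, String provided) {
    if (expected == null || provided == null) {
      // A missing environment variable or query parameter never verifies.
      return false;
    }
    return MessageDigest.isEqual(
        expected.getBytes(StandardCharsets.UTF_8),
        provided.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    System.out.println(matches("secret", "secret")); // prints "true"
    System.out.println(matches("secret", null)); // prints "false"
  }
}
```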

The sample app maintains a Cloud Datastore database instance to store messages. The PubSubPush servlet receives pushed messages and adds them to the messageRepository database instance:

@WebServlet(value = "/pubsub/push")
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    // Do not process the message if the request token is missing or does not match
    // pubsubVerificationToken
    String token = req.getParameter("token");
    if (token == null || !token.equals(pubsubVerificationToken)) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system;
      // return 200 to acknowledge the message
      resp.setStatus(HttpServletResponse.SC_OK);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

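  private String decode(String data) {
    // Decode with an explicit charset instead of relying on the platform default
    return new String(
        Base64.getDecoder().decode(data), java.nio.charset.StandardCharsets.UTF_8);
  }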

  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();
  private MessageRepository messageRepository;

  PubSubPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubPush() {
    this.messageRepository = MessageRepositoryImpl.getInstance();
  }
}
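
The decoding step in the handler can be illustrated on its own. The data field of a pushed message arrives base64-encoded, so the handler must decode it before storing the text. The following standalone sketch uses only the JDK; PushDataCodec is a hypothetical class name, and the encode method is included only so that the round trip can be demonstrated.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper (not part of the sample app) showing the base64 step in isolation. */
final class PushDataCodec {

  /** Encodes text the way it appears in the "data" field of a pushed message. */
  static String encode(String text) {
    return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
  }

  /** Decodes the base64 "data" field back into text, assuming a UTF-8 payload. */
  static String decode(String data) {
    return new String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(decode("SGVsbG8sIFdvcmxkIQ==")); // prints "Hello, World!"
  }
}
```

The sketch assumes that publishers send UTF-8 text, which matches how the PubSubPublish servlet builds its payload with ByteString.copyFromUtf8.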

The PubSubPublish servlet handles the form submitted from the App Engine web app: it publishes the payload as a new message and then redirects to the home page, which displays received messages:

/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.appengine.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpStatus;

@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
public class PubSubPublish extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    Publisher publisher = this.publisher;
    try {
      String topicId = System.getenv("PUBSUB_TOPIC");
      // create a publisher on the topic
      if (publisher == null) {
        ProjectTopicName topicName =
            ProjectTopicName.newBuilder()
                .setProject(ServiceOptions.getDefaultProjectId())
                .setTopic(topicId)
                .build();
        publisher = Publisher.newBuilder(topicName).build();
      }
      // construct a pubsub message from the payload
      final String payload = req.getParameter("payload");
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

      publisher.publish(pubsubMessage);
      // redirect to home page
      resp.sendRedirect("/");
    } catch (Exception e) {
      resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
    }
  }

  private Publisher publisher;

  public PubSubPublish() {}

  PubSubPublish(Publisher publisher) {
    this.publisher = publisher;
  }
}

Running the sample locally

When running locally, you can use the Cloud SDK to provide authentication to use Google Cloud APIs. Assuming you set up your environment as described in Prerequisites, you have already run the gcloud init command, which provides this authentication.

Build the sample app:

mvn clean package

Then set environment variables before starting your application:

export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
mvn appengine:run

Simulating push notifications

The application can send messages locally, but it is not able to receive push messages locally. You can, however, simulate a push message by making an HTTP request to the local push notification endpoint. The sample includes the file sample_message.json.

You can use curl or an HTTPie client to send an HTTP POST request:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]"

Or

http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json

Response:

HTTP/1.1 200 OK
Date: Wed, 26 Apr 2017 00:03:28 GMT
Content-Length: 0
Server: Jetty(9.3.8.v20160314)

After the request completes, you can refresh localhost:8080 and see the message in the list of received messages.
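
If you prefer to send the simulated push from Java instead of editing sample_message.json, the following sketch builds a request body in the shape that the push handler reads (a top-level "message" object with a base64-encoded "data" field) and posts it to the local endpoint. SimulatedPush is a hypothetical class, the messageId value is arbitrary, and the body is not claimed to match the contents of sample_message.json.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper (not part of the sample app) that simulates a push request. */
final class SimulatedPush {

  /** Builds a push-style JSON body. Assumes messageId needs no JSON escaping. */
  static String body(String payload, String messageId) {
    String data = Base64.getEncoder().encodeToString(payload.getBytes(StandardCharsets.UTF_8));
    return "{\"message\":{\"data\":\"" + data + "\",\"messageId\":\"" + messageId + "\"}}";
  }

  /** Sends json as an HTTP POST to url and returns the response status code. */
  static int post(String url, String json) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(json.getBytes(StandardCharsets.UTF_8));
    }
    return conn.getResponseCode();
  }

  public static void main(String[] args) throws IOException {
    // Pass the same token that you exported as PUBSUB_VERIFICATION_TOKEN.
    String url = "http://localhost:8080/pubsub/push?token=" + args[0];
    System.out.println(post(url, body("Hello, World!", "1")));
  }
}
```

Run the main method with your verification token as the only argument while the local development server is running.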

Running on App Engine

To deploy the demo app to App Engine, run the following command from the directory where your pom.xml is located:

mvn appengine:deploy

You can now access the application at https://[YOUR_PROJECT_ID].appspot.com. You can use the form to submit messages, but there's no guarantee of which instance of your application will receive the notification. You can send multiple messages and refresh the page to see the received messages.
