Writing and Responding to Pub/Sub Messages

Region ID

The REGION_ID is an abbreviated code that Google assigns based on the region you select when you create your app. The code does not correspond to a country or province, even though some region IDs may appear similar to commonly used country and province codes. For apps created after February 2020, REGION_ID.r is included in App Engine URLs. For existing apps created before this date, the region ID is optional in the URL.

Learn more about region IDs.

Pub/Sub provides reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a topic, and other applications can subscribe to that topic to receive the messages.

This document describes how to use the Cloud Client Library to send and receive Pub/Sub messages in a Java 8 app.

Prerequisites

  • Follow the instructions in "Hello, World!" for Java 8 on App Engine to set up your environment and project, and to understand how App Engine Java 8 apps are structured.
  • Write down and save your project ID, because you will need it to run the sample application described in this document.

Cloning the sample app

Copy the sample apps to your local machine, and navigate to the pubsub directory:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples
cd java-docs-samples/appengine-java8/pubsub

Creating a topic and subscription

Create a topic and subscription, which includes specifying the endpoint to which the Pub/Sub server should send requests:

# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint="https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/pubsub/push?token=YOUR_TOKEN" \
    --ack-deadline=10

Replace YOUR_TOKEN with a secret random token. The push endpoint uses this to verify requests.
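
One way to produce such a token is to draw bytes from a cryptographically strong random source and encode them so that they are safe to place in a URL query string. The following is a minimal sketch and is not part of the sample app; the class name PushTokenGenerator and the 16-byte minimum are assumptions chosen for illustration.

```java
import java.security.SecureRandom;
import java.util.Base64;

// Hypothetical helper, not part of the sample app.
final class PushTokenGenerator {
  private static final SecureRandom RANDOM = new SecureRandom();

  private PushTokenGenerator() {}

  // Returns a URL-safe token built from numBytes of cryptographically strong randomness.
  static String newToken(int numBytes) {
    if (numBytes < 16) {
      throw new IllegalArgumentException("use at least 16 random bytes");
    }
    byte[] bytes = new byte[numBytes];
    RANDOM.nextBytes(bytes);
    // The URL-safe alphabet (A-Z, a-z, 0-9, '-', '_') needs no escaping in a query string.
    return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
  }
}
```

Calling PushTokenGenerator.newToken(32) yields a 43-character string that can be pasted in place of YOUR_TOKEN and reused later as the verification token.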

To use Pub/Sub with authentication, create another subscription:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL \
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE \
    --push-endpoint="https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN" \
    --ack-deadline=10

# Your Google-managed service account
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
    --role='roles/iam.serviceAccountTokenCreator'

Replace YOUR-SERVICE-ACCOUNT-EMAIL with your service account email.

Setting environment variables

Edit the appengine-web.xml file to set the environment variables for your topic and verification token:

<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <threadsafe>true</threadsafe>
  <runtime>java8</runtime>

  <env-variables>
    <env-var name="PUBSUB_TOPIC" value="your-topic" />
    <env-var name="PUBSUB_VERIFICATION_TOKEN" value="your-verification-token" />
  </env-variables>
</appengine-web-app>
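
At run time these values are read with System.getenv. A minimal sketch of reading both variables in one place and failing early when either is missing might look like the following; the class name PubSubSettings and its methods are hypothetical and do not appear in the sample app.

```java
import java.util.Map;

// Hypothetical helper, not part of the sample app.
final class PubSubSettings {
  final String topic;
  final String verificationToken;

  private PubSubSettings(String topic, String verificationToken) {
    this.topic = topic;
    this.verificationToken = verificationToken;
  }

  // Reads both variables from the given map; pass System.getenv() when running in the app.
  static PubSubSettings fromEnvironment(Map<String, String> env) {
    return new PubSubSettings(
        require(env, "PUBSUB_TOPIC"), require(env, "PUBSUB_VERIFICATION_TOKEN"));
  }

  // Returns the value for name, or throws if it is unset or blank.
  private static String require(Map<String, String> env, String name) {
    String value = env.get(name);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalStateException("Missing environment variable: " + name);
    }
    return value;
  }
}
```

Taking the environment as a Map argument keeps the helper easy to exercise in a unit test, where the real process environment cannot be changed.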

Code review

The sample app uses the Cloud Client Library (https://googleapis.dev/java/google-cloud-clients/latest/index.html).

The sample app uses the values you set in the appengine-web.xml file to configure environment variables. The push request handler uses these values to confirm that the request came from Pub/Sub and originated from a trusted source:

String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");

The sample app maintains a Cloud Datastore database instance to store messages. The PubSubPush servlet receives pushed messages and adds them to the messageRepository database instance:

@WebServlet(value = "/pubsub/push")
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
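    // Do not process the message if the request token is missing or does not match
    // pubsubVerificationToken. Calling equals on the expected value avoids a
    // NullPointerException when the "token" parameter is absent.
    if (pubsubVerificationToken == null
        || !pubsubVerificationToken.equals(req.getParameter("token"))) {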
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
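      // Pub/Sub interprets 200, 201, 204, and 102 status codes as success; return 200
      // because 102 is an interim (informational) status rather than a final response
      resp.setStatus(HttpServletResponse.SC_OK);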
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = JsonParser.parseString(requestBody).getAsJsonObject();
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

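  private String decode(String data) {
    // Name the charset explicitly so decoding does not depend on the platform default
    return new String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8);
  }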

  private final Gson gson = new Gson();
  private MessageRepository messageRepository;

  PubSubPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubPush() {
    this.messageRepository = MessageRepositoryImpl.getInstance();
  }
}
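
A stricter variant of the token check in doPost rejects a missing value outright and compares the two strings without stopping at the first differing byte. The following is a sketch under those assumptions rather than the sample's own code; the class name TokenCheck is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Hypothetical helper, not part of the sample app.
final class TokenCheck {
  private TokenCheck() {}

  // Returns true only when both values are present and byte-for-byte equal.
  static boolean matches(String expected, String provided) {
    if (expected == null || expected.isEmpty() || provided == null) {
      return false;
    }
    // On current JDKs, MessageDigest.isEqual does not return early at the first mismatch.
    return MessageDigest.isEqual(
        expected.getBytes(StandardCharsets.UTF_8), provided.getBytes(StandardCharsets.UTF_8));
  }
}
```

In the servlet, the condition would then read along the lines of !TokenCheck.matches(pubsubVerificationToken, req.getParameter("token")).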

The PubSubPublish servlet interacts with the App Engine web app to publish new messages and display received messages:

/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.appengine.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpStatus;

@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
public class PubSubPublish extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    Publisher publisher = this.publisher;
    try {
      String topicId = System.getenv("PUBSUB_TOPIC");
      // create a publisher on the topic
      if (publisher == null) {
        ProjectTopicName topicName =
            ProjectTopicName.newBuilder()
                .setProject(ServiceOptions.getDefaultProjectId())
                .setTopic(topicId)
                .build();
        publisher = Publisher.newBuilder(topicName).build();
      }
      // construct a pubsub message from the payload
      final String payload = req.getParameter("payload");
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

      publisher.publish(pubsubMessage);
      // redirect to home page
      resp.sendRedirect("/");
    } catch (Exception e) {
      resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
    }
  }

  private Publisher publisher;

  public PubSubPublish() {}

  PubSubPublish(Publisher publisher) {
    this.publisher = publisher;
  }
}
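
PubSubPublish sends the payload as UTF-8 bytes, and a push request delivers those bytes base64-encoded in the data field of the message object, which is what PubSubPush decodes. The round trip can be sketched with the JDK alone, as below. The class name PushPayloadCodec is hypothetical, and the request body it builds is a minimal assumption for local testing; the fields in the sample's own sample_message.json may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of the sample app.
final class PushPayloadCodec {
  private PushPayloadCodec() {}

  // Encodes a text payload the way it appears in the "data" field of a push request.
  static String encode(String payload) {
    return Base64.getEncoder().encodeToString(payload.getBytes(StandardCharsets.UTF_8));
  }

  // Decodes the "data" field back to text, naming the charset explicitly.
  static String decode(String data) {
    return new String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8);
  }

  // Builds a minimal push-style request body (assumed shape) for local testing.
  static String simulatedBody(String payload, long messageId) {
    return "{\"message\":{\"data\":\""
        + encode(payload)
        + "\",\"messageId\":\""
        + messageId
        + "\"}}";
  }
}
```

Because the base64 alphabet contains no characters that need JSON escaping, the encoded payload can be placed in the body as is.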

Running the sample locally

When running locally, you can use the Google Cloud CLI to provide authentication to use Google Cloud APIs. Assuming you set up your environment as described in Prerequisites, you have already run the gcloud init command, which provides this authentication.

Build the sample:

mvn clean package

Then set environment variables before starting your application:

export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
mvn appengine:run

Simulating push notifications

The application can send messages locally, but it is not able to receive push messages locally. You can, however, simulate a push message by making an HTTP request to the local push notification endpoint. The sample includes the file sample_message.json.

You can use curl or an httpie (https://github.com/jkbrzt/httpie) client to send an HTTP POST request. Replace [your-token] with the value you set for PUBSUB_VERIFICATION_TOKEN:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]"

Or

http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json

Response:

HTTP/1.1 200 OK
Date: Wed, 26 Apr 2017 00:03:28 GMT
Content-Length: 0
Server: Jetty(9.3.8.v20160314)

After the request completes, you can refresh localhost:8080 and see the message in the list of received messages.

Running on App Engine

To deploy the demo app to App Engine, run the following command from the directory where your pom.xml is located:

mvn package appengine:deploy -Dapp.deploy.projectId=PROJECT_ID

Replace PROJECT_ID with the ID of your Google Cloud project. If your pom.xml file already specifies your project ID, you don't need to include the -Dapp.deploy.projectId property in the command you run.

You can now access the application at https://PROJECT_ID.REGION_ID.r.appspot.com. You can use the form to submit messages, but there's no guarantee of which instance of your application will receive the notification. You can send multiple messages and refresh the page to see the received messages.