编写和响应 Pub/Sub 消息

区域 ID

REGION_ID 是 Google 根据您在创建应用时选择的区域分配的缩写代码。此代码不对应于国家/地区或省,尽管某些区域 ID 可能类似于常用国家/地区代码和省代码。对于 2020 年 2 月以后创建的应用,REGION_ID.r 包含在 App Engine 网址中。对于在此日期之前创建的现有应用,网址中的区域 ID 是可选的。

详细了解区域 ID

Pub/Sub 可在应用之间提供可靠的多对多异步消息传递服务。发布者应用可以向某一主题发送消息,其他应用可以订阅该主题以接收消息。

本文档介绍如何使用 Cloud 客户端库在 Java 8 应用中发送和接收 Pub/Sub 消息。

前提条件

  • 按照 App Engine Java 8 版“Hello, World!”中的说明设置您的环境和项目,并了解如何设计 App Engine Java 8 应用的结构。
  • 记录并保存项目 ID,因为您需要用它来运行本文档中介绍的示例应用。

克隆示例应用

将示例应用复制到本地机器,然后导航到 pubsub 目录:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples
cd java-docs-samples/appengine-java8/pubsub

创建主题和订阅

创建主题和订阅,并指定 Pub/Sub 服务器应该向其发送请求的端点:

 bv
# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

用随机密钥令牌替换 YOUR_TOKEN。推送端点使用它来验证请求。

如需将 Pub/Sub 与身份验证搭配使用,请创建另一个订阅:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

# Your Google-managed service account
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'
将“YOUR-SERVICE-ACCOUNT-EMAIL”替换为您的[服务账号](/appengine/docs/flexible/configure-service-accounts) 电子邮件地址。 ### 设置环境变量 {: edit_appyaml} 修改“appengine-web.xml”文件,为您的主题和验证令牌设置环境变量:
<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <threadsafe>true</threadsafe>
  <runtime>java8</runtime>

  <env-variables>
    <env-var name="PUBSUB_TOPIC" value="your-topic" />
    <env-var name="PUBSUB_VERIFICATION_TOKEN" value="your-verification-token" />
  </env-variables>
</appengine-web-app>
## 代码审核 示例应用使用 [Cloud 客户端库](https://googleapis.dev/java/google-cloud-clients/latest/index.html){: class="external"}。 示例应用使用您在“appengine-web.xml”文件中设置的值来配置环境变量。推送请求处理程序使用这些值来确认请求来自 Pub/Sub 且来源可信: String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN"); 示例应用会维护一个 Cloud Datastore 数据库实例来存储消息。 “PubSubPush”Servlet 用于接收推送的消息并将其添加到“messageRepository”数据库实例中:
@WebServlet(value = "/pubsub/push")
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    // Do not process message if request token does not match pubsubVerificationToken
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = JsonParser.parseString(requestBody).getAsJsonObject();
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  private final Gson gson = new Gson();
  private MessageRepository messageRepository;

  PubSubPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubPush() {
    this.messageRepository = MessageRepositoryImpl.getInstance();
  }
}
“PubSubPublish”Servlet 用于与 App Engine Web 应用交互,以发布新消息并显示收到的消息:
/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.appengine.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpStatus;

@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
public class PubSubPublish extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    Publisher publisher = this.publisher;
    try {
      String topicId = System.getenv("PUBSUB_TOPIC");
      // create a publisher on the topic
      if (publisher == null) {
        ProjectTopicName topicName =
            ProjectTopicName.newBuilder()
                .setProject(ServiceOptions.getDefaultProjectId())
                .setTopic(topicId)
                .build();
        publisher = Publisher.newBuilder(topicName).build();
      }
      // construct a pubsub message from the payload
      final String payload = req.getParameter("payload");
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

      publisher.publish(pubsubMessage);
      // redirect to home page
      resp.sendRedirect("/");
    } catch (Exception e) {
      resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
    }
  }

  private Publisher publisher;

  public PubSubPublish() {}

  PubSubPublish(Publisher publisher) {
    this.publisher = publisher;
  }
}
## 在本地运行示例 {: #run_the_sample_locally} 在本地运行时,您可以通过 Google Cloud CLI 提供身份验证以使用 Google Cloud API。如果您按照[前提条件](#prerequisites) 中的说明设置了环境,则表示您已经运行“gcloud init”命令,该命令可提供此身份验证。 mvn clean package 然后在启动应用之前设置环境变量: 导出 PUBSUB_VERIFICATION_TOKEN=[your-verification-token] 导出 PUBSUB_TOPIC=[your-topic] mvn appengine:run ### 模拟推送通知 {: simulate_push_notifications} 应用可以在本地发送消息,但无法在本地接收推送消息。但是,您可以向本地推送通知端点发送 HTTP 请求来模拟推送消息。该示例包含文件“sample_message.json”。 您可以使用“curl”或 [`httpie`](https://github.com/jkbrzt/httpie){: class="external"} 客户端发送 HTTP“POST”请求: curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]" 或者 http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json 响应: HTTP/1.1 200 OK Date: Wed, 26 Apr 2017 00:03:28 GMT Content-Length: 0 Server: Jetty(9.3.8.v20160314) 请求完成后,您可以刷新“localhost:8080”并在收到的消息列表中查看消息。 ## 在 App Engine 上运行 {: #run_on_app_engine} 如需使用“gcloud”命令行工具将演示版应用部署到 App Engine,请从“pom.xml”所在的目录中运行以下命令:
mvn package appengine:deploy -Dapp.deploy.projectId=PROJECT_ID

PROJECT_ID 替换为您的 Google Cloud 项目的 ID。 如果您的 pom.xml 文件已经指定了您的项目 ID,则您无需在运行的命令中添加 -Dapp.deploy.projectId 属性。

您现在可以通过 https://PROJECT_ID.REGION_ID.r.appspot.com 访问该应用。您可以使用表单提交消息,但无法保证您的哪个应用实例会收到通知。您可以发送多条消息并刷新页面,以查看收到的消息。