编写和响应 Pub/Sub 消息

区域 ID

REGION_ID 是 Google 根据您在创建应用时选择的区域分配的缩写代码。此代码不对应于国家/地区或省,尽管某些区域 ID 可能类似于常用国家/地区代码和省代码。对于 2020 年 2 月以后创建的应用,REGION_ID.r 包含在 App Engine 网址中。对于在此日期之前创建的现有应用,网址中的区域 ID 是可选的。

详细了解区域 ID

Pub/Sub 可在应用之间提供可靠的多对多异步消息传递服务。发布者应用可以向某一主题发送消息,其他应用可以订阅该主题以接收消息。

本文档介绍如何使用 Cloud 客户端库在 Java 应用中发送和接收 Pub/Sub 消息。

前提条件

  • 按照 App Engine Java 版“Hello, World!”中的说明设置您的环境和项目,并了解如何设计 App Engine Java 应用的结构。
  • 记录并保存项目 ID,因为您需要用它来运行本文档中介绍的示例应用。

    克隆示例应用

    将示例应用复制到本地机器,然后导航到 pubsub 目录:

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples
    cd java-docs-samples/flexible/pubsub
    

    创建主题和订阅

    创建主题和订阅,并指定 Pub/Sub 服务器应该向其发送请求的端点:

    gcloud pubsub topics create YOUR_TOPIC_NAME
    gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
        --topic YOUR_TOPIC_NAME \
        --push-endpoint \
        https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/pubsub/push?token=YOUR_TOKEN \
        --ack-deadline 10
    

    用随机密钥令牌替换 YOUR_TOKEN。推送端点使用它来验证请求。

    设置环境变量

    修改 app.yaml 文件,以为您的主题和验证令牌设置环境变量:

    env_variables:
      PUBSUB_TOPIC: <your-topic-name>
      # This token is used to verify that requests originate from your
      # application. It can be any sufficiently random string.
      PUBSUB_VERIFICATION_TOKEN: <your-verification-token>

    代码审核

    示例应用使用 Cloud 客户端库

    示例应用使用您在 app.yaml 文件中设置的值来配置环境变量。推送请求处理程序使用这些值来确认请求来自 Pub/Sub 且来源可信:

    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    

    示例应用维护一个 Cloud Datastore 数据库实例以用于存储消息。PubSubPush Servlet 用于接收推送的消息并将其添加到 messageRepository 数据库实例中:

    @WebServlet(value = "/pubsub/push")
    public class PubSubPush extends HttpServlet {
    
      @Override
      public void doPost(HttpServletRequest req, HttpServletResponse resp)
          throws IOException, ServletException {
        String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
        // Do not process message if request token does not match pubsubVerificationToken
        if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
          resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
          return;
        }
        // parse message object from "message" field in the request body json
        // decode message data from base64
        Message message = getMessage(req);
        try {
          messageRepository.save(message);
          // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
          resp.setStatus(HttpServletResponse.SC_OK);
        } catch (Exception e) {
          resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
        }
      }

    PubSubPublish Servlet 与 App Engine Web 应用交互,以发布新消息并显示收到的消息:

    @WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
    public class PubSubPublish extends HttpServlet {
    
      @Override
      public void doPost(HttpServletRequest req, HttpServletResponse resp)
          throws IOException, ServletException {
        Publisher publisher = this.publisher;
        try {
          String topicId = System.getenv("PUBSUB_TOPIC");
          // create a publisher on the topic
          if (publisher == null) {
            publisher = Publisher.newBuilder(
                ProjectTopicName.of(ServiceOptions.getDefaultProjectId(), topicId))
                .build();
          }
          // construct a pubsub message from the payload
          final String payload = req.getParameter("payload");
          PubsubMessage pubsubMessage =
              PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();
    
          publisher.publish(pubsubMessage);
          // redirect to home page
          resp.sendRedirect("/");
        } catch (Exception e) {
          resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
        }
      }

    在本地运行示例

    在本地运行时,您可以借助 Google Cloud CLI 提供身份验证以使用 Google Cloud API。假设您按照前提条件中的说明设置了环境,那么您已经运行了 gcloud init 命令,该命令可提供此身份验证。

    mvn clean package
    

    然后在启动应用之前设置环境变量:

    export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
    export PUBSUB_TOPIC=[your-topic]
    mvn jetty:run
    

    模拟推送通知

    应用可以在本地发送消息,但无法在本地接收推送消息。但是,您可以向本地推送通知端点发送 HTTP 请求来模拟推送消息。该示例包含文件 sample_message.json

    您可以使用 curlhttpie 客户端发送 HTTP POST 请求:

    curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]"
    

    http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json
    

    响应:

    HTTP/1.1 200 OK
    Date: Wed, 26 Apr 2017 00:03:28 GMT
    Content-Length: 0
    Server: Jetty(9.3.8.v20160314)
    

    请求完成后,您可以刷新 localhost:8080 并在收到的消息列表中查看该消息。

    在 App Engine 上运行

    如需使用 gcloud 命令行工具将演示应用部署到 App Engine,请从 app.yaml 文件所在的目录运行以下命令:

    mvn package appengine:deploy -Dapp.deploy.projectId=PROJECT_ID

    PROJECT_ID 替换为您的 Cloud 项目 ID。如果您的 pom.xml 文件已经指定了您的项目 ID,则您无需在运行的命令中添加 -Dapp.deploy.projectId 属性。

    您现在可以通过 https://PROJECT_ID.REGION_ID.r.appspot.com 访问该应用。您可以使用表单提交消息,但无法保证您的哪个应用实例会收到通知。您可以发送多条消息并刷新页面,以查看收到的消息。