編寫及回應 Pub/Sub 訊息

區域 ID

REGION_ID 是 Google 根據您在建立應用程式時選取的地區所指派的縮寫代碼。此代碼不對應至國家/地區或省份,即使部分區域 ID 可能與常用的國家/地區和省份代碼相似。如果是 2020 年 2 月後建立的應用程式,App Engine 網址會包含 REGION_ID.r。如果是這段時間前建立的現有應用程式,網址可選擇是否包含地區 ID。

進一步瞭解區域 ID

Pub/Sub 在應用程式之間提供可靠的多對多非同步訊息傳遞功能。發布者應用程式能將訊息傳送到「主題」,而其他應用程式只要訂閱該主題後就能收到訊息。

本文說明如何使用 Cloud 用戶端程式庫,在 Java 8 應用程式中傳送及接收 Pub/Sub 訊息。

必要條件

  • 請按照 App Engine 上 Java 8 適用的「Hello, World!」操作說明來設定您的環境和專案,並瞭解如何在 App Engine 中建構 Java 8 應用程式。
  • 請記下並儲存您的專案 ID,您需要這項資訊才能執行本文所述的範例應用程式。

複製範例應用程式

將範例應用程式複製到您的本機電腦,然後前往 pubsub 目錄:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples
cd java-docs-samples/appengine-java8/pubsub

建立主題和訂閱

建立主題和訂閱,其中包括指定 Pub/Sub 伺服器傳送要求的目的地端點:

 bv
# Configure the topic
gcloud pubsub topics create YOUR_TOPIC_NAME

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10

以私密的隨機憑證取代 YOUR_TOKEN。推送端點會使用這個憑證來驗證要求。

如要搭配驗證機制使用 Pub/Sub,請建立另一個訂閱項目:

# Configure the push subscription
gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
    --topic=YOUR_TOPIC_NAME \
    --push-auth-service-account=YOUR-SERVICE-ACCOUNT-EMAIL\
    --push-auth-token-audience=OPTIONAL_AUDIENCE_OVERRIDE\
    --push-endpoint=https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/push-handlers/receive_messages?token=YOUR_TOKEN \
    --ack-deadline=10
# Your service agent
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
    --role='roles/iam.serviceAccountTokenCreator'

YOUR-SERVICE-ACCOUNT-EMAIL 替換為您的服務帳戶電子郵件地址。

設定環境變數

編輯 appengine-web.xml 檔案,為您的主題和驗證憑證設定環境變數:

<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <threadsafe>true</threadsafe>
  <runtime>java8</runtime>

  <env-variables>
    <env-var name="PUBSUB_TOPIC" value="your-topic" />
    <env-var name="PUBSUB_VERIFICATION_TOKEN" value="your-verification-token" />
  </env-variables>
</appengine-web-app>

審查程式碼

範例應用程式使用 Cloud 用戶端程式庫

範例應用程式會使用您在 appengine-web.xml 檔案中指定的值來設定環境變數。推送要求處理常式會使用這些值,確認要求是來自 Pub/Sub 和可信任的來源:

String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");

範例應用程式會維護一個 Cloud Datastore 資料庫執行個體,以便儲存訊息。 PubSubPush Servlet 可接收已推送的訊息,並將訊息新增至 messageRepository 資料庫執行個體:

@WebServlet(value = "/pubsub/push")
public class PubSubPush extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
    // Do not process message if request token does not match pubsubVerificationToken
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // parse message object from "message" field in the request body json
    // decode message data from base64
    Message message = getMessage(req);
    try {
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = JsonParser.parseString(requestBody).getAsJsonObject();
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  private final Gson gson = new Gson();
  private MessageRepository messageRepository;

  PubSubPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubPush() {
    this.messageRepository = MessageRepositoryImpl.getInstance();
  }
}

PubSubPublish Servlet 會與 App Engine 網頁應用程式互動,以發布新訊息並顯示收到的訊息:

/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.appengine.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpStatus;

@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
public class PubSubPublish extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {
    Publisher publisher = this.publisher;
    try {
      String topicId = System.getenv("PUBSUB_TOPIC");
      // create a publisher on the topic
      if (publisher == null) {
        ProjectTopicName topicName =
            ProjectTopicName.newBuilder()
                .setProject(ServiceOptions.getDefaultProjectId())
                .setTopic(topicId)
                .build();
        publisher = Publisher.newBuilder(topicName).build();
      }
      // construct a pubsub message from the payload
      final String payload = req.getParameter("payload");
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

      publisher.publish(pubsubMessage);
      // redirect to home page
      resp.sendRedirect("/");
    } catch (Exception e) {
      resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
    }
  }

  private Publisher publisher;

  public PubSubPublish() {}

  PubSubPublish(Publisher publisher) {
    this.publisher = publisher;
  }
}

在本機執行範例

在本機執行應用程式範例時,您可以藉由 Google Cloud CLI 提供驗證以使用 Google Cloud API。假設您是根據必備條件一節中的指示來設定環境,那麼您應該已經執行可提供這項驗證的 gcloud init 指令。

mvn clean package

接著設定環境變數,然後啟動應用程式:

export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
export PUBSUB_TOPIC=[your-topic]
mvn appengine:run

模擬推播通知

應用程式可在本機傳送訊息,但是無法在本機接收推送訊息。不過您可以向本機的推送通知端點提出 HTTP 要求,就能模擬推送訊息。範例包含 sample_message.json 檔案。

您可以使用 curlhttpie 用戶端傳送 HTTP POST 要求:

curl -H "Content-Type: application/json" -i --data @sample_message.json "localhost:8080/pubsub/push?token=[your-token]"

http POST ":8080/pubsub/push?token=[your-token]" < sample_message.json

回應:

HTTP/1.1 200 OK
Date: Wed, 26 Apr 2017 00:03:28 GMT
Content-Length: 0
Server: Jetty(9.3.8.v20160314)

完成要求後,您可以重新整理 localhost:8080,並在收到的訊息的清單中查看訊息。

在 App Engine 上執行

如要使用 gcloud 指令列工具將範例應用程式部署至 App Engine,請從 pom.xml 所在的目錄執行下列指令:

mvn package appengine:deploy -Dapp.deploy.projectId=PROJECT_ID

PROJECT_ID 替換為專案 ID。 Google Cloud 如果 pom.xml 檔案已指定專案 ID,則不需要在執行的指令中加入 -Dapp.deploy.projectId 屬性。

您現在可以前往 https://PROJECT_ID.REGION_ID.r.appspot.com 存取應用程式。且可使用表單提交訊息,但系統無法保證應用程式的哪個執行個體會收到通知。此外,您也可以傳送多則訊息、重新整理頁面以查看收到的訊息。