将 Pub/Sub 与 Cloud Run for Anthos on Google Cloud 搭配使用

本教程介绍如何通过 Pub/Sub 推送订阅编写、部署和调用 Cloud Run for Anthos on Google Cloud 服务。

目标

  • 编写、构建服务并将其部署到 Cloud Run for Anthos on Google Cloud
  • 通过向 Pub/Sub 主题发布消息来调用该服务。

费用

本教程使用 Google Cloud 的以下收费组件:

请使用价格计算器根据您的预计用量来估算费用。

Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册新帐号

  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Cloud Run for Anthos on Google Cloud API
  5. 安装并初始化 Cloud SDK。
  6. 安装 kubectl 组件:
    gcloud components install kubectl
  7. 安装 beta 组件:
    gcloud components install beta
  8. 更新组件:
    gcloud components update
  9. 如果您使用的是 Cloud Run for Anthos on Google Cloud,请按照设置 Cloud Run for Anthos on Google Cloud 中的说明创建新集群。

设置 gcloud 默认值

如需为您的 Cloud Run for Anthos on Google Cloud 服务配置 gcloud 默认值,请执行以下操作:

  1. 设置默认项目:

    gcloud config set project PROJECT_ID

    PROJECT_ID 替换为您在本教程中使用的项目名称。

  2. 为您的集群配置 gcloud:

    gcloud config set kuberun/cluster CLUSTER-NAME
    gcloud config set kuberun/cluster_location REGION

    替换

    • CLUSTER-NAME 替换为您使用的集群名称。
    • REGION 替换为您选择的受支持的集群位置。

创建 Pub/Sub 主题

示例服务由发布到 Pub/Sub 主题的消息触发,因此您需要在 Pub/Sub 中创建主题。

如需创建新的 Pub/Sub 主题,请使用以下命令:

gcloud pubsub topics create myRunTopic

您可以使用 myRunTopic 或将其替换为 Cloud 项目中唯一的主题名称。

检索代码示例

如需检索可用的代码示例,请执行以下操作:

  1. 将示例应用代码库克隆到本地机器:

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Go

    git clone https://github.com/GoogleCloudPlatform/golang-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

  2. 转到包含 Cloud Run for Anthos on Google Cloud 示例代码的目录:

    Node.js

    cd nodejs-docs-samples/kuberun/pubsub/

    Python

    cd python-docs-samples/kuberun/pubsub/

    Go

    cd golang-samples/kuberun/pubsub/

    Java

    cd java-docs-samples/kuberun/pubsub/

查看代码

本教程中使用的代码包含以下部分:

  • 处理传入请求的服务器。

    Node.js

    为了便于测试 Node.js 服务,服务器配置与服务器启动是相互独立的。

    Node.js Web 服务器在 app.js 中设置。

    const express = require('express');
    const bodyParser = require('body-parser');
    const app = express();
    
    app.use(bodyParser.json());
    Web 服务器在 index.js 中启动:
    const app = require('./app.js');
    const PORT = process.env.PORT || 8080;
    
    app.listen(PORT, () =>
      console.log(`nodejs-pubsub-tutorial listening on port ${PORT}`)
    );

    Python

    import base64
    import os
    
    from flask import Flask, request
    
    app = Flask(__name__)

    Go

    
    // Sample pubsub is a KubeRun service which handles Pub/Sub messages.
    package main
    
    import (
    	"encoding/json"
    	"io/ioutil"
    	"log"
    	"net/http"
    	"os"
    )
    
    func main() {
    	http.HandleFunc("/", HelloPubSub)
    	// Determine port for HTTP service.
    	port := os.Getenv("PORT")
    	if port == "" {
    		port = "8080"
    		log.Printf("Defaulting to port %s", port)
    	}
    	// Start HTTP server.
    	log.Printf("Listening on port %s", port)
    	if err := http.ListenAndServe(":"+port, nil); err != nil {
    		log.Fatal(err)
    	}
    }
    

    Java

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class PubSubApplication {
      public static void main(String[] args) {
        SpringApplication.run(PubSubApplication.class, args);
      }
    }

  • 用于处理 Pub/Sub 消息并记录问候语的处理程序。

    Node.js

    app.post('/', (req, res) => {
      if (!req.body) {
        const msg = 'no Pub/Sub message received';
        console.error(`error: ${msg}`);
        res.status(400).send(`Bad Request: ${msg}`);
        return;
      }
      if (!req.body.message) {
        const msg = 'invalid Pub/Sub message format';
        console.error(`error: ${msg}`);
        res.status(400).send(`Bad Request: ${msg}`);
        return;
      }
    
      const pubSubMessage = req.body.message;
      const name = pubSubMessage.data
        ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
        : 'World';
    
      console.log(`Hello ${name}!`);
      res.status(204).send();
    });

    Python

    @app.route("/", methods=["POST"])
    def index():
        envelope = request.get_json()
        if not envelope:
            msg = "no Pub/Sub message received"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400
    
        if not isinstance(envelope, dict) or "message" not in envelope:
            msg = "invalid Pub/Sub message format"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400
    
        pubsub_message = envelope["message"]
    
        name = "World"
        if isinstance(pubsub_message, dict) and "data" in pubsub_message:
            name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()
    
        print(f"Hello {name}!")
    
        return ("", 204)

    Go

    
    // PubSubMessage is the payload of a Pub/Sub event.
    type PubSubMessage struct {
    	Message struct {
    		Data []byte `json:"data,omitempty"`
    		ID   string `json:"id"`
    	} `json:"message"`
    	Subscription string `json:"subscription"`
    }
    
    // HelloPubSub receives and processes a Pub/Sub push message.
    func HelloPubSub(w http.ResponseWriter, r *http.Request) {
    	var m PubSubMessage
    	body, err := ioutil.ReadAll(r.Body)
    	if err != nil {
    		log.Printf("ioutil.ReadAll: %v", err)
    		http.Error(w, "Bad Request", http.StatusBadRequest)
    		return
    	}
    	if err := json.Unmarshal(body, &m); err != nil {
    		log.Printf("json.Unmarshal: %v", err)
    		http.Error(w, "Bad Request", http.StatusBadRequest)
    		return
    	}
    
    	name := string(m.Message.Data)
    	if name == "" {
    		name = "World"
    	}
    	log.Printf("Hello %s!", name)
    }
    

    Java

    import java.util.Base64;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    // PubsubController consumes a Pub/Sub message.
    @RestController
    public class PubSubController {
      @RequestMapping(value = "/", method = RequestMethod.POST)
      public ResponseEntity receiveMessage(@RequestBody Body body) {
        // Get PubSub message from request body.
        Body.Message message = body.getMessage();
        if (message == null) {
          String msg = "Bad Request: invalid Pub/Sub message format";
          System.out.println(msg);
          return new ResponseEntity(msg, HttpStatus.BAD_REQUEST);
        }
    
        String data = message.getData();
        String target =
            !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
        String msg = "Hello " + target + "!";
    
        System.out.println(msg);
        return new ResponseEntity(msg, HttpStatus.OK);
      }
    }

    您必须对该服务进行编码,使其返回准确的 HTTP 响应代码。如果返回成功代码(例如 HTTP 200204),则指示 Pub/Sub 消息处理已完成。如果返回错误代码(如 HTTP 400500),则指示将重试该消息(请参阅“通过推送方式接收消息”指南

  • 用于定义服务的操作环境的 DockerfileDockerfile 的内容因语言而异。

    Node.js

    
    # Use the official lightweight Node.js 10 image.
    # https://hub.docker.com/_/node
    FROM node:12-slim
    
    # Create and change to the app directory.
    WORKDIR /usr/src/app
    
    # Copy application dependency manifests to the container image.
    # A wildcard is used to ensure both package.json AND package-lock.json are copied.
    # Copying this separately prevents re-running npm install on every code change.
    COPY package*.json ./
    
    # Install dependencies.
    # If you add a package-lock.json speed your build by switching to 'npm ci'.
    # RUN npm ci --only=production
    RUN npm install --production
    
    # Copy local code to the container image.
    COPY . .
    
    # Run the web service on container startup.
    CMD [ "npm", "start" ]
    

    Python

    
    # Use the official Python image.
    # https://hub.docker.com/_/python
    FROM python:3.9
    
    # Allow statements and log messages to immediately appear in the KubeRun logs
    ENV PYTHONUNBUFFERED True
    
    # Copy application dependency manifests to the container image.
    # Copying this separately prevents re-running pip install on every code change.
    COPY requirements.txt ./
    
    # Install production dependencies.
    RUN pip install -r requirements.txt
    
    # Copy local code to the container image.
    ENV APP_HOME /app
    WORKDIR $APP_HOME
    COPY . ./
    
    # Run the web service on container startup.
    # Use gunicorn webserver with one worker process and 8 threads.
    # For environments with multiple CPU cores, increase the number of workers
    # to be equal to the cores available.
    CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
    

    Go

    
    # Use the offical golang image to create a binary.
    # This is based on Debian and sets the GOPATH to /go.
    # https://hub.docker.com/_/golang
    FROM golang:1.15-buster as builder
    
    # Create and change to the app directory.
    WORKDIR /app
    
    # Retrieve application dependencies.
    # This allows the container build to reuse cached dependencies.
    # Expecting to copy go.mod and if present go.sum.
    COPY go.* ./
    RUN go mod download
    
    # Copy local code to the container image.
    COPY . ./
    
    # Build the binary.
    RUN go build -mod=readonly -v -o server
    
    # Use the official Debian slim image for a lean production container.
    # https://hub.docker.com/_/debian
    # https://docs.docker.com/develop/develop-images/multistage-build/#use-multi-stage-builds
    FROM debian:buster-slim
    RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
        ca-certificates && \
        rm -rf /var/lib/apt/lists/*
    
    # Copy the binary to the production image from the builder stage.
    COPY --from=builder /app/server /server
    
    # Run the web service on container startup.
    CMD ["/server"]
    

    Java

    此示例使用 Jib 利用常见 Java 工具构建 Docker 映像。无需编写 Dockerfile 或安装 Docker,Jib 便可以优化容器构建。详细了解如何使用 Jib 构建 Java 容器
    <plugin>
      <groupId>com.google.cloud.tools</groupId>
      <artifactId>jib-maven-plugin</artifactId>
      <version>2.7.0</version>
      <configuration>
        <to>
          <image>gcr.io/PROJECT_ID/pubsub</image>
        </to>
      </configuration>
    </plugin>
    

如需详细了解如何对 Pub/Sub 请求的来源进行身份验证,请参阅下文中的与 Pub/Sub 集成部分。

交付代码

交付代码包括三个步骤:使用 Cloud Build 构建容器映像、将容器映像上传到 Container Registry,以及将容器映像部署到 Cloud Run for Anthos on Google Cloud。

要开发代码,请执行以下操作:

  1. 构建容器并将其发布到 Container Registry 上:

    Node.js

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功完成后,您应该会看到一条包含 ID、创建时间和映像名称的 SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

    Python

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功完成后,您应该会看到一条包含 ID、创建时间和映像名称的 SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

    Go

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功完成后,您应该会看到一条包含 ID、创建时间和映像名称的 SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

    Java

    mvn compile jib:build -Dimage=gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功后,您会看到一条 BUILD SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

  2. 运行以下命令来部署您的应用:

    gcloud kuberun core services create pubsub-tutorial --image gcr.io/PROJECT_ID/pubsub

    PROJECT_ID 替换为您的 Cloud 项目 ID。 pubsub 为容器名称,pubsub-tutorial 为服务名称。请注意,容器映像已部署到您之前在设置 gcloud 中配置的服务和集群

    等待部署完成,这可能需要半分钟左右的时间。 成功完成时,命令行会显示服务网址。此网址用于配置 Pub/Sub 订阅。

  3. 如果要将代码更新部署到该服务,请重复执行上述步骤。向服务执行的每次部署都会创建一个新的修订版本,该修订版本准备就绪后会自动开始处理流量。

与 Pub/Sub 集成

部署完 Cloud Run for Anthos on Google Cloud 服务后,我们将配置 Pub/Sub 以向其推送消息。

如需将该服务与 Pub/Sub 集成,请执行以下操作:

  1. 允许 Pub/Sub 在您的项目中创建身份验证令牌:

    gcloud projects add-iam-policy-binding PROJECT_ID \
         --member=serviceAccount:service-PROJECT-NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
         --role=roles/iam.serviceAccountTokenCreator

    替换

    • PROJECT_ID 替换为您的 Cloud 项目 ID。
    • PROJECT-NUMBER 替换为您的 Cloud 项目编号。
  2. 创建或选择一个服务帐号,用于表示 Pub/Sub 订阅身份。

    gcloud iam service-accounts create kuberun-pubsub-invoker \
         --display-name "KubeRun Pub/Sub Invoker"

    您可以使用 kuberun-pubsub-invoker 或将其替换为 Cloud 项目中唯一的名称。

  3. 使用该服务帐号创建 Pub/Sub 订阅:

    1. 为您的集群启用自动 TLS 和 HTTPS 并向您的服务添加网域映射

    2. 为 Pub/Sub 注册网域所有权

    3. 添加代码以验证附加到 Pub/Sub 消息的身份验证令牌。 示例代码在使用推送端点执行身份验证和授权中提供。

      身份验证必须确保令牌有效且与预期的服务帐号相关联。与 Cloud Run(全托管式)不同,Cloud Run for Anthos on Google Cloud 不会针对令牌是否有效或服务帐号是否有权调用 Cloud Run for Anthos on Google Cloud 服务进行内置授权检查。

    4. 使用该服务帐号创建 Pub/Sub 订阅:

      gcloud pubsub subscriptions create myRunSubscription --topic myRunTopic \
           --push-endpoint=SERVICE-URL/ \
           --push-auth-service-account=kuberun-pubsub-invoker@PROJECT_ID.iam.gserviceaccount.com

      替换

      • myRunTopic 替换为之前创建的主题。
      • SERVICE-URL 替换为您的自定义服务网址。 指定 https 作为协议。
      • PROJECT_ID 替换为您的 Cloud 项目 ID。

      --push-auth-service-account 标志激活 Pub/Sub 推送功能,以进行身份验证和授权

您的服务现已与 Pub/Sub 全面集成。

测试

如需测试端到端解决方案,请按如下所述操作:

  1. 向该主题发送一条 Pub/Sub 消息:

    gcloud pubsub topics publish myRunTopic --message "Runner"

    除了使用命令行(如本教程中所示),您还可以编程方式发布消息。如需了解详情,请参阅发布消息

  2. 导航到服务日志:

    1. 导航到 Google Cloud Console
    2. 点击 pubsub-tutorial 服务。
    3. 选择日志标签页。

      您可能需要等待一些时间才能看到日志。如果您没有立即看到日志,请稍等片刻再检查一次。

  3. 查找“Hello Runner!”消息。

清理

如需浏览有关如何将 Cloud Run for Anthos on Google Cloud 与 Pub/Sub 搭配使用的更深层用例,请暂时跳过清理,继续阅读图片处理教程。

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

  1. 删除您在本教程中部署的 Cloud Run for Anthos on Google Cloud 服务:

    gcloud kuberun core services delete SERVICE-NAME

    其中,SERVICE-NAME 是您选择的服务名称。

    您也可以通过 Google Cloud Console 删除 Cloud Run for Anthos on Google Cloud 服务。

  2. 移除您在教程设置过程中添加的 gcloud 默认配置:

     gcloud config unset kuberun/cluster
     gcloud config unset kuberun/cluster_location
    
  3. 移除项目配置:

     gcloud config unset project
    
  4. 删除在本教程中创建的其他 Google Cloud 资源:

后续步骤