将 Pub/Sub 与 Cloud Run for Anthos 搭配使用


本教程介绍如何通过 Pub/Sub 推送订阅编写、部署和调用 Cloud Run 服务。

目标

  • 编写、构建服务并将其部署到 Cloud Run for Anthos
  • 通过向 Pub/Sub 主题发布消息来调用该服务。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 启用 Cloud Run for Anthos API
  7. 安装并初始化 gcloud CLI。
  8. 安装 kubectl 组件:
    gcloud components install kubectl
  9. 更新组件:
    gcloud components update
  10. 如果您使用的是 Cloud Run for Anthos,请按照设置 Cloud Run for Anthos 中的说明创建新集群。

设置 gcloud 默认值

如需为您的 Cloud Run for Anthos 服务配置 gcloud 默认值,请执行以下操作:

  1. 设置默认项目:

    gcloud config set project PROJECT_ID

    PROJECT_ID 替换为您在本教程中使用的项目名称。

  2. 为您的集群配置 gcloud:

    gcloud config set run/platform gke
    gcloud config set run/cluster CLUSTER-NAME
    gcloud config set run/cluster_location REGION

    您需要将其中的:

    • CLUSTER-NAME 替换为您使用的集群名称。
    • REGION 替换为您选择的受支持的集群位置。

创建 Pub/Sub 主题

示例服务由发布到 Pub/Sub 主题的消息触发,因此您需要在 Pub/Sub 中创建主题。

如需创建新的 Pub/Sub 主题,请使用以下命令:

gcloud pubsub topics create myRunTopic

您可以使用 myRunTopic 或将其替换为 Cloud 项目中唯一的主题名称。

检索代码示例

如需检索可用的代码示例,请执行以下操作:

  1. 将示例应用代码库克隆到本地机器:

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Go

    git clone https://github.com/GoogleCloudPlatform/golang-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

  2. 切换到包含 Cloud Run for Anthos 示例代码的目录:

    Node.js

    cd nodejs-docs-samples/run/pubsub/

    Python

    cd python-docs-samples/run/pubsub/

    Go

    cd golang-samples/run/pubsub/

    Java

    cd java-docs-samples/run/pubsub/

查看代码

本教程中使用的代码包含以下部分:

  • 处理传入请求的服务器。

    Node.js

    为了便于测试 Node.js 服务,服务器配置与服务器启动是相互独立的。

    Node.js Web 服务器在 app.js 中设置。

    const express = require('express');
    const app = express();
    
    // This middleware is available in Express v4.16.0 onwards
    app.use(express.json());
    网络服务器在 index.js 中启动:
    const app = require('./app.js');
    const PORT = parseInt(parseInt(process.env.PORT)) || 8080;
    
    app.listen(PORT, () =>
      console.log(`nodejs-pubsub-tutorial listening on port ${PORT}`)
    );

    Python

    import base64
    import os
    
    from flask import Flask, request
    
    app = Flask(__name__)

    Go

    
    // Sample run-pubsub is a Cloud Run service which handles Pub/Sub messages.
    package main
    
    import (
    	"encoding/json"
    	"io/ioutil"
    	"log"
    	"net/http"
    	"os"
    )
    
    func main() {
    	http.HandleFunc("/", HelloPubSub)
    	// Determine port for HTTP service.
    	port := os.Getenv("PORT")
    	if port == "" {
    		port = "8080"
    		log.Printf("Defaulting to port %s", port)
    	}
    	// Start HTTP server.
    	log.Printf("Listening on port %s", port)
    	if err := http.ListenAndServe(":"+port, nil); err != nil {
    		log.Fatal(err)
    	}
    }
    

    Java

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class PubSubApplication {
      public static void main(String[] args) {
        SpringApplication.run(PubSubApplication.class, args);
      }
    }

  • 用于处理 Pub/Sub 消息并记录问候语的处理程序。

    Node.js

    app.post('/', (req, res) => {
      if (!req.body) {
        const msg = 'no Pub/Sub message received';
        console.error(`error: ${msg}`);
        res.status(400).send(`Bad Request: ${msg}`);
        return;
      }
      if (!req.body.message) {
        const msg = 'invalid Pub/Sub message format';
        console.error(`error: ${msg}`);
        res.status(400).send(`Bad Request: ${msg}`);
        return;
      }
    
      const pubSubMessage = req.body.message;
      const name = pubSubMessage.data
        ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
        : 'World';
    
      console.log(`Hello ${name}!`);
      res.status(204).send();
    });

    Python

    @app.route("/", methods=["POST"])
    def index():
        envelope = request.get_json()
        if not envelope:
            msg = "no Pub/Sub message received"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400
    
        if not isinstance(envelope, dict) or "message" not in envelope:
            msg = "invalid Pub/Sub message format"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400
    
        pubsub_message = envelope["message"]
    
        name = "World"
        if isinstance(pubsub_message, dict) and "data" in pubsub_message:
            name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()
    
        print(f"Hello {name}!")
    
        return ("", 204)
    
    

    Go

    
    // PubSubMessage is the payload of a Pub/Sub event.
    // See the documentation for more details:
    // https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
    type PubSubMessage struct {
    	Message struct {
    		Data []byte `json:"data,omitempty"`
    		ID   string `json:"id"`
    	} `json:"message"`
    	Subscription string `json:"subscription"`
    }
    
    // HelloPubSub receives and processes a Pub/Sub push message.
    func HelloPubSub(w http.ResponseWriter, r *http.Request) {
    	var m PubSubMessage
    	body, err := ioutil.ReadAll(r.Body)
    	if err != nil {
    		log.Printf("ioutil.ReadAll: %v", err)
    		http.Error(w, "Bad Request", http.StatusBadRequest)
    		return
    	}
    	// byte slice unmarshalling handles base64 decoding.
    	if err := json.Unmarshal(body, &m); err != nil {
    		log.Printf("json.Unmarshal: %v", err)
    		http.Error(w, "Bad Request", http.StatusBadRequest)
    		return
    	}
    
    	name := string(m.Message.Data)
    	if name == "" {
    		name = "World"
    	}
    	log.Printf("Hello %s!", name)
    }
    

    Java

    import com.example.cloudrun.Body;
    import java.util.Base64;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    // PubsubController consumes a Pub/Sub message.
    @RestController
    public class PubSubController {
      @RequestMapping(value = "/", method = RequestMethod.POST)
      public ResponseEntity receiveMessage(@RequestBody Body body) {
        // Get PubSub message from request body.
        Body.Message message = body.getMessage();
        if (message == null) {
          String msg = "Bad Request: invalid Pub/Sub message format";
          System.out.println(msg);
          return new ResponseEntity(msg, HttpStatus.BAD_REQUEST);
        }
    
        String data = message.getData();
        String target =
            !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
        String msg = "Hello " + target + "!";
    
        System.out.println(msg);
        return new ResponseEntity(msg, HttpStatus.OK);
      }
    }

    您必须对该服务进行编码,使其返回准确的 HTTP 响应代码。如果返回成功代码(例如 HTTP 200204),则指示 Pub/Sub 消息处理已完成。如果返回错误代码(如 HTTP 400500),则指示将重试该消息(请参阅“通过推送方式接收消息”指南

  • 用于定义服务的操作环境的 DockerfileDockerfile 的内容因语言而异。

    Node.js

    
    # Use the official lightweight Node.js 12 image.
    # https://hub.docker.com/_/node
    FROM node:18-slim
    
    # Create and change to the app directory.
    WORKDIR /usr/src/app
    
    # Copy application dependency manifests to the container image.
    # A wildcard is used to ensure both package.json AND package-lock.json are copied.
    # Copying this separately prevents re-running npm install on every code change.
    COPY package*.json ./
    
    # Install dependencies.
    # If you add a package-lock.json speed your build by switching to 'npm ci'.
    # RUN npm ci --only=production
    RUN npm install --production
    
    # Copy local code to the container image.
    COPY . .
    
    # Run the web service on container startup.
    CMD [ "npm", "start" ]
    

    Python

    
    # Use the official Python image.
    # https://hub.docker.com/_/python
    FROM python:3.10
    
    # Allow statements and log messages to immediately appear in the Cloud Run logs
    ENV PYTHONUNBUFFERED True
    
    # Copy application dependency manifests to the container image.
    # Copying this separately prevents re-running pip install on every code change.
    COPY requirements.txt ./
    
    # Install production dependencies.
    RUN pip install -r requirements.txt
    
    # Copy local code to the container image.
    ENV APP_HOME /app
    WORKDIR $APP_HOME
    COPY . ./
    
    # Run the web service on container startup.
    # Use gunicorn webserver with one worker process and 8 threads.
    # For environments with multiple CPU cores, increase the number of workers
    # to be equal to the cores available.
    # Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
    CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
    

    Go

    
    # Use the offical golang image to create a binary.
    # This is based on Debian and sets the GOPATH to /go.
    # https://hub.docker.com/_/golang
    FROM golang:1.17-buster as builder
    
    # Create and change to the app directory.
    WORKDIR /app
    
    # Retrieve application dependencies.
    # This allows the container build to reuse cached dependencies.
    # Expecting to copy go.mod and if present go.sum.
    COPY go.* ./
    RUN go mod download
    
    # Copy local code to the container image.
    COPY . ./
    
    # Build the binary.
    RUN go build -v -o server
    
    # Use the official Debian slim image for a lean production container.
    # https://hub.docker.com/_/debian
    # https://docs.docker.com/develop/develop-images/multistage-build/#use-multi-stage-builds
    FROM debian:buster-slim
    RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
        ca-certificates && \
        rm -rf /var/lib/apt/lists/*
    
    # Copy the binary to the production image from the builder stage.
    COPY --from=builder /app/server /server
    
    # Run the web service on container startup.
    CMD ["/server"]
    

    Java

    此示例使用 Jib 利用常见 Java 工具构建 Docker 映像。无需编写 Dockerfile 或安装 Docker,Jib 便可以优化容器构建。详细了解如何使用 Jib 构建 Java 容器
    <plugin>
      <groupId>com.google.cloud.tools</groupId>
      <artifactId>jib-maven-plugin</artifactId>
      <version>3.3.1</version>
      <configuration>
        <to>
          <image>gcr.io/PROJECT_ID/pubsub</image>
        </to>
      </configuration>
    </plugin>
    

如需详细了解如何对 Pub/Sub 请求的来源进行身份验证,请参阅下文中的与 Pub/Sub 集成部分。

交付代码

交付代码包括三个步骤:使用 Cloud Build 构建容器映像、将容器映像上传到 Container Registry,以及将容器映像部署到 Cloud Run for Anthos。

要开发代码,请执行以下操作:

  1. 构建容器并将其发布到 Container Registry 上:

    Node.js

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功完成后,您应该会看到一条包含 ID、创建时间和映像名称的 SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

    Python

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功完成后,您应该会看到一条包含 ID、创建时间和映像名称的 SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

    Go

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功完成后,您应该会看到一条包含 ID、创建时间和映像名称的 SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

    Java

    mvn compile jib:build -Dimage=gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功后,您会看到一条 BUILD SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

  2. 运行以下命令来部署您的应用:

    gcloud run deploy pubsub-tutorial --image gcr.io/PROJECT_ID/pubsub

    PROJECT_ID 替换为您的 Cloud 项目 ID。 pubsub 为容器名称,pubsub-tutorial 为服务名称。请注意,容器映像已部署到您之前在设置 gcloud 中配置的服务和集群

    等待部署完成,这可能需要半分钟左右的时间。 成功完成时,命令行会显示服务网址。此网址用于配置 Pub/Sub 订阅。

  3. 如果要将代码更新部署到该服务,请重复执行上述步骤。向服务执行的每次部署都会创建一个新的修订版本,该修订版本准备就绪后会自动开始处理流量。

与 Pub/Sub 集成

部署完 Cloud Run 服务后,我们将配置 Pub/Sub 以向其推送消息。

如需将该服务与 Pub/Sub 集成,请执行以下操作:

  1. 允许 Pub/Sub 在您的项目中创建身份验证令牌:

    gcloud projects add-iam-policy-binding PROJECT_ID \
         --member=serviceAccount:service-PROJECT-NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
         --role=roles/iam.serviceAccountTokenCreator

    替换

    • PROJECT_ID 替换为您的 Cloud 项目 ID。
    • PROJECT-NUMBER 替换为您的 Cloud 项目编号。
  2. 创建或选择一个服务帐号,用于表示 Pub/Sub 订阅身份。

    gcloud iam service-accounts create cloud-run-pubsub-invoker \
         --display-name "Cloud Run for Anthos Pub/Sub Invoker"

    您可以使用 cloud-run-pubsub-invoker 或将其替换为 Cloud 项目中唯一的名称。

  3. 使用该服务帐号创建 Pub/Sub 订阅:

    1. 为您的集群启用自动 TLS 和 HTTPS 并向您的服务添加网域映射

    2. 为 Pub/Sub 注册网域所有权

    3. 添加代码以验证附加到 Pub/Sub 消息的身份验证令牌。 示例代码在使用推送端点执行身份验证和授权中提供。

      身份验证必须确保令牌有效且与预期的服务帐号相关联。与 Cloud Run 不同,Cloud Run for Anthos 没有内置授权检查可验证令牌是否有效,或者服务帐号是否有权调用 Cloud Run for Anthos 服务。

    4. 使用该服务帐号创建 Pub/Sub 订阅:

      gcloud pubsub subscriptions create myRunSubscription --topic myRunTopic \
           --push-endpoint=SERVICE-URL/ \
           --push-auth-service-account=cloud-run-pubsub-invoker@PROJECT_ID.iam.gserviceaccount.com

      替换

      • myRunTopic 替换为之前创建的主题。
      • SERVICE-URL 替换为您的自定义服务网址。 指定 https 作为协议。
      • PROJECT_ID 替换为您的 Cloud 项目 ID。

      --push-auth-service-account 标志激活 Pub/Sub 推送功能,以进行身份验证和授权

您的服务现已与 Pub/Sub 全面集成。

测试

如需测试端到端解决方案,请按如下所述操作:

  1. 向该主题发送一条 Pub/Sub 消息:

    gcloud pubsub topics publish myRunTopic --message "Runner"

    除了使用命令行(如本教程中所示),您还可以编程方式发布消息。如需了解详情,请参阅发布消息

  2. 导航到服务日志:

    1. 转到 Google Cloud 控制台中的 Cloud Run for Anthos 页面:

      转到 Cloud Run for Anthos

    2. 点击 pubsub-tutorial 服务。

    3. 选择日志标签页。

      您可能需要等待一些时间才能看到日志。如果您没有立即看到日志,请稍等片刻再检查一次。

  3. 查找“Hello Runner!”消息。

清理

如需浏览有关如何将 Cloud Run for Anthos 与 Pub/Sub 搭配使用的更深层用例,请暂时跳过清理,继续阅读图片处理教程。

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

  1. 删除您在本教程中部署的 Cloud Run for Anthos 服务:

    gcloud run services delete SERVICE-NAME

    其中,SERVICE-NAME 是您选择的服务名称。

    您还可以在 Google Cloud 控制台中删除 Cloud Run for Anthos 服务。

    转到 Cloud Run for Anthos

  2. 移除您在教程设置过程中添加的 gcloud 默认配置:

     gcloud config unset run/platform
     gcloud config unset run/cluster
     gcloud config unset run/cluster_location
    
  3. 移除项目配置:

     gcloud config unset project
    
  4. 删除在本教程中创建的其他 Google Cloud 资源:

后续步骤