教程:将 Pub/Sub 与 Cloud Run 搭配使用

本教程介绍如何通过 Pub/Sub 推送订阅编写、部署和调用 Cloud Run 服务。

目标

  • 编写、构建服务并将其部署到 Cloud Run
  • 通过向 Pub/Sub 主题发布消息来调用该服务。

费用

本教程使用 Google Cloud 的以下收费组件:

请使用价格计算器根据您的预计用量来估算费用。

Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Cloud Run API
  5. 安装并初始化 Cloud SDK。
  6. 更新组件:
    gcloud components update

设置 gcloud 默认值

要配置您的 Cloud Run 服务的 gcloud 默认值,请执行以下操作:

  1. 设置默认项目:

    gcloud config set project PROJECT_ID

    PROJECT_ID 替换为您在本教程中创建的项目的名称。

  2. 为您选择的区域配置 gcloud:

    gcloud config set run/region REGION

    REGION 替换为您选择的受支持的 Cloud Run 区域

Cloud Run 位置

Cloud Run 是地区级的,这意味着运行 Cloud Run 服务的基础架构位于特定地区,并且由 Google 代管,以便在该地区内的所有区域以冗余方式提供。

选择用于运行 Cloud Run 服务的地区时,主要考虑该地区能否满足您的延迟时间、可用性或耐用性要求。通常,您可以选择距离用户最近的地区,但除此之外,您还应该考虑 Cloud Run 服务使用的其他 Google Cloud 产品的位置。跨多个位置使用 Google Cloud 产品可能会影响服务的延迟时间和费用。

Cloud Run 可在以下地区使用:

基于层级 1 价格

基于层级 2 价格

  • asia-east2(香港)
  • asia-northeast3(韩国首尔)
  • asia-southeast1(新加坡)
  • asia-southeast2 (雅加达)
  • asia-south1(印度孟买)
  • asia-south2(印度德里)
  • australia-southeast1(悉尼)
  • australia-southeast2(墨尔本)
  • europe-central2(波兰,华沙)
  • europe-west2(英国伦敦)
  • europe-west3(德国法兰克福)
  • europe-west6(瑞士苏黎世) 叶形图标 二氧化碳排放量最低
  • northamerica-northeast1(蒙特利尔) 叶形图标 二氧化碳排放量最低
  • southamerica-east1(巴西圣保罗) 叶形图标 二氧化碳排放量最低
  • us-west2(洛杉矶)
  • us-west3(拉斯维加斯)
  • us-west4(盐湖城)

如果您已创建 Cloud Run 服务,则可以在 Cloud Console 的 Cloud Run 信息中心查看相应的地区。

创建 Pub/Sub 主题

示例服务由发布到 Pub/Sub 主题的消息触发,因此您需要在 Pub/Sub 中创建主题。

如需创建新的 Pub/Sub 主题,请使用以下命令:

gcloud pubsub topics create myRunTopic

您可以使用 myRunTopic 或将其替换为 Cloud 项目中唯一的主题名称。

检索代码示例

如需检索可用的代码示例,请执行以下操作:

  1. 将示例应用代码库克隆到本地机器:

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Go

    git clone https://github.com/GoogleCloudPlatform/golang-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

  2. 切换到包含 Cloud Run 示例代码的目录:

    Node.js

    cd nodejs-docs-samples/run/pubsub/

    Python

    cd python-docs-samples/run/pubsub/

    Go

    cd golang-samples/run/pubsub/

    Java

    cd java-docs-samples/run/pubsub/

查看代码

本教程中使用的代码包含以下部分:

  • 处理传入请求的服务器。

    Node.js

    为了便于测试 Node.js 服务,服务器配置与服务器启动是相互独立的。

    Node.js Web 服务器在 app.js 中设置。

    const express = require('express');
    const app = express();
    
    // This middleware is available in Express v4.16.0 onwards
    app.use(express.json());
    Web 服务器在 index.js 中启动:
    const app = require('./app.js');
    const PORT = process.env.PORT || 8080;
    
    app.listen(PORT, () =>
      console.log(`nodejs-pubsub-tutorial listening on port ${PORT}`)
    );

    Python

    import base64
    import os
    
    from flask import Flask, request
    
    app = Flask(__name__)

    Go

    
    // Sample run-pubsub is a Cloud Run service which handles Pub/Sub messages.
    package main
    
    import (
    	"encoding/json"
    	"io/ioutil"
    	"log"
    	"net/http"
    	"os"
    )
    
    func main() {
    	http.HandleFunc("/", HelloPubSub)
    	// Determine port for HTTP service.
    	port := os.Getenv("PORT")
    	if port == "" {
    		port = "8080"
    		log.Printf("Defaulting to port %s", port)
    	}
    	// Start HTTP server.
    	log.Printf("Listening on port %s", port)
    	if err := http.ListenAndServe(":"+port, nil); err != nil {
    		log.Fatal(err)
    	}
    }
    

    Java

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class PubSubApplication {
      public static void main(String[] args) {
        SpringApplication.run(PubSubApplication.class, args);
      }
    }

  • 用于处理 Pub/Sub 消息并记录问候语的处理程序。

    Node.js

    app.post('/', (req, res) => {
      if (!req.body) {
        const msg = 'no Pub/Sub message received';
        console.error(`error: ${msg}`);
        res.status(400).send(`Bad Request: ${msg}`);
        return;
      }
      if (!req.body.message) {
        const msg = 'invalid Pub/Sub message format';
        console.error(`error: ${msg}`);
        res.status(400).send(`Bad Request: ${msg}`);
        return;
      }
    
      const pubSubMessage = req.body.message;
      const name = pubSubMessage.data
        ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
        : 'World';
    
      console.log(`Hello ${name}!`);
      res.status(204).send();
    });

    Python

    @app.route("/", methods=["POST"])
    def index():
        envelope = request.get_json()
        if not envelope:
            msg = "no Pub/Sub message received"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400
    
        if not isinstance(envelope, dict) or "message" not in envelope:
            msg = "invalid Pub/Sub message format"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400
    
        pubsub_message = envelope["message"]
    
        name = "World"
        if isinstance(pubsub_message, dict) and "data" in pubsub_message:
            name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()
    
        print(f"Hello {name}!")
    
        return ("", 204)
    
    

    Go

    
    // PubSubMessage is the payload of a Pub/Sub event.
    // See the documentation for more details:
    // https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
    type PubSubMessage struct {
    	Message struct {
    		Data []byte `json:"data,omitempty"`
    		ID   string `json:"id"`
    	} `json:"message"`
    	Subscription string `json:"subscription"`
    }
    
    // HelloPubSub receives and processes a Pub/Sub push message.
    func HelloPubSub(w http.ResponseWriter, r *http.Request) {
    	var m PubSubMessage
    	body, err := ioutil.ReadAll(r.Body)
    	if err != nil {
    		log.Printf("ioutil.ReadAll: %v", err)
    		http.Error(w, "Bad Request", http.StatusBadRequest)
    		return
    	}
    	if err := json.Unmarshal(body, &m); err != nil {
    		log.Printf("json.Unmarshal: %v", err)
    		http.Error(w, "Bad Request", http.StatusBadRequest)
    		return
    	}
    
    	name := string(m.Message.Data)
    	if name == "" {
    		name = "World"
    	}
    	log.Printf("Hello %s!", name)
    }
    

    Java

    import java.util.Base64;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    // PubsubController consumes a Pub/Sub message.
    @RestController
    public class PubSubController {
      @RequestMapping(value = "/", method = RequestMethod.POST)
      public ResponseEntity receiveMessage(@RequestBody Body body) {
        // Get PubSub message from request body.
        Body.Message message = body.getMessage();
        if (message == null) {
          String msg = "Bad Request: invalid Pub/Sub message format";
          System.out.println(msg);
          return new ResponseEntity(msg, HttpStatus.BAD_REQUEST);
        }
    
        String data = message.getData();
        String target =
            !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
        String msg = "Hello " + target + "!";
    
        System.out.println(msg);
        return new ResponseEntity(msg, HttpStatus.OK);
      }
    }

    您必须对该服务进行编码,使其返回准确的 HTTP 响应代码。如果返回成功代码(例如 HTTP 200204),则指示 Pub/Sub 消息处理已完成。如果返回错误代码(如 HTTP 400500),则指示将重试该消息(请参阅“通过推送方式接收消息”指南

  • 用于定义服务的操作环境的 DockerfileDockerfile 的内容因语言而异。

    Node.js

    
    # Use the official lightweight Node.js 12 image.
    # https://hub.docker.com/_/node
    FROM node:12-slim
    
    # Create and change to the app directory.
    WORKDIR /usr/src/app
    
    # Copy application dependency manifests to the container image.
    # A wildcard is used to ensure both package.json AND package-lock.json are copied.
    # Copying this separately prevents re-running npm install on every code change.
    COPY package*.json ./
    
    # Install dependencies.
    # If you add a package-lock.json speed your build by switching to 'npm ci'.
    # RUN npm ci --only=production
    RUN npm install --production
    
    # Copy local code to the container image.
    COPY . .
    
    # Run the web service on container startup.
    CMD [ "npm", "start" ]
    

    Python

    
    # Use the official Python image.
    # https://hub.docker.com/_/python
    FROM python:3.9
    
    # Allow statements and log messages to immediately appear in the Cloud Run logs
    ENV PYTHONUNBUFFERED True
    
    # Copy application dependency manifests to the container image.
    # Copying this separately prevents re-running pip install on every code change.
    COPY requirements.txt ./
    
    # Install production dependencies.
    RUN pip install -r requirements.txt
    
    # Copy local code to the container image.
    ENV APP_HOME /app
    WORKDIR $APP_HOME
    COPY . ./
    
    # Run the web service on container startup.
    # Use gunicorn webserver with one worker process and 8 threads.
    # For environments with multiple CPU cores, increase the number of workers
    # to be equal to the cores available.
    # Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
    CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
    

    Go

    
    # Use the offical golang image to create a binary.
    # This is based on Debian and sets the GOPATH to /go.
    # https://hub.docker.com/_/golang
    FROM golang:1.16-buster as builder
    
    # Create and change to the app directory.
    WORKDIR /app
    
    # Retrieve application dependencies.
    # This allows the container build to reuse cached dependencies.
    # Expecting to copy go.mod and if present go.sum.
    COPY go.* ./
    RUN go mod download
    
    # Copy local code to the container image.
    COPY . ./
    
    # Build the binary.
    RUN go build -v -o server
    
    # Use the official Debian slim image for a lean production container.
    # https://hub.docker.com/_/debian
    # https://docs.docker.com/develop/develop-images/multistage-build/#use-multi-stage-builds
    FROM debian:buster-slim
    RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
        ca-certificates && \
        rm -rf /var/lib/apt/lists/*
    
    # Copy the binary to the production image from the builder stage.
    COPY --from=builder /app/server /server
    
    # Run the web service on container startup.
    CMD ["/server"]
    

    Java

    此示例使用 Jib 利用常见 Java 工具构建 Docker 映像。无需编写 Dockerfile 或安装 Docker,Jib 便可以优化容器构建。详细了解如何使用 Jib 构建 Java 容器
    <plugin>
      <groupId>com.google.cloud.tools</groupId>
      <artifactId>jib-maven-plugin</artifactId>
      <version>3.1.2</version>
      <configuration>
        <to>
          <image>gcr.io/PROJECT_ID/pubsub</image>
        </to>
      </configuration>
    </plugin>
    

如需详细了解如何对 Pub/Sub 请求的来源进行身份验证,请参阅下文中的与 Pub/Sub 集成部分。

交付代码

交付代码包括三个步骤:使用 Cloud Build 构建容器映像、将容器映像上传到 Container Registry,以及将容器映像部署到 Cloud Run。

如需交付代码,请执行以下操作:

  1. 构建容器并将其发布到 Container Registry 上:

    Node.js

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功完成后,您应该会看到一条包含 ID、创建时间和映像名称的 SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

    Python

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功完成后,您应该会看到一条包含 ID、创建时间和映像名称的 SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

    Go

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功完成后,您应该会看到一条包含 ID、创建时间和映像名称的 SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

    Java

    1. 使用 gcloud 凭据帮助程序,授权 Docker 推送到您的 Container Registry。

      gcloud auth configure-docker

    2. 使用 Jib Maven 插件来构建容器并将其推送到 Container Registry。

      mvn compile jib:build -Dimage=gcr.io/PROJECT_ID/pubsub

    其中 PROJECT_ID 是您的 Cloud 项目 ID,pubsub 是您要为服务指定的名称。

    成功后,您会看到一条 BUILD SUCCESS 消息。该映像存储在 Container Registry 中,并可根据需要重复使用。

  2. 运行以下命令来部署您的应用:

    gcloud run deploy pubsub-tutorial --image gcr.io/PROJECT_ID/pubsub

    PROJECT_ID 替换为您的 Cloud 项目 ID。 pubsub 为容器名称,pubsub-tutorial 为服务名称。请注意,容器映像会部署到您之前在设置 gcloud 中配置的服务和区域

    在出现“允许未通过身份验证的调用”提示时回复 n“否”。通过将该服务保留为不公开状态,您可以依赖 Cloud Run 的自动 Pub/Sub 集成功能来对请求进行身份验证。如需详细了解如何进行此项配置,请参阅与 Pub/Sub 集成。如需详细了解基于 Identity and Access Management (IAM) 的身份验证,请参阅使用 IAM 管理访问权限

    等待部署完成,这可能需要半分钟左右的时间。 成功完成时,命令行会显示服务网址。此网址用于配置 Pub/Sub 订阅。

  3. 如果要将代码更新部署到该服务,请重复执行上述步骤。向服务执行的每次部署都会创建一个新的修订版本,该修订版本准备就绪后会自动开始处理流量。

与 Pub/Sub 集成

部署完 Cloud Run 服务后,我们将配置 Pub/Sub 以向其推送消息。

如需将该服务与 Pub/Sub 集成,请执行以下操作:

  1. 创建或选择一个服务帐号,用于表示 Pub/Sub 订阅身份。

    gcloud iam service-accounts create cloud-run-pubsub-invoker \
         --display-name "Cloud Run Pub/Sub Invoker"

    您可以使用 cloud-run-pubsub-invoker 或将其替换为 Cloud 项目中唯一的名称。

  2. 使用该服务帐号创建 Pub/Sub 订阅:

    1. 为调用方服务帐号授予调用您的 pubsub-tutorial 服务的权限:

      gcloud run services add-iam-policy-binding pubsub-tutorial \
         --member=serviceAccount:cloud-run-pubsub-invoker@PROJECT_ID.iam.gserviceaccount.com \
         --role=roles/run.invoker

      IAM 更改可能需要几分钟时间才能完成传播。在此期间,您可能会在服务日志中看到 HTTP 403 错误。

    2. 使用该服务帐号创建 Pub/Sub 订阅:

      gcloud pubsub subscriptions create myRunSubscription --topic myRunTopic \
         --push-endpoint=SERVICE-URL/ \
         --push-auth-service-account=cloud-run-pubsub-invoker@PROJECT_ID.iam.gserviceaccount.com

      替换

      • myRunTopic 替换为之前创建的主题。
      • SERVICE-URL 替换为在部署服务时提供的 HTTPS 网址。即使您还添加了网域映射,此网址仍然有效。
      • PROJECT_ID 替换为您的 Cloud 项目 ID。

      --push-auth-service-account 标志激活 Pub/Sub 推送功能,以进行身份验证和授权

      您的 Cloud Run 服务网域会自动注册用于 Pub/Sub 订阅。

      仅适用于 Cloud Run,可以对令牌是否有效进行内置身份验证检查,以及对服务帐号是否有权调用 Cloud Run 服务进行授权检查。

您的服务现已与 Pub/Sub 全面集成。

测试

如需测试端到端解决方案,请按如下所述操作:

  1. 向该主题发送一条 Pub/Sub 消息:

    gcloud pubsub topics publish myRunTopic --message "Runner"

    除了使用命令行(如本教程中所示),您还可以编程方式发布消息。如需了解详情,请参阅发布消息

  2. 导航到服务日志:

    1. 导航到 Google Cloud Console
    2. 点击 pubsub-tutorial 服务。
    3. 选择日志标签页。

      您可能需要等待一些时间才能看到日志。如果您没有立即看到日志,请稍等片刻再检查一次。

  3. 查找“Hello Runner!”消息。

清理

如需浏览有关如何将 Cloud Run 与 Pub/Sub 搭配使用的更深层用例,请暂时跳过清理,继续阅读教程:使用 Cloud Run 处理图片

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

  1. 删除您在本教程中部署的 Cloud Run 服务:

    gcloud run services delete SERVICE-NAME

    其中,SERVICE-NAME 是您选择的服务名称。

    您还可以从 Google Cloud Console 中删除 Cloud Run 服务。

  2. 移除您在教程设置过程中添加的 gcloud 默认区域配置:

     gcloud config unset run/region
    
  3. 移除项目配置:

     gcloud config unset project
    
  4. 删除在本教程中创建的其他 Google Cloud 资源:

后续步骤