从 Pub/Sub 推送触发

您可以使用 Pub/Sub 将消息推送到 Cloud Run 服务的端点,随后这些消息会作为 HTTP 请求传送到容器。您无法使用 Pub/Sub 拉取订阅,因为 Cloud Run 仅在处理请求期间分配 CPU。

您应该处理该消息,然后在完成后返回响应。

利用服务帐号和 IAM 权限,您可以安全、私密地将 Pub/Sub 与 Cloud Run 搭配使用,而无需公开 Cloud Run 服务。只有您设置的 Pub/Sub 订阅才能调用您的服务。

可能的使用场景包括:

本页面介绍如何在同一 Google Cloud 项目中让您的服务安全地处理由 Pub/Sub 订阅推送的消息。

要将您的服务与 Pub/Sub 集成,请执行以下操作:

  • 创建 Pub/Sub 主题。
  • 在 Cloud Run 服务中添加代码,以响应发送到您创建的主题的 Pub/Sub 消息。
  • 创建具有所需权限的服务帐号。
  • 创建 Pub/Sub 订阅并将其与服务帐号相关联。此订阅将向您的服务发送已发布到该主题的任何消息。

前期准备

按照 Cloud Run 的设置页面中的说明设置您的环境(如果您尚未这样做)。您需要使用 gcloud 命令行和 Google Cloud 项目将 Cloud Run 服务部署到其中。

添加代码以处理来自 Pub/Sub 的消息

您的服务必须从请求中提取消息并返回预期的成功代码。以下所选语言(您可以使用任何语言)的代码段显示如何针对简单的“Hello World”消息执行此操作:

Node.js

app.post('/', (req, res) => {
  if (!req.body) {
    const msg = 'no Pub/Sub message received';
    console.error(`error: ${msg}`);
    res.status(400).send(`Bad Request: ${msg}`);
    return;
  }
  if (!req.body.message) {
    const msg = 'invalid Pub/Sub message format';
    console.error(`error: ${msg}`);
    res.status(400).send(`Bad Request: ${msg}`);
    return;
  }

  const pubSubMessage = req.body.message;
  const name = pubSubMessage.data
    ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
    : 'World';

  console.log(`Hello ${name}!`);
  res.status(204).send();
});

Python

@app.route("/", methods=["POST"])
def index():
    envelope = request.get_json()
    if not envelope:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    if not isinstance(envelope, dict) or "message" not in envelope:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    pubsub_message = envelope["message"]

    name = "World"
    if isinstance(pubsub_message, dict) and "data" in pubsub_message:
        name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()

    print(f"Hello {name}!")

    return ("", 204)

Go


// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloPubSub receives and processes a Pub/Sub push message.
func HelloPubSub(w http.ResponseWriter, r *http.Request) {
	var m PubSubMessage
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ioutil.ReadAll: %v", err)
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	if err := json.Unmarshal(body, &m); err != nil {
		log.Printf("json.Unmarshal: %v", err)
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	name := string(m.Message.Data)
	if name == "" {
		name = "World"
	}
	log.Printf("Hello %s!", name)
}

Java

import java.util.Base64;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

// PubsubController consumes a Pub/Sub message.
@RestController
public class PubSubController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity receiveMessage(@RequestBody Body body) {
    // Get PubSub message from request body.
    Body.Message message = body.getMessage();
    if (message == null) {
      String msg = "Bad Request: invalid Pub/Sub message format";
      System.out.println(msg);
      return new ResponseEntity(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    String target =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String msg = "Hello " + target + "!";

    System.out.println(msg);
    return new ResponseEntity(msg, HttpStatus.OK);
  }
}

您必须对该服务进行编码,使其返回准确的 HTTP 响应代码。如果返回成功代码(例如 HTTP 200204),则指示 Pub/Sub 消息处理已完成。如果返回错误代码(如 HTTP 400500),则指示将重试该消息(请参阅通过推送方式接收消息

为订阅创建服务帐号

您需要创建一个服务帐号来与您的 Pub/Sub 订阅相关联,并为其授予调用 Cloud Run 服务的权限。推送到您的 Cloud Run 服务的 Pub/Sub 消息将采用此服务帐号的身份。

您可以使用现有服务帐号来表示 Pub/Sub 订阅身份,也可以创建新帐号。

要创建新服务帐号并向其授予调用 Cloud Run 服务的权限,请执行以下操作:

控制台

  1. 在 Cloud Console 中,转到服务帐号页面。

    转到“服务帐号”

  2. 选择一个项目。

  3. 输入 Cloud Console 中显示的服务帐号名称。

    Cloud Console 会根据此名称生成服务帐号 ID。如有必要,请修改 ID。此 ID 创建后便无法更改。

  4. 可选:输入服务帐号的说明。

  5. 点击创建

  6. 点击选择角色字段。

  7. 所有角色下,选择 Cloud Run > Cloud Run Invoker

  8. 点击完成

命令行

  1. 创建服务帐号:

    gcloud iam service-accounts create SERVICE_ACCOUNT_NAME \
       --display-name "DISPLAYED_SERVICE_ACCOUNT_NAME"

    注意替换如下内容:

    • SERVICE_ACCOUNT_NAME 替换为 Google Cloud 项目中唯一的小写名称,例如 my-invoker-service-account-name
    • DISPLAYED_SERVICE_ACCOUNT_NAME 替换为您要在控制台等界面中为此服务帐号显示的名称,例如 My Invoker Service Account
  2. 对于 Cloud Run,请向您的服务帐号授予调用服务的权限:

    gcloud run services add-iam-policy-binding SERVICE \
       --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
       --role=roles/run.invoker

    替换

    • SERVICE 替换为您希望 Pub/Sub 调用的服务的名称。
    • SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。
    • PROJECT_ID 替换为 Google Cloud 项目 ID。

创建 Pub/Sub 主题

对您的服务的请求由发布到 Pub/Sub 主题的消息触发,因此您需要创建一个主题:

控制台

  1. 访问 Cloud Console 中的 Pub/Sub 主题页面。

    Pub/Sub 主题页面

  2. 点击创建主题

  3. 为您的主题输入一个独一无二的名称,例如 MyTopic。

命令行

gcloud pubsub topics create TOPIC-NAME

TOPIC-NAME 替换为 Google Cloud 项目中唯一的主题名称。

创建推送订阅并将其与服务帐号相关联

创建 Pub/Sub 主题之后,您必须订阅服务以接收发送到主题的消息,并且必须将订阅与您为服务创建的服务帐号相关联。 您可以使用 Cloud Console 或 gcloud 命令行:

控制台

  1. 转到 Pub/Sub 主题页面。

    Pub/Sub 主题页面

  2. 点击您要订阅的主题。

  3. 点击创建订阅以显示订阅表单:

    订阅表单

    在此表单中执行以下操作:

    1. 指定推送传送类型。
    2. 对于 Endpoints 网址,请指定您的服务网址,该网址显示在服务详情页面中。
    3. 在“服务帐号”下拉列表中,选择您创建的具有所需权限的服务帐号
    4. 根据需要设置订阅到期时间和确认截止时间。
    5. 点击创建
  4. 订阅已完成。发布到该主题的消息现在会推送到您的服务中。

命令行

  1. 允许 Pub/Sub 在您的项目中创建身份验证令牌:

    gcloud projects add-iam-policy-binding PROJECT-ID \
         --member=serviceAccount:service-PROJECT-NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
         --role=roles/iam.serviceAccountTokenCreator

    替换

    • PROJECT-ID 替换为您的 Google Cloud 项目 ID。
    • PROJECT-NUMBER 替换为您的 Google Cloud 项目编号。

      Cloud Console 中与您项目对应的“项目信息”面板中列出了项目 ID 和项目编号。

  2. 使用您创建的具有所需权限的服务帐号创建 Pub/Sub 订阅:

    gcloud beta pubsub subscriptions create SUBSCRIPTION-ID --topic TOPIC-NAME \
       --push-endpoint=SERVICE-URL/ \
       --push-auth-service-account=SERVICE-ACCOUNT-NAME@PROJECT-ID.iam.gserviceaccount.com

    替换

    • TOPIC-NAME 替换为之前创建的主题。
    • SERVICE-URL 替换为您在部署服务时提供的 HTTPS 网址。您可以使用 gcloud run services describe 命令并指定服务名称找到该网址:查找以 domain 开头的返回行。
    • PROJECT-ID 替换为 Google Cloud 项目 ID。

    --push-auth-service-account 标志激活 Pub/Sub 推送功能,以进行身份验证和授权

  3. 订阅已完成。发布到该主题的消息现在会推送到您的服务中。您可以使用以下命令将测试消息推送到主题:

    gcloud pubsub topics publish TOPIC --message "hello"

    TOPIC 替换为您创建的主题的名称

后续步骤