通过 Pub/Sub 推送触发

本页面介绍如何使用 Pub/Sub 将消息推送到 Cloud Run 服务的端点,随后这些消息会作为 HTTP 请求传送到容器。本页面介绍如何在同一 Google Cloud 项目中让您的服务安全地处理由 Pub/Sub 订阅推送的消息。

利用服务账号和 IAM 权限,您可以安全、私密地将 Pub/Sub 与 Cloud Run 搭配使用,而无需公开 Cloud Run 服务。只有您设置的 Pub/Sub 订阅才能调用您的服务。

Cloud Run 的确认时限

请务必将 Pub/Sub 订阅确认时限 (ackDeadlineSeconds) 设置为允许的 600 秒上限。您的 Cloud Run 服务必须在 600 秒内返回响应以确认 Pub/Sub 消息,否则 Pub/Sub 将重新传送该消息,从而导致 Cloud Run 服务重复触发。

使用场景

可能的使用场景包括:

集成概览

要将您的服务与 Pub/Sub 集成,请执行以下操作:

  • 创建 Pub/Sub 主题。
  • 在 Cloud Run 服务中添加代码,以响应发送到您创建的主题的 Pub/Sub 消息。
  • 创建具有所需权限的服务账号。
  • 创建 Pub/Sub 订阅并将其与服务账号相关联。此订阅将向您的服务发送已发布到该主题的任何消息。

前期准备

  • 按照 Cloud Run 设置页面中的说明设置您的环境(如果尚未设置)。
  • 本指南假定您已拥有 Cloud Run 服务,并希望添加代码以将该服务与 Pub/Sub 集成。如果您没有此类服务,请考虑使用 Pub/Sub 的 Cloud Run 教程,而不是按照本页面的说明操作。

添加代码以处理来自 Pub/Sub 的消息

修改现有服务代码以添加支持 Pub/Sub 所需的代码。您的服务必须从请求中提取消息并返回预期的成功代码。以下所选语言(您可以使用任何语言)的代码段显示如何针对简单的“Hello World”消息执行此操作:

Node.js

app.post('/', (req, res) => {
  if (!req.body) {
    const msg = 'no Pub/Sub message received';
    console.error(`error: ${msg}`);
    res.status(400).send(`Bad Request: ${msg}`);
    return;
  }
  if (!req.body.message) {
    const msg = 'invalid Pub/Sub message format';
    console.error(`error: ${msg}`);
    res.status(400).send(`Bad Request: ${msg}`);
    return;
  }

  const pubSubMessage = req.body.message;
  const name = pubSubMessage.data
    ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
    : 'World';

  console.log(`Hello ${name}!`);
  res.status(204).send();
});

Python

@app.route("/", methods=["POST"])
def index():
    """Receive and parse Pub/Sub messages."""
    envelope = request.get_json()
    if not envelope:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    if not isinstance(envelope, dict) or "message" not in envelope:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    pubsub_message = envelope["message"]

    name = "World"
    if isinstance(pubsub_message, dict) and "data" in pubsub_message:
        name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()

    print(f"Hello {name}!")

    return ("", 204)

Go


// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloPubSub receives and processes a Pub/Sub push message.
func HelloPubSub(w http.ResponseWriter, r *http.Request) {
	var m PubSubMessage
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ioutil.ReadAll: %v", err)
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	// byte slice unmarshalling handles base64 decoding.
	if err := json.Unmarshal(body, &m); err != nil {
		log.Printf("json.Unmarshal: %v", err)
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	name := string(m.Message.Data)
	if name == "" {
		name = "World"
	}
	log.Printf("Hello %s!", name)
}

Java

import com.example.cloudrun.Body;
import java.util.Base64;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

// PubsubController consumes a Pub/Sub message.
@RestController
public class PubSubController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(@RequestBody Body body) {
    // Get PubSub message from request body.
    Body.Message message = body.getMessage();
    if (message == null) {
      String msg = "Bad Request: invalid Pub/Sub message format";
      System.out.println(msg);
      return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    String target =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String msg = "Hello " + target + "!";

    System.out.println(msg);
    return new ResponseEntity<>(msg, HttpStatus.OK);
  }
}

您必须对该服务进行编码,使其返回准确的 HTTP 响应代码。如果返回成功代码(例如 HTTP 200204),则指示 Pub/Sub 消息处理已完成。如果返回错误代码(如 HTTP 400500),则指示将重试该消息(请参阅通过推送方式接收消息

构建您的 Cloud Run 服务,然后使用上述 Pub/Sub 代码更新该服务,最后部署该服务。

为订阅创建服务账号

您需要创建一个服务账号来与您的 Pub/Sub 订阅相关联,并为其授予调用 Cloud Run 服务的权限。推送到您的 Cloud Run 服务的 Pub/Sub 消息将采用此服务账号的身份。

您可以使用现有服务账号来表示 Pub/Sub 订阅身份,也可以创建新账号。

要创建新服务账号并向其授予调用 Cloud Run 服务的权限,请执行以下操作:

控制台

  1. 在 Google Cloud 控制台中,转到服务账号页面。

    转到“服务账号”

  2. 选择一个项目。

  3. 输入要在 Google Cloud 控制台中显示的服务账号名称。

    Google Cloud 控制台会根据此名称生成服务账号 ID。如有必要,请修改 ID。此 ID 创建后便无法更改。

  4. 可选:输入服务账号的说明。

  5. 点击创建并继续

  6. 可选:点击选择角色字段。

  7. 选择 Cloud Run > Cloud Run Invoker

  8. 点击完成

命令行

  1. 创建服务账号:

    gcloud iam service-accounts create SERVICE_ACCOUNT_NAME \
       --display-name "DISPLAYED_SERVICE_ACCOUNT_NAME"

    注意替换如下内容:

    • SERVICE_ACCOUNT_NAME 替换为 Google Cloud 项目中唯一的小写名称,例如 my-invoker-service-account-name
    • DISPLAYED_SERVICE_ACCOUNT_NAME 替换为您要在控制台等界面中为此服务账号显示的名称,例如 My Invoker Service Account
  2. 对于 Cloud Run,请向您的服务账号授予调用服务的权限:

    gcloud run services add-iam-policy-binding SERVICE \
       --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
       --role=roles/run.invoker

    替换

    • SERVICE 替换为您希望 Pub/Sub 调用的服务的名称。
    • SERVICE_ACCOUNT_NAME 替换为服务账号的名称。
    • PROJECT_ID 替换为 Google Cloud 项目 ID。
  3. 向服务账号授予对项目的访问权限,以便其能够完成对项目中资源的特定操作:

    gcloud projects add-iam-policy-binding RESOURCE_ID \
       --member=PRINCIPAL --role=roles/run.invoker

    替换

    • RESOURCE_ID:您的 Google Cloud 项目 ID。

    • PRINCIPAL:主账号(成员)的标识符,通常具有以下格式:PRINCIPAL_TYPE:ID,例如 user:my-user@example.com。如需查看 PRINCIPAL 可以采用的值的完整列表,请参阅政策绑定参考文档

Terraform

如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

  1. 如需创建服务账号,请将以下内容添加到现有 .tf 文件中:

    resource "google_service_account" "sa" {
      account_id   = "cloud-run-pubsub-invoker"
      display_name = "Cloud Run Pub/Sub Invoker"
    }
  2. 如需向您的服务账号授予调用服务的权限,请将以下内容添加到现有 .tf 文件中:

    resource "google_cloud_run_service_iam_binding" "binding" {
      location = google_cloud_run_v2_service.default.location
      service  = google_cloud_run_v2_service.default.name
      role     = "roles/run.invoker"
      members  = ["serviceAccount:${google_service_account.sa.email}"]
    }

创建 Pub/Sub 主题

对您的服务的请求由发布到 Pub/Sub 主题的消息触发,因此您需要创建一个主题:

控制台

  1. 访问 Google Cloud 控制台中的 Pub/Sub 主题页面。

    Pub/Sub 主题页面

  2. 点击创建主题

  3. 为您的主题输入一个独一无二的名称,例如 MyTopic。

命令行

gcloud pubsub topics create TOPIC-NAME

TOPIC-NAME 替换为 Google Cloud 项目中唯一的主题名称。

Terraform

如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

本部分介绍如何使用 TerraformGoogle Cloud Platform 提供商提供的 google_pubsub_topic 资源在 Terraform 配置中定义服务

将以下内容添加到现有的 .tf 文件中:

resource "google_pubsub_topic" "default" {
  name = "pubsub_topic"
}

创建推送订阅并将其与服务账号相关联

创建 Pub/Sub 主题之后,您必须订阅服务以接收发送到主题的消息,并且必须将订阅与您为服务创建的服务账号相关联。 您可以使用 Google Cloud 控制台或 gcloud 命令行:

控制台

  1. 转到 Pub/Sub 主题页面。

    Pub/Sub 主题页面

  2. 点击您要订阅的主题。

  3. 点击创建订阅以显示订阅表单:

    订阅表单

    在此表单中执行以下操作:

    1. 指定推送传送类型。
    2. 对于 Endpoints 网址,请指定您的服务网址,该网址显示在服务详情页面中。
    3. 在“服务账号”下拉列表中,选择您创建的具有所需权限的服务账号
    4. 设置订阅到期时间和确认时限(600 秒)。
    5. 点击创建
  4. 订阅已完成。发布到该主题的消息现在会推送到您的服务中。

命令行

  1. 允许 Pub/Sub 在您的项目中创建身份验证令牌:

    gcloud projects add-iam-policy-binding PROJECT-ID \
         --member=serviceAccount:service-PROJECT-NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
         --role=roles/iam.serviceAccountTokenCreator

    替换

    • PROJECT-ID 替换为您的 Google Cloud 项目 ID。
    • PROJECT-NUMBER 替换为您的 Google Cloud 项目编号。

      Google Cloud 控制台中与您项目对应的“项目信息”面板中列出了项目 ID 和项目编号。

  2. 使用您创建的具有所需权限的服务账号创建 Pub/Sub 订阅:

    gcloud pubsub subscriptions create SUBSCRIPTION-ID --topic TOPIC-NAME \
       --ack-deadline=600 \
       --push-endpoint=SERVICE-URL/ \
       --push-auth-service-account=SERVICE-ACCOUNT-NAME@PROJECT-ID.iam.gserviceaccount.com

    替换

    • TOPIC-NAME 替换为之前创建的主题。
    • SERVICE-URL 替换为您在部署服务时提供的 HTTPS 网址。您可以使用 gcloud run services describe 命令并指定服务名称找到该网址:查找以 domain 开头的返回行。
    • PROJECT-ID 替换为您的 Google Cloud 项目 ID。

    --push-auth-service-account 标志激活 Pub/Sub 推送功能,以进行身份验证和授权

    请注意,确认时限设置为上限(600 秒)。

  3. 订阅已完成。发布到该主题的消息现在会推送到您的服务中。您可以使用以下命令将测试消息推送到主题:

    gcloud pubsub topics publish TOPIC --message "hello"

    TOPIC 替换为您创建的主题的名称

Terraform

如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

  1. 允许 Pub/Sub 在您的项目中创建身份验证令牌。请将以下内容添加到 .tf 文件:

    resource "google_project_service_identity" "pubsub_agent" {
      provider = google-beta
      project  = data.google_project.project.project_id
      service  = "pubsub.googleapis.com"
    }
    
    resource "google_project_iam_binding" "project_token_creator" {
      project = data.google_project.project.project_id
      role    = "roles/iam.serviceAccountTokenCreator"
      members = ["serviceAccount:${google_project_service_identity.pubsub_agent.email}"]
    }
  2. 使用您创建的具有所需权限的服务账号创建 Pub/Sub 订阅。将以下内容添加到 .tf 文件:

    resource "google_pubsub_subscription" "subscription" {
      name  = "pubsub_subscription"
      topic = google_pubsub_topic.default.name
      push_config {
        push_endpoint = google_cloud_run_v2_service.default.uri
        oidc_token {
          service_account_email = google_service_account.sa.email
        }
        attributes = {
          x-goog-version = "v1"
        }
      }
      depends_on = [google_cloud_run_v2_service.default]
    }
  3. 订阅已完成。发布到该主题的消息现在会推送到您的服务中。您可以使用以下命令将测试消息推送到主题:

    gcloud pubsub topics publish TOPIC --message "hello"

    TOPIC 替换为您创建的主题的名称

后续步骤