创建 Cloud Storage 订阅

本文档介绍如何创建 Cloud Storage 订阅。 您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库 或使用 Pub/Sub API 创建 Cloud Storage 订阅。

准备工作

在阅读本文档之前,请确保您熟悉以下内容:

所需的角色和权限

以下是有关角色和权限的一系列指南:

  • 如需创建订阅,您必须在项目中配置访问权限控制 。

  • 如果您的订阅和主题,则还需要资源级权限 位于不同的项目中,如本部分稍后所述。

  • 要创建 Cloud Storage 订阅, Pub/Sub 服务账号必须具有写入 特定 Cloud Storage 存储桶以及读取存储桶元数据。有关 有关如何授予这些权限的信息,请参见 部分。

如需获取创建 Cloud Storage 订阅所需的权限, 请让管理员向您授予 项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理访问权限

此预定义角色包含 创建 Cloud Storage 订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建 Cloud Storage 订阅,您需要具备以下权限:

  • 创建订阅: pubsub.subscriptions.create
  • 将订阅附加到主题: pubsub.topics.attachSubscription
  • 从订阅中拉取: pubsub.subscriptions.consume
  • 获取订阅: pubsub.subscriptions.get
  • 列出订阅: pubsub.subscriptions.list
  • 更新订阅: pubsub.subscriptions.update
  • 删除订阅: pubsub.subscriptions.delete
  • 获取订阅的 IAM 政策: pubsub.subscriptions.getIamPolicy
  • 为订阅配置 IAM 政策 pubsub.subscriptions.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如果您需要创建 Cloud Storage 一个项目中的订阅与另一个项目中的主题关联 项目,请让主题管理员也授予您 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。

为 Pub/Sub 服务账号分配 Cloud Storage 角色

某些 Google Cloud 服务具有由 Google Cloud 代管式服务账号, 可让服务访问您的资源这些服务账号称为 服务代理Pub/Sub 创建和维护服务 为每个项目创建账号,格式为 service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com.

要创建 Cloud Storage 订阅,Pub/Sub 服务账号必须拥有 特定 Cloud Storage 存储桶以及读取存储桶元数据。选择 以下过程之一:

  • 在存储桶级别授予权限。在特定 Cloud Storage 中 授予 Storage Object Creator (roles/storage.objectCreator) 角色 和 Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) 角色 Pub/Sub 服务账号。

  • 如果您必须在项目级别授予角色,则可以改为授予 对项目包含的项目的 Storage Admin (roles/storage.admin) 角色 Cloud Storage 存储桶。将此角色授予 Pub/Sub 服务账号。

存储分区权限

如需授予 Storage Object Creator,请执行以下步骤 (roles/storage.objectCreator) 和 Storage 旧版 Bucket Reader (roles/storage.legacyBucketReader) 存储桶级角色:

  1. 在 Google Cloud 控制台中,转到 Cloud Storage 页面。

    转到 Cloud Storage

  2. 点击要向其中写入消息的 Cloud Storage 存储桶。

    系统随即会打开存储分区详情页面。

  3. 存储桶详情页面中,点击配置标签页。

  4. 权限 >按主账号查看标签页中,点击授予访问权限

    系统会打开授予访问权限页面。

  5. 添加主账号部分中,输入 Pub/Sub 的名称。 服务账号。

    服务账号的格式为 service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. 例如,对于 PROJECT_NUMBER=112233445566 的项目, 服务账号的格式为 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com

  6. 分配角色中 >选择角色下拉菜单中, 输入 Creator,然后选择 Storage Object Creator 角色。

  7. 点击添加其他角色

  8. 选择角色下拉菜单中,输入 Bucket Reader。 然后选择 Storage Legacy Bucket Reader 角色。

  9. 点击保存

项目权限

如需授予 Storage Admin,请执行以下步骤 (roles/storage.admin) 角色:

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 权限 >按主账号查看标签页中,点击授予访问权限

    系统会打开授予访问权限页面。

  3. 添加主账号部分中,输入 Pub/Sub 的名称。 服务账号。

    服务账号的格式为 service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. 例如,对于 PROJECT_NUMBER=112233445566 的项目, 服务账号的格式为 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com

  4. 分配角色中 >选择角色下拉菜单中, 输入 Storage Admin,然后选择 Storage Admin 角色。

  5. 点击保存

如需详细了解 Cloud Storage IAM 请参阅 Cloud Storage Identity and Access Management

Cloud Storage 订阅属性

配置 Cloud Storage 订阅时,您必须指定 所有订阅类型的通用属性,以及一些 Cloud Storage 订阅专用属性。

常见订阅属性

了解常见的订阅属性 您可以为所有订阅设置此折扣

存储桶名称

在创建存储分区之前,Cloud Storage 存储桶必须已经存在 Cloud Storage 订阅。

这些消息批量发送并存储在 Cloud Storage 存储桶中。 单个批次或文件存储为对象 存储桶中的资源。

Cloud Storage 存储桶必须具有 已停用请求者付款

如需创建 Cloud Storage 存储桶,请参阅 创建存储分区

文件名前缀、后缀和日期时间

Cloud Storage 生成的 Cloud Storage 输出文件 订阅作为对象存储在 Cloud Storage 存储桶中。名称 存储在 Cloud Storage 存储桶中的对象具有以下特征: 格式:<file-prefix><UTC-date-time>_<uuid><file-suffix>

以下列表详细说明了文件格式以及 您可以自定义:

  • <file-prefix> 是自定义文件名前缀。这是一个可选字段。

  • <UTC-date-time>”是基于时间自动生成的可自定义字符串 创建对象。

  • <uuid> 是系统自动生成的随机对象字符串。

  • <file-suffix> 是自定义文件名后缀。这是一个可选字段。通过 文件名后缀不能以“/”结尾。

  • 您可以更改文件名前缀和后缀:

    • 例如,如果文件名前缀的值为 prod_, 文件名后缀为 _archive,示例对象名称为 prod_2023-09-25T04:10:00+00:00_uN1QuE_archive.

    • 如果您未指定文件名前缀和后缀,则存储的对象名称 采用以下格式: <UTC-date-time>_<uuid>.

    • Cloud Storage 对象命名要求也适用于文件名 前缀和后缀。如需了解详情,请参阅 Cloud Storage 对象简介

  • 您可以更改文件名中日期和时间的显示方式:

    • 只能使用一次的日期时间匹配器:年份(YYYYYY)、月 (MM)、日 (DD)、小时 (hh)、分钟 (mm)、秒 (ss)。 例如,YY-YYYYMMM 无效。

    • 只能使用一次的可选匹配器:日期时间分隔符 (T) 和 和时区偏移量(Z+00:00)。

    • 可多次使用的可选元素:连字符 (-)、 下划线 (_)、冒号 (:) 和正斜杠 (/)。

    • 例如,如果文件名日期时间格式的值是 YYYY-MM-DD/hh_mm_ssZ,示例对象名称为 prod_2023-09-25/04_10_00Z_uNiQuE_archive.

    • 如果文件名日期时间格式以非匹配器字符结尾, 该字符将替换 <UTC-date-time><uuid>.例如,如果文件名日期时间格式的值是 YYYY-MM-DDThh_mm_ss-,示例对象名称为 prod_2023-09-25T04_10_00-uNiQuE_archive.

文件批处理

Cloud Storage 订阅可让您决定何时创建 作为对象存储在 Cloud Storage 中的新输出文件 存储桶。当出现以下任一情况时,Pub/Sub 会写入输出文件 满足指定的批处理条件。以下是 Cloud Storage 批处理条件:

  • 批量存储时长上限。这是一项必需设置。通过 如果出现以下情况,Cloud Storage 订阅会写入新的输出文件: 超过指定的时长上限值。如果您 指定值,则系统会应用默认值 5 分钟。 以下是适用于最长时长的值:

    • 最小值 = 1 分钟
    • 默认值 = 5 分钟
    • 最大值 = 10 分钟
  • 批量存储字节数上限。这是一项可选设置。通过 如果存在以下情况,则 Cloud Storage 订阅会写入新的输出文件: 已超过指定的最大字节数值。以下是适用的 最大字节数的值:

    • 最小值 = 1 KB
    • 最大值 = 10 GiB

例如,您可以将时长上限配置为 6 分钟, 最大字节数为 2 GB。如果在第 4 分钟时,输出文件达到 文件大小为 2 GB,Pub/Sub 将最终确定上一个文件并开始 写入新文件。

Cloud Storage 订阅可能会 Cloud Storage 存储桶。如果您已经配置了 订阅即每 6 分钟创建一个新文件,您可能会发现 系统每 6 分钟会创建多个 Cloud Storage 文件。

在某些情况下,Pub/Sub 可能会开始 文件。 如果进行订阅,文件也可能会超过“最大字节数”值 接收大于最大字节数值的消息。

文件格式

创建 Cloud Storage 订阅时,您可以指定 要在容器内存储的输出文件的格式 Cloud Storage 存储桶,以 TextAvro 格式表示。

  • 文本:消息以纯文本形式存储。换行符 用于将一封邮件与文件中的前一封邮件分开。仅附加短信信息 存储的是载荷,而不是属性或其他元数据。

  • Avro:消息存储在 Apache Avro 二进制文件格式。如果选择 Avro,您还可以启用以下额外属性:

    • 写入元数据:选择此选项后,您可以随邮件一起存储邮件元数据。subscription_namemessage_idpublish_timeattributes 字段等元数据会写入输出 Avro 对象中的顶级字段,而数据以外的其他所有其他消息属性(例如,order_key,如果存在)会作为条目添加到 attributes 映射中。

      如果停用写入元数据,则只有消息载荷会写入输出 Avro 对象。以下是已停用写入元数据的输出消息的 Avro 架构:

      {
        "type": "record",
        "namespace": "com.google.pubsub",
        "name": "PubsubMessage",
        "fields": [
          { "name": "data", "type": "bytes" }
        ]
      }
      

      以下是启用了写入元数据的输出消息的 Avro 架构:

      {
        "type": "record",
        "namespace": "com.google.pubsub",
        "name": "PubsubMessageWithMetadata",
        "fields": [
          { "name": "subscription_name", "type": "string" },
          { "name": "message_id", "type": "string"  },
          { "name": "publish_time", "type": {
              "type": "long",
              "logicalType": "timestamp-micros"
            }
          },
          { "name": "attributes", "type": { "type": "map", "values": "string" } },
          { "name": "data", "type": "bytes" }
        ]
      }
      
    • 使用主题架构:此选项可让 Pub/Sub 使用 Cloud Pub/Sub 主题的架构 订阅会附加在写入 Avro 文件时。

      使用此选项时,请务必检查以下额外要求:

      • 主题架构必须采用 Apache Avro 格式

      • 如果使用主题架构写入元数据都已启用,则主题架构的根目录中必须有一个 Record 对象。Pub/Sub 将展开记录的字段列表以包含元数据字段。因此,该记录不能包含与元数据字段(subscription_namemessage_idpublish_timeattributes)同名的任何字段。

创建 Cloud Storage 订阅

控制台

  1. 在 Google Cloud 控制台中,前往订阅 页面。

    前往“订阅”页面

  2. 点击创建订阅

  3. 对于订阅 ID 字段,输入名称。

    有关如何命名订阅的信息,请参阅主题命名准则或 订阅

  4. 从下拉菜单中选择或创建一个主题。

    订阅将接收来自该主题的消息。

    如需了解如何创建主题,请参阅创建和管理主题

  5. 传送类型选为写入 Cloud Storage

  6. 对于 Cloud Storage 存储桶,点击浏览

    • 您可以从任何适当的项目中选择现有存储桶。

    • 您也可以点击“创建”图标,然后按照 创建新存储桶的界面

      创建存储桶后, Cloud Storage 订阅。

      如需详细了解如何创建存储桶,请参阅创建存储桶

    指定存储桶时,Pub/Sub 会检查 拥有适当的权限 服务账号。如果存在权限问题,您会看到一条消息 类似于 Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions

  7. 如果您遇到权限问题,请点击设置权限,然后 按照屏幕上的说明操作。

    或者,请按照分配 Cloud Storage 角色中的说明操作 Pub/Sub 服务账号

  8. 文件格式部分,选择文本Avro 格式表示排序依据。

    如果选择 Avro,您还可以视需要指定 您希望在输出中存储消息元数据。

    如需详细了解这两个选项(包括消息) 元数据选项,请参阅文件 格式

  9. 可选:您可以指定文件名前缀、后缀和 日期时间。 Cloud Storage 存储桶。文件作为对象存储在存储桶中。

    详细了解如何设置文件前缀、后缀和 日期时间,请参阅文件名前缀、后缀和 日期时间

  10. 文件批处理部分,指定经过的最长时间 然后再创建新文件

    您还可以选择设置文件的大小上限。

    如需详细了解这两种文件批处理选项,请参阅文件批处理

  11. 我们强烈建议您启用死信 以处理消息故障

    如需了解详情,请参阅死信 主题

  12. 您可以将其他设置保留为默认值,然后点击 创建

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  2. 要创建 Cloud Storage 订阅,请运行 gcloud pubsub subscriptions create 命令
    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --cloud-storage-bucket=BUCKET_NAME \
        --cloud-storage-file-prefix=CLOUD_STORAGE_FILE_PREFIX \
        --cloud-storage-file-suffix=CLOUD_STORAGE_FILE_SUFFIX \
        --cloud-storage-file-datetime-format=CLOUD_STORAGE_FILE_DATETIME_FORMAT \
        --cloud-storage-max-bytes=CLOUD_STORAGE_MAX_BYTES \
        --cloud-storage-max-duration=CLOUD_STORAGE_MAX_DURATION \
        --cloud-storage-output-format=CLOUD_STORAGE_OUTPUT_FORMAT \
        --cloud-storage-write-metadata

    在此命令中,只有 SUBSCRIPTION_ID,即 --topic 标志,以及 --cloud-storage-bucket 标志为必填项。其余标志是可选的,可以省略。

    替换以下内容:

    • SUBSCRIPTION_ID:新密钥的名称或 ID Cloud Storage 订阅。
    • TOPIC_ID:主题的名称或 ID。
    • BUCKET_NAME:指定 现有存储桶例如 prod_bucket。存储桶 名称中不得包含项目 ID。如需创建存储桶,请参阅创建存储桶
    • CLOUD_STORAGE_FILE_PREFIX:指定 Cloud Storage 文件名的前缀。例如 log_events_
    • CLOUD_STORAGE_FILE_SUFFIX:指定 为 Cloud Storage 文件名添加后缀。例如 .txt
    • CLOUD_STORAGE_FILE_DATETIME_FORMAT: 指定 Cloud Storage 文件名的日期时间格式。 例如 YYYY-MM-DD/hh_mm_ssZ
    • CLOUD_STORAGE_MAX_BYTES:最大字节数 可在创建新文件之前写入 Cloud Storage 文件 。该值必须介于 1 KB 到 10 GB 之间。例如 20MB
    • CLOUD_STORAGE_MAX_DURATION:最大值 指定新的 Cloud Storage 文件 创建。该值必须介于 1 米到 10 米之间。例如 5m
    • CLOUD_STORAGE_OUTPUT_FORMAT:输出 写入 Cloud Storage 的数据的格式。值包括 :
      • text:消息以原始文本的形式编写,分开式 换行。
      • avro:消息作为 Avro 二进制文件编写。 --cloud-storage-write-metadata 仅对 输出格式为 avro 的订阅。

C++

在试用此示例之前,请按照C++ Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub C++ API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& bucket) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_cloud_storage_config()->set_bucket(bucket);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

Go

在试用此示例之前,请按照Go Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createCloudStorageSubscription creates a Pub/Sub subscription that exports messages to Cloud Storage.
func createCloudStorageSubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, bucket string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// note bucket should not have the gs:// prefix
	// bucket := "my-bucket"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic: topic,
		CloudStorageConfig: pubsub.CloudStorageConfig{
			Bucket:         bucket,
			FilenamePrefix: "log_events_",
			FilenameSuffix: ".avro",
			OutputFormat:   &pubsub.CloudStorageOutputFormatAvroConfig{WriteMetadata: true},
			MaxDuration:    1 * time.Minute,
			MaxBytes:       1e8,
		},
	})
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created Cloud Storage subscription: %v\n", sub)

	return nil
}

Java

在试用此示例之前,请按照Java Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.Duration;
import com.google.pubsub.v1.CloudStorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateCloudStorageSubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bucket = "your-bucket";
    String filenamePrefix = "log_events_";
    String filenameSuffix = ".text";
    Duration maxDuration = Duration.newBuilder().setSeconds(300).build();

    createCloudStorageSubscription(
        projectId, topicId, subscriptionId, bucket, filenamePrefix, filenameSuffix, maxDuration);
  }

  public static void createCloudStorageSubscription(
      String projectId,
      String topicId,
      String subscriptionId,
      String bucket,
      String filenamePrefix,
      String filenameSuffix,
      Duration maxDuration)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      CloudStorageConfig cloudStorageConfig =
          CloudStorageConfig.newBuilder()
              .setBucket(bucket)
              .setFilenamePrefix(filenamePrefix)
              .setFilenameSuffix(filenameSuffix)
              .setMaxDuration(maxDuration)
              .build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setCloudStorageConfig(cloudStorageConfig)
                  .build());

      System.out.println("Created a CloudStorage subscription: " + subscription.getAllFields());
    }
  }
}

监控订阅

Cloud Monitoring 提供了多种指标来监控订阅

您还可以在 Pub/Sub 中监控订阅。

后续步骤