创建 BigQuery 订阅

本文档介绍了如何创建 BigQuery 订阅。 您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库 或使用 Pub/Sub API 创建 BigQuery 订阅。

准备工作

在阅读本文档之前,请确保您熟悉以下内容:

除了熟悉 Pub/Sub BigQuery,请确保您满足以下前提条件 在创建 BigQuery 订阅之前:

  • BigQuery 表已存在。或者,您可以创建一个 请按照下方说明创建 BigQuery 订阅, 请参阅本文档后面的部分。

  • Pub/Sub 主题架构与 BigQuery 表架构之间的兼容性。如果您添加不兼容的 您收到与兼容性相关的错误 消息。如需了解详情,请参阅架构兼容性

所需的角色和权限

以下是有关角色和权限的一系列指南:

  • 如需创建订阅,您必须在项目中配置访问权限控制 。

  • 如果您的订阅和主题位于不同的项目中,您还需要具备资源级权限,如本部分后面所述。

  • 要创建 BigQuery 订阅, Pub/Sub 服务账号必须具有写入 特定的 BigQuery 表如需详细了解如何 具体请参阅本文档的下一部分。

  • 您可以在项目中配置 BigQuery 订阅 才能写入其他项目中的 BigQuery 表。

如需获取创建 BigQuery 订阅所需的权限, 请让管理员授予您 项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含 创建 BigQuery 订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

创建 BigQuery 订阅需要以下权限:

  • 从订阅中拉取: pubsub.subscriptions.consume
  • 创建订阅: pubsub.subscriptions.create
  • 删除订阅: pubsub.subscriptions.delete
  • 获取订阅: pubsub.subscriptions.get
  • 列出订阅: pubsub.subscriptions.list
  • 更新订阅: pubsub.subscriptions.update
  • 将订阅附加到主题: pubsub.topics.attachSubscription
  • 获取订阅的 IAM 政策: pubsub.subscriptions.getIamPolicy
  • 为订阅配置 IAM 政策 pubsub.subscriptions.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如果您需要创建 BigQuery 一个项目中的订阅与另一个项目中的主题关联 项目,请让主题管理员也授予您 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。

为 Pub/Sub 服务账号分配 BigQuery 角色

某些 Google Cloud 服务具有由 Google Cloud 代管式服务账号, 服务访问您的资源。这些服务账号称为服务代理。Pub/Sub 会创建并维护 为每个项目创建服务账号,格式为 service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com

为了创建 BigQuery 订阅,Pub/Sub 服务账号必须拥有写入权限 并读取表元数据。

授予 BigQuery 数据编辑者 (roles/bigquery.dataEditor) 授予 Pub/Sub 服务账号的权限。

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 点击授予访问权限

  3. 添加主账号部分,输入您的 Pub/Sub 服务账号的名称。服务账号的格式为 service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com。例如,对于具有 project-number=112233445566 的项目,服务账号的格式为 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com

  4. 分配角色部分中,点击添加其他角色

  5. 选择角色下拉菜单中,输入 BigQuery。 然后选择 BigQuery Data Editor 角色

  6. 点击保存

如需详细了解 BigQuery IAM,请参阅 BigQuery 角色和权限

BigQuery 订阅属性

配置 BigQuery 订阅时,您可以指定以下内容 属性。

通用属性

了解常见的订阅属性 您可以为所有订阅设置此折扣

使用主题架构

此选项允许 Pub/Sub 使用架构 Pub/Sub 主题 已附加订阅。此外,Pub/Sub 将消息中的字段写入相应的 BigQuery 表中的列。

使用此选项时,请务必检查以下额外要求:

  • 主题架构和 BigQuery 架构中的字段 必须具有相同的名称,并且其类型必须相互兼容。

  • 主题架构中的任何选填字段也必须为 在 BigQuery 架构中是可选属性。

  • 主题架构中的必填字段不必是 BigQuery 架构中必需的参数。

  • 如果存在 BigQuery 字段不存在, 主题架构,这些 BigQuery 字段 必须采用 NULLABLE 模式。

  • 如果主题架构包含 不存在于 BigQuery 架构中,而 字段,请选择丢弃未知字段选项。

  • 您只能选择以下订阅属性之一:使用主题架构使用表架构

如果您未选择使用主题架构使用表架构选项,请确保 BigQuery 表中有一个名为 data 且类型为 BYTESSTRINGJSON 的列。Pub/Sub 会将消息写入到 此 BigQuery 列。

您可能看不到 Pub/Sub 主题架构更改或 BigQuery 表架构会在消息中立即生效 写入 BigQuery 表。例如,如果拖放 未知字段选项,且 Pub/Sub 架构(而非 BigQuery 架构) 写入 BigQuery 表的消息可能仍然不包含 该字段添加到 BigQuery 架构中。最终, 架构同步,后续消息包含 字段。

对 BigQuery 使用使用主题架构选项时 还可以利用 BigQuery 变更 数据捕获 (CDC)。CDC 通过以下方式更新 BigQuery 表: 处理更改并将其应用到现有行。

如需详细了解此功能,请参阅使用变更数据捕获来流式插入表更新

如需了解如何将此功能与 BigQuery 订阅搭配使用, 请参阅 BigQuery 变更数据捕获

使用表架构

通过此选项,Pub/Sub 可以使用 用于写入 JSON 字段的 BigQuery 表 消息发送到相应的列。使用此选项时,请务必查看以下其他要求:

  • 已发布的消息必须采用 JSON 格式。

  • 支持以下 JSON 转换:

    JSON 类型 BigQuery 数据类型
    string NUMERICBIGNUMERICDATETIMEDATETIMETIMESTAMP
    number NUMERICBIGNUMERICDATETIMEDATETIMETIMESTAMP
    • 使用 number 进行 DATEDATETIMETIMETIMESTAMP 转换时,数值必须符合支持的表示法
    • 使用 number 转换为 NUMERICBIGNUMERIC 时,值的精度和范围仅限于 IEEE 754 浮点算术标准接受的值。如果您需要高精确度或范围更广泛的值,请改用 stringNUMERICBIGNUMERIC 转换。
    • 使用 string 进行 NUMERICBIGNUMERIC 转换时,Pub/Sub 会假定字符串是人类可读的数字(例如 "123.124")。如果将字符串作为人类可读的数字处理失败,则 Pub/Sub 会将字符串视为使用 BigDecimalByteStringEncoder 编码的字节。
  • 如果订阅的主题 具有关联的架构 消息编码属性必须设置为 JSON

  • 如果存在 BigQuery 字段不存在, 消息,这些 BigQuery 字段必须采用 NULLABLE 模式。

  • 如果邮件中包含 可以删除 BigQuery 架构和这些字段,请选择 选项丢弃未知字段

  • 您只能选择其中一个订阅属性,即使用主题架构使用表架构

如果您不选择使用主题架构使用表架构选项, 确保 BigQuery 表有一个名为 data 的列, 类型 BYTESSTRINGJSON。Pub/Sub 会将消息写入到 此 BigQuery 列。

您可能不会看到 BigQuery 表架构的更改 将消息写入 BigQuery 表后立即生效。 例如,如果舍弃未知字段选项处于启用状态,并且消息中存在某个字段,但该字段不存在于 BigQuery 架构中,那么将消息写入 BigQuery 表后,该字段可能仍不会包含在 BigQuery 架构中。最终, 架构同步,后续消息中会包含该字段。

针对 BigQuery 订阅使用使用表架构选项时,您需要 还可以利用 BigQuery 变更数据捕获 (CDC)。 CDC 通过处理更改并将其应用于现有表来更新 BigQuery 表 行。

如需详细了解此功能,请参阅使用变更数据捕获流式传输表更新

如需了解如何将此功能与 BigQuery 订阅搭配使用,请参阅 BigQuery 变更数据捕获

删除未知字段

此选项与使用主题架构使用表架构搭配使用 选项。此选项可让 Pub/Sub 删除主题中存在的任何字段 但不包含在 BigQuery 架构中。不含未知丢弃 字段设置,则包含额外字段的消息不会被写入到 BigQuery 并保留在订阅积压消息中。通过 订阅将处于错误状态

写入元数据

此选项可让 Pub/Sub 将每条消息的元数据写入 BigQuery 表。否则,元数据 不会写入 BigQuery 表。

如果您选择写入元数据选项,请确保 BigQuery 表包含下表中所述的字段。

如果您不选择写入元数据选项,则目标 BigQuery 表仅需要 data 字段,除非 use_topic_schema 为 true。如果您同时选择了 Write metadata使用主题架构选项,则主题的架构必须 不包含任何名称与元数据参数匹配的字段。 此限制包括这些蛇形参数的驼峰式大小写版本。

参数
subscription_name

STRING

订阅的名称。

message_id

STRING

消息的 ID

publish_time

TIMESTAMP

发布消息的时间。

data

BYTES、STRING 或 JSON

消息正文。

所有目标平台都必须填写 data 字段 未选择使用 主题架构使用表架构。如果该字段的类型为 JSON,则消息正文必须为有效的 JSON。

attributes

STRING 或 JSON

包含所有消息属性的 JSON 对象。它还 包含属于 包含排序键的 Pub/Sub 消息。 (如果存在)。

创建 BigQuery 订阅

以下示例演示了如何创建使用 BigQuery 传送功能的订阅。

控制台

  1. 在 Google Cloud 控制台中,前往 订阅页面。

    前往 订阅

  2. 点击创建订阅
  3. 对于订阅 ID 字段,输入名称。

    对于 请参阅命名准则 主题或订阅

  4. 从下拉菜单中选择或创建一个主题。订阅 从主题接收消息。
  5. 传送类型选为写入 BigQuery
  6. 为 BigQuery 表选择项目。
  7. 选择现有数据集或创建新数据集。

    相关信息 如需了解如何创建数据集,请参阅创建数据集

  8. 选择现有表或创建一个新表。

    如需了解如何创建表,请参阅创建表

  9. 我们强烈建议您启用死信以处理消息传送失败问题。

    有关详情,请参阅终止 字母主题

  10. 点击创建

您还可以从主题页面创建订阅。 此快捷方式可帮助您将主题与订阅关联。

  1. 在 Google Cloud 控制台中,前往主题页面。

    前往“主题”

  2. 点击您要查看的主题旁边的 。 以创建订阅。
  3. 从上下文菜单中,选择创建订阅
  4. 传送类型选为写入 BigQuery
  5. 为 BigQuery 表选择项目。
  6. 选择现有数据集或创建新数据集。

    相关信息 如需了解如何创建数据集,请参阅创建数据集

  7. 选择现有表或创建一个新表。

    如需了解 请参阅创建表

  8. 我们强烈建议您启用 Dead 字母以处理消息故障。

    有关详情,请参阅终止 字母主题

  9. 点击创建

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 如需创建 Pub/Sub 订阅,请使用 gcloud pubsub subscriptions create 命令:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID:DATASET_ID.TABLE_ID

    替换以下内容:

    • SUBSCRIPTION_ID:指定 订阅。
    • TOPIC_ID:指定主题的 ID。通过 主题需要架构。
    • PROJECT_ID:指定项目的 ID。
    • DATASET_ID:指定现有数据集的 ID。要创建数据集,请参阅 创建 数据集
    • TABLE_ID:指定现有表的 ID。 如果您的主题没有架构,则表需要 data 字段。如需创建表,请参阅使用架构定义创建空表

C++

在尝试此示例之前,请按照C++ Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub C++ API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& table_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_bigquery_config()->set_table(table_id);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照C# Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub C# API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证


using Google.Cloud.PubSub.V1;

public class CreateBigQuerySubscriptionSample
{
    public Subscription CreateBigQuerySubscription(string projectId, string topicId, string subscriptionId, string bigqueryTableId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            BigqueryConfig = new BigQueryConfig
            {
                Table = bigqueryTableId
            }
        };
        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        return subscription;
    }
}

Go

在尝试此示例之前,请按照Go Pub/Sub 快速入门: 客户端库。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// createBigQuerySubscription creates a Pub/Sub subscription that exports messages to BigQuery.
func createBigQuerySubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, table string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// table := "my-project-id.dataset_id.table_id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic: topic,
		BigQueryConfig: pubsub.BigQueryConfig{
			Table:         table,
			WriteMetadata: true,
		},
	})
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created BigQuery subscription: %v\n", sub)

	return nil
}

Java

在尝试此示例之前,请按照Java Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.BigQueryConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateBigQuerySubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bigqueryTableId = "your-project.your-dataset.your-table";

    createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId);
  }

  public static void createBigQuerySubscription(
      String projectId, String topicId, String subscriptionId, String bigqueryTableId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      BigQueryConfig bigqueryConfig =
          BigQueryConfig.newBuilder().setTable(bigqueryTableId).setWriteMetadata(true).build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setBigqueryConfig(bigqueryConfig)
                  .build());

      System.out.println("Created a BigQuery subscription: " + subscription.getAllFields());
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId,
  subscriptionNameOrId,
  bigqueryTableId
) {
  const options = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  bigqueryTableId: string
) {
  const options: CreateSubscriptionOptions = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

PHP

在尝试此示例之前,请按照PHP Pub/Sub 快速入门: 客户端库。 如需了解详情,请参阅 Pub/Sub PHP API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\BigQueryConfig;

/**
 * Creates a Pub/Sub BigQuery subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $table      The BigQuery table to which to write.
 */
function create_bigquery_subscription($projectId, $topicName, $subscriptionName, $table)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $config = new BigQueryConfig(['table' => $table]);
    $subscription->create([
        'bigqueryConfig' => $config
    ]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
}

Python

在尝试此示例之前,请按照Python Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Python API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "your-project.your-dataset.your-table"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

bigquery_config = pubsub_v1.types.BigQueryConfig(
    table=bigquery_table_id, write_metadata=True
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "bigquery_config": bigquery_config,
        }
    )

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")

Ruby

在尝试此示例之前,请按照Ruby Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Ruby API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

require "google/cloud/pubsub"

##
# Shows how to create a BigQuery subscription where messages published
# to a topic populates a BigQuery table.
#
# @param project_id [String]
# Your Google Cloud project (e.g. "my-project")
# @param topic_id [String]
# Your topic name (e.g. "my-secret")
# @param subscription_id [String]
# ID for new subscription to be created (e.g. "my-subscription")
# @param bigquery_table_id [String]
# ID of bigquery table (e.g "my-project:dataset-id.table-id")
#
def pubsub_create_bigquery_subscription project_id:, topic_id:, subscription_id:, bigquery_table_id:
  pubsub = Google::Cloud::Pubsub.new project_id: project_id
  topic = pubsub.topic topic_id
  subscription = topic.subscribe subscription_id,
                                 bigquery_config: {
                                   table: bigquery_table_id,
                                   write_metadata: true
                                 }
  puts "BigQuery subscription created: #{subscription_id}."
  puts "Table for subscription is: #{bigquery_table_id}"
end

监控 BigQuery 订阅

Cloud Monitoring 提供了一些指标来监控订阅

如需查看与 Pub/Sub 相关的所有可用指标及其说明的列表,请参阅 Pub/Sub 监控文档

您还可以在 Pub/Sub 中监控订阅。

后续步骤