创建 BigQuery 订阅

本文档介绍了如何创建 BigQuery 订阅。 您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 来创建 BigQuery 订阅。

准备工作

在阅读本文档之前,请确保您熟悉以下内容:

除了熟悉 Pub/Sub 和 BigQuery 之外,请确保您在创建 BigQuery 订阅之前满足以下前提条件:

  • 已存在 BigQuery 表。或者,您也可以在创建 BigQuery 订阅时创建一个订阅,如本文档后面部分所述。

  • Pub/Sub 主题的架构与 BigQuery 表之间的兼容性。如果您添加不兼容的 BigQuery 表,则会收到与兼容性相关的错误消息。如需了解详情,请参阅架构兼容性

所需的角色和权限

以下是有关角色和权限的准则列表:

  • 如需创建订阅,您必须在项目级别配置访问权限控制。

  • 如果您的订阅和主题位于不同的项目中,那么您还需要拥有资源级权限,如本部分稍后所述。

  • 如需创建 BigQuery 订阅,Pub/Sub 服务帐号必须具有向特定 BigQuery 表写入数据的权限。如需详细了解如何授予这些权限,请参阅本文档的下一部分。

  • 您可以在某个项目中配置 BigQuery 订阅,以写入其他项目中的 BigQuery 表。

如需获取创建 BigQuery 订阅所需的权限,请让管理员授予您项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理访问权限

此预定义角色包含创建 BigQuery 订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建 BigQuery 订阅,您需要拥有以下权限:

  • 从订阅中拉取: pubsub.subscriptions.consume
  • 创建订阅: pubsub.subscriptions.create
  • 删除订阅: pubsub.subscriptions.delete
  • 获取订阅: pubsub.subscriptions.get
  • 列出订阅: pubsub.subscriptions.list
  • 更新订阅: pubsub.subscriptions.update
  • 将订阅附加到主题: pubsub.topics.attachSubscription
  • 获取订阅的 IAM 政策: pubsub.subscriptions.getIamPolicy
  • 为订阅配置 IAM 政策 pubsub.subscriptions.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如果您需要在一个项目中创建与另一个项目中的主题关联的 BigQuery 订阅,请让主题管理员也向您授予该主题的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。

将 BigQuery 角色分配给 Pub/Sub 服务帐号

某些 Google Cloud 服务具有 Google Cloud 代管式服务帐号,允许这些服务访问您的资源。这些服务账号称为服务代理。Pub/Sub 会以 service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com 格式为每个项目创建并维护一个服务帐号。

如需创建 BigQuery 订阅,Pub/Sub 服务帐号必须具有写入特定 BigQuery 表和读取表元数据的权限。

将 BigQuery Data Editor (roles/bigquery.dataEditor) 角色授予 Pub/Sub 服务帐号。

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 点击授予访问权限

  3. 添加主账号部分中,输入您的 Pub/Sub 服务帐号的名称。服务帐号的格式为 service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com。例如,对于设置了 project-number=112233445566 的项目,服务帐号的格式为 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com

  4. 分配角色部分中,点击添加其他角色

  5. 选择角色下拉列表中,输入 BigQuery,然后选择 BigQuery Data Editor 角色

  6. 点击保存

如需详细了解 BigQuery IAM,请参阅 BigQuery 角色和权限

BigQuery 订阅属性

配置 BigQuery 订阅时,您可以指定以下属性。

通用属性

了解您可以为所有订阅设置的常见订阅属性

使用主题架构

此选项允许 Pub/Sub 使用将订阅附加到的 Pub/Sub 主题的架构。此外,Pub/Sub 还会将消息中的字段写入 BigQuery 表中的相应列。

使用此选项时,请务必查看以下额外要求:

  • 主题架构中的字段和 BigQuery 架构中的字段必须具有相同的名称,并且它们的类型必须彼此兼容。

  • 主题架构中的任何可选字段在 BigQuery 架构中也必须是可选字段。

  • 主题架构中的必填字段无需在 BigQuery 架构中是必需的。

  • 如果主题架构中不存在 BigQuery 字段,则这些 BigQuery 字段必须采用 NULLABLE 模式。

  • 如果主题架构具有 BigQuery 架构中不存在的其他字段,并且这些字段可以丢弃,请选择删除未知字段选项。

  • 您只能选择以下订阅属性之一:使用主题架构使用表架构

如果您未选择使用主题架构使用表架构选项,请确保 BigQuery 表有一个名为 data 且类型为 BYTESSTRINGJSON 的列。Pub/Sub 会将消息写入此 BigQuery 列。

对 Pub/Sub 主题架构或 BigQuery 表架构的更改可能不会在消息写入 BigQuery 表时立即生效。例如,如果删除未知字段选项已启用,并且某个字段存在于 Pub/Sub 架构中,但不存在于 BigQuery 架构中,则写入 BigQuery 表的消息在将该字段添加到 BigQuery 架构后可能仍不包含该字段。最终,架构会同步,并且后续消息会包含该字段。

如果将使用主题架构选项用于 BigQuery 订阅,您还可以利用 BigQuery 变更数据捕获 (CDC) 功能。CDC 通过处理更改并将其应用于现有行来更新 BigQuery 表。

如需详细了解此功能,请参阅使用变更数据捕获来流式传输表更新

如需了解如何将此功能与 BigQuery 订阅搭配使用,请参阅 BigQuery 变更数据捕获

使用表架构

此选项允许 Pub/Sub 使用 BigQuery 表的架构将 JSON 消息的字段写入相应列。使用此选项时,请务必查看以下额外要求:

  • 发布的消息必须采用 JSON 格式。

  • 如果订阅的主题具有关联的架构,则必须将消息编码属性设置为 JSON

  • 如果消息中不存在 BigQuery 字段,则这些 BigQuery 字段必须采用 NULLABLE 模式。

  • 如果消息包含 BigQuery 架构中不存在的其他字段,并且这些字段可以丢弃,请选择删除未知字段选项。

  • 在 JSON 消息中,DATEDATETIMETIMETIMESTAMP 值必须是符合支持的表示法的整数。

  • 在 JSON 消息中,NUMERICBIGNUMERIC 值必须使用 BigDecimalByteStringEncoder 进行字节编码。

  • 您只能选择以下订阅属性之一:使用主题架构使用表架构

如果您未选择使用主题架构使用表架构选项,请确保 BigQuery 表有一个名为 data 且类型为 BYTESSTRINGJSON 的列。Pub/Sub 会将消息写入此 BigQuery 列。

对 BigQuery 表架构的更改可能不会立即与写入 BigQuery 表的消息生效。例如,如果删除未知字段选项已启用,并且某个字段出现在消息中,但不存在于 BigQuery 架构中,则写入 BigQuery 表的消息在将该字段添加到 BigQuery 架构后可能仍不包含该字段。最终,架构会同步,后续消息将包含该字段。

如果将使用表架构选项用于 BigQuery 订阅,您还可以利用 BigQuery 变更数据捕获 (CDC)。CDC 通过处理更改并将其应用于现有行来更新 BigQuery 表。

如需详细了解此功能,请参阅使用变更数据捕获来流式传输表更新

如需了解如何将此功能与 BigQuery 订阅搭配使用,请参阅 BigQuery 变更数据捕获

删除未知字段

此选项与使用主题架构使用表架构选项搭配使用。通过此选项,Pub/Sub 可以删除主题架构或消息中存在但 BigQuery 架构中不存在的任何字段。如果未设置删除未知字段,包含额外字段的消息将不会写入 BigQuery,并且会保留在订阅积压消息中。订阅最终处于错误状态

写入元数据

此选项允许 Pub/Sub 将每条消息的元数据写入 BigQuery 表中的其他列。否则,元数据不会写入 BigQuery 表。

如果选择写入元数据选项,请确保 BigQuery 表包含下表中介绍的字段。

如果您没有选择写入元数据选项,则除非 use_topic_schema 为 true,否则目标 BigQuery 表将仅需要 data 字段。如果您同时选择了写入元数据使用主题架构选项,则主题的架构不得包含名称与元数据参数名称相符的任何字段。此限制包括这些蛇形命名法参数的驼峰式大小写版本。

参数
subscription_name

STRING

订阅的名称。

message_id

STRING

消息的 ID

publish_time

TIMESTAMP

发布消息的时间。

data

BYTES、STRING 或 JSON

消息正文。

所有未选择使用主题架构的目标 BigQuery 表都需要 data 字段。如果该字段是 JSON 类型,则消息正文必须是有效的 JSON。

attributes

STRING 或 JSON

包含所有消息属性的 JSON 对象。它还包含 Pub/Sub 消息中包含的其他字段,包括排序键(如果存在)。

创建 BigQuery 订阅

以下示例演示了如何创建通过 BigQuery 传送的订阅。

控制台

  1. 在 Google Cloud 控制台中,前往订阅页面。

    前往“订阅”

  2. 点击创建订阅
  3. 订阅 ID 字段中,输入一个名称。

    如需了解如何为订阅命名,请参阅主题或订阅命名准则

  4. 从下拉菜单中选择或创建一个主题。订阅会从主题接收消息。
  5. 传送类型中选择写入 BigQuery
  6. 为 BigQuery 表选择项目。
  7. 选择现有数据集或创建新数据集。

    如需了解如何创建数据集,请参阅创建数据集

  8. 选择现有表或创建新表。

    如需了解如何创建表,请参阅创建表

  9. 我们强烈建议您启用死信来处理消息失败情况。

    如需了解详情,请参阅死信主题

  10. 点击创建

您还可以通过主题页面创建订阅。此快捷方式可帮助您将主题与订阅关联。

  1. 在 Google Cloud 控制台中,前往主题页面。

    前往“主题”

  2. 点击要为其创建订阅的主题旁边的
  3. 从上下文菜单中选择创建订阅
  4. 传送类型中选择写入 BigQuery
  5. 为 BigQuery 表选择项目。
  6. 选择现有数据集或创建新数据集。

    如需了解如何创建数据集,请参阅创建数据集

  7. 选择现有表或创建新表。

    如需了解如何创建数据集,请参阅创建表

  8. 我们强烈建议您启用死信来处理消息失败情况。

    如需了解详情,请参阅死信主题

  9. 点击创建

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  2. 如需创建 Pub/Sub 订阅,请使用 gcloud pubsub subscriptions create 命令:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID:DATASET_ID.TABLE_ID

    替换以下内容:

    • SUBSCRIPTION_ID:指定订阅的 ID。
    • TOPIC_ID:指定主题的 ID。该主题需要架构。
    • PROJECT_ID:指定项目的 ID。
    • DATASET_ID:指定现有数据集的 ID。如需创建数据集,请参阅 创建数据集
    • TABLE_ID:指定现有表的 ID。如果您的主题没有架构,则该表需要 data 字段。如需创建表,请参阅创建具有架构定义的空表

C++

在试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为客户端库设置身份验证

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& table_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_bigquery_config()->set_table(table_id);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C# API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为客户端库设置身份验证


using Google.Cloud.PubSub.V1;

public class CreateBigQuerySubscriptionSample
{
    public Subscription CreateBigQuerySubscription(string projectId, string topicId, string subscriptionId, string bigqueryTableId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            BigqueryConfig = new BigQueryConfig
            {
                Table = bigqueryTableId
            }
        };
        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        return subscription;
    }
}

Go

在试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// createBigQuerySubscription creates a Pub/Sub subscription that exports messages to BigQuery.
func createBigQuerySubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, table string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// table := "my-project-id.dataset_id.table_id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic: topic,
		BigQueryConfig: pubsub.BigQueryConfig{
			Table:         table,
			WriteMetadata: true,
		},
	})
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created BigQuery subscription: %v\n", sub)

	return nil
}

Java

在试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.BigQueryConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateBigQuerySubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bigqueryTableId = "your-project.your-dataset.your-table";

    createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId);
  }

  public static void createBigQuerySubscription(
      String projectId, String topicId, String subscriptionId, String bigqueryTableId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      BigQueryConfig bigqueryConfig =
          BigQueryConfig.newBuilder().setTable(bigqueryTableId).setWriteMetadata(true).build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setBigqueryConfig(bigqueryConfig)
                  .build());

      System.out.println("Created a BigQuery subscription: " + subscription.getAllFields());
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId,
  subscriptionNameOrId,
  bigqueryTableId
) {
  const options = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  bigqueryTableId: string
) {
  const options: CreateSubscriptionOptions = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

PHP

在试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为客户端库设置身份验证

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\BigQueryConfig;

/**
 * Creates a Pub/Sub BigQuery subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $table      The BigQuery table to which to write.
 */
function create_bigquery_subscription($projectId, $topicName, $subscriptionName, $table)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $config = new BigQueryConfig(['table' => $table]);
    $subscription->create([
        'bigqueryConfig' => $config
    ]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
}

Python

在试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为客户端库设置身份验证

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "your-project.your-dataset.your-table"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

bigquery_config = pubsub_v1.types.BigQueryConfig(
    table=bigquery_table_id, write_metadata=True
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "bigquery_config": bigquery_config,
        }
    )

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")

Ruby

在试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为客户端库设置身份验证

require "google/cloud/pubsub"

##
# Shows how to create a BigQuery subscription where messages published
# to a topic populates a BigQuery table.
#
# @param project_id [String]
# Your Google Cloud project (e.g. "my-project")
# @param topic_id [String]
# Your topic name (e.g. "my-secret")
# @param subscription_id [String]
# ID for new subscription to be created (e.g. "my-subscription")
# @param bigquery_table_id [String]
# ID of bigquery table (e.g "my-project:dataset-id.table-id")
#
def pubsub_create_bigquery_subscription project_id:, topic_id:, subscription_id:, bigquery_table_id:
  pubsub = Google::Cloud::Pubsub.new project_id: project_id
  topic = pubsub.topic topic_id
  subscription = topic.subscribe subscription_id,
                                 bigquery_config: {
                                   table: bigquery_table_id,
                                   write_metadata: true
                                 }
  puts "BigQuery subscription created: #{subscription_id}."
  puts "Table for subscription is: #{bigquery_table_id}"
end

后续步骤