将数据导出为 Protobuf 列

本文档介绍了如何使用 BigQuery 用户定义的函数 (UDF) 将 BigQuery 数据导出为协议缓冲区 (Protobuf) 列。

何时使用 Protobuf 列

BigQuery 提供了许多内置函数,用于设置所选数据的格式。一种方法是将多个列值合并为一个 Protobuf 值,这具有以下优势:

  • 对象类型安全。
  • 与 JSON 相比,改进了压缩功能、缩短了数据转移时间且降低了费用。
  • 提供了灵活性,因为大多数编程语言都有用于处理 Protobuf 的库。
  • 从多个列读取数据并构建单个对象时,开销更低。

虽然其他列类型也可以提供类型安全,但使用 Protobuf 列可以提供完全类型化的对象,这样可减少需要在应用层或流水线的其他部分完成的工作量。

但是,将 BigQuery 数据导出为 Protobuf 列存在以下限制:

  • Protobuf 列未很好地编入索引或进行过滤。按 Protobuf 列的内容搜索的效果可能不太理想。
  • 以 Protobuf 格式对数据进行排序可能比较困难。

如果这些限制适用于您的导出工作流,您可以考虑使用导出 BigQuery 数据的其他方法:

  • 计划查询EXPORT DATA 语句搭配使用,按日期或时间对导出的 BigQuery 数据进行排序,并定期安排导出。BigQuery 支持将数据导出为 Avro、CSV、JSON 和 Parquet 格式。
  • 使用 Dataflow 以 Avro 或 CSV 文件格式导出 BigQuery 数据。

所需的角色

如需获得将 BigQuery 数据导出为 Protobuf 列所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建 UDF

以下过程介绍了如何创建用户定义的函数,以将 BigQuery STRUCT 数据类型转换为 Protobuf 列:

  1. 在命令行中,克隆 bigquery-utils.git 代码库:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. 找到 Protobuf 导出文件夹

    cd bigquery-utils/tools/protobuf_export
    
  3. 将 proto 文件添加到 ./protos 文件夹中。

  4. 从 GitHub 代码库安装必要的软件包:

    npm install
    
  5. 使用 webpack 捆绑软件包:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. dist/ 文件夹中找到 pbwrapper.js 文件,然后将该文件复制到 Cloud Storage 存储桶。

  7. 创建一个 UDF 以从现有 BigQuery 列构建 Protobuf 列:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    请替换以下内容:

    • DATASET_ID:存储函数的数据集的 ID
    • BUCKET_NAME:Cloud Storage 存储桶的名称
    • PROTO_PACKAGE:proto 文件的软件包名称
    • PROTO_MESSAGE:proto 文件的消息类型

如需详细了解如何在 proto 语言中使用软件包,请参阅软件包

将列的格式设置为 Protobuf 值

  • 创建 UDF 后,请运行以下函数:

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    替换以下内容:

    • DATASET_ID:存储函数的数据集的 ID
    • DATASET_NAME:数据集的名称,例如 dataset_name.table_name
    • COLUMN_TYPE1:列的名称。列可以包含任何受支持的标量值类型或非标量类型(包括 ARRAYSTRUCT
    • COLUMN_TYPE2:列的名称。列可以包含任何受支持的标量值类型或非标量类型(包括 ARRAYSTRUCT

使用 Protobuf 值

以 Protobuf 格式导出 BigQuery 数据后,您现在可以将数据作为完全类型化的对象或结构体使用。

以下代码示例提供了几种可以处理或使用导出的数据的方式示例:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )