Protobuf 列としてデータをエクスポートする

このドキュメントでは、BigQuery のユーザー定義関数(UDF)を使用して、BigQuery データをプロトコル バッファ(Protobuf)列としてエクスポートする方法について説明します。

Protobuf 列を使用する場合

BigQuery には、選択したデータをフォーマットするための組み込み関数が数多く用意されています。1 つの方法は、複数の列の値を 1 つの Protobuf 値にマージすることです。これには次の利点があります。

  • オブジェクト タイプの安全性を確保できる。
  • JSON と比較して圧縮、データ移転時間、費用が改善されている。
  • 多くのプログラミング言語に Protobuf を処理するためのライブラリがあるため、柔軟性がある。
  • 複数の列から読み取り、1 つのオブジェクトを構築する場合のオーバーヘッドが少ない。

他の列型でも型の安全性を確保できますが、Protobuf 列を使用すると完全型オブジェクトが提供されるため、アプリケーション レイヤやパイプラインの別の部分で行う必要がある作業の量を減らすことができます。

ただし、BigQuery データを Protobuf 列としてエクスポートするには制限があります。

  • Protobuf 列は、適切にインデックスに登録されず、フィルタされません。Protobuf 列の内容で検索すると、効果が下がる可能性があります。
  • Protobuf 形式でのデータの並べ替えは困難な場合があります。

これらの制限がエクスポート ワークフローに適用される場合は、BigQuery データをエクスポートするための他の方法を検討することをおすすめします。

  • Dataflow を使用して、Avro または CSV ファイル形式で BigQuery データをエクスポートする。
  • スケジュールされたクエリを使用して、エクスポートされた BigQuery データを日時で並べ替え、定期的なエクスポートをスケジュールする。

必要なロール

BigQuery データを Protobuf 列としてエクスポートするために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

UDF を作成する

次の手順では、BigQuery の STRUCT データ型を Protobuf 列に変換するユーザー定義関数を作成する方法を示します。

  1. コマンドラインで、bigquery-utils.git リポジトリのクローンを作成します。

    $ git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Protobuf エクスポート フォルダに移動します。

    $ cd bigquery-utils/tools/protobuf_export
    
  3. proto ファイルを ./protos フォルダに追加します。

  4. 必要なパッケージを GitHub リポジトリからインストールします。

    $ npm install
    
  5. webpack を使用してパッケージをバンドルします。

    $ npx webpack --config webpack.config.js --stats-error-details
    
  6. dist/ フォルダにある pbwrapper.js ファイルを見つけて、ファイルを Cloud Storage バケットにコピーします。

  7. 既存の BigQuery 列から Protobuf 列を構築する UDF を作成します。

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    次のように置き換えます。

    • DATASET_ID: 関数を保存するデータセットの ID
    • BUCKET_NAME: Cloud Storage バケットの名前
    • PROTO_PACKAGE: proto ファイルのパッケージ名
    • PROTO_MESSAGE: proto ファイルのメッセージ タイプ

proto 言語でパッケージを使用する方法については、パッケージをご覧ください。

Protobuf 値として列をフォーマットする

  • UDF を作成したら、関数を実行します。

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    次のように置き換えます。

    • DATASET_ID: 関数を保存するデータセットの ID
    • DATASET_NAME: データセットの名前(例: dataset_name.table_name
    • COLUMN_TYPE1: 列の名前。列には、対応しているスカラー値型か、ARRAYSTRUCT などのスカラー以外の型を含めることができます。
    • COLUMN_TYPE2: 列の名前。列には、対応しているスカラー値型か、ARRAYSTRUCT などのスカラー以外の型を含めることができます。

Protobuf 値を操作する

BigQuery データを Protobuf 形式でエクスポートすると、データを完全な型のオブジェクトまたは構造体として操作できるようになります。

次のコードサンプルは、エクスポートされたデータを処理または操作する方法の例をいくつか示しています。

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )