Protobuf 열로 데이터 내보내기

이 문서에서는 BigQuery 사용자 정의 함수(UDF)를 사용하여 BigQuery 데이터를 프로토콜 버퍼(Protobuf) 열로 내보내는 방법을 설명합니다.

Protobuf 열을 사용하는 경우

BigQuery는 선택한 데이터의 형식을 지정하는 여러 가지 기본 제공 함수를 제공합니다. 그 중 한 옵션은 여러 열 값을 단일 Protobuf 값으로 병합하는 것으로 여기에는 다음과 같은 이점이 있습니다.

  • 객체 유형 안전성
  • JSON보다 개선된 비교, 데이터 전송 시간, 비용.
  • 유연성, 대부분의 프로그래밍 언어에는 Protobuf를 처리할 수 있는 라이브러리가 있음
  • 여러 열에서 읽고 단일 객체를 빌드할 때 오버헤드 감소.

다른 열 유형도 유형 안전성을 제공할 수 있지만 Protobuf 열을 사용하면 완전히 형식이 지정된 객체가 제공되어 애플리케이션 레이어 또는 파이프라인의 다른 부분에서 처리해야 하는 작업량을 줄일 수 있습니다.

그러나 BigQuery 데이터를 Protobuf 열로 내보내는 데는 몇 가지 제한사항이 있습니다.

  • Protobuf 열은 색인 생성 또는 필터링이 원활하지 않습니다. Protobuf 열의 콘텐츠로 검색하면 효과적이지 않을 수 있습니다.
  • Protobuf 형식의 데이터는 정렬하기 어려울 수 있습니다.

이러한 제한사항이 내보내기 워크플로에 적용되는 경우 다음과 같이 BigQuery 데이터를 내보내는 다른 방법을 고려해야 할 수 있습니다.

  • Dataflow를 사용하여 BigQuery 데이터를 Avro 또는 CSV 파일 형식으로 내보냅니다.
  • 예약된 쿼리를 사용하여 내보낸 BigQuery 데이터를 날짜 또는 시간별로 정렬하고 내보내기를 반복적으로 예약합니다.

필요한 역할

BigQuery 데이터를 Protobuf 열로 내보내는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.

역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

UDF 만들기

다음 절차에서는 BigQuery STRUCT 데이터 유형을 Protobuf 열로 변환하는 사용자 정의 함수를 만드는 방법을 보여줍니다.

  1. 명령줄에서 bigquery-utils.git 저장소를 클론합니다.

    $ git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Protobuf 내보내기 폴더로 이동

    $ cd bigquery-utils/tools/protobuf_export
    
  3. proto 파일을 ./protos 폴더에 추가합니다.

  4. GitHub 저장소에서 필요한 패키지를 설치합니다.

    $ npm install
    
  5. webpack을 사용하여 패키지를 번들로 묶습니다.

    $ npx webpack --config webpack.config.js --stats-error-details
    
  6. dist/ 폴더에서 pbwrapper.js 파일을 찾아 파일을 Cloud Storage 버킷에 복사합니다.

  7. 기존 BigQuery 열에서 Protobuf 열을 빌드하는 UDF를 만듭니다.

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    다음을 바꿉니다.

    • DATASET_ID: 함수를 저장하는 데이터 세트의 ID
    • BUCKET_NAME: Cloud Storage 버킷 이름
    • PROTO_PACKAGE: proto 파일의 패키지 이름
    • PROTO_MESSAGE: proto 파일의 메시지 유형

proto 언어에서 패키지를 사용하는 방법에 대한 자세한 내용은 패키지를 참조하세요.

열 형식을 Protobuf 값으로 지정

  • UDF를 만든 후 함수를 실행합니다.

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    다음을 바꿉니다.

    • DATASET_ID: 함수를 저장하는 데이터 세트의 ID
    • DATASET_NAME: 데이터 세트 이름(예: dataset_name.table_name)
    • COLUMN_TYPE1: 열 이름. 열에는 지원되는 모든 스칼라 값 유형 또는 비스칼라 유형(ARRAYSTRUCT 포함)이 포함될 수 있습니다.
    • COLUMN_TYPE2: 열 이름. 열에는 지원되는 모든 스칼라 값 유형 또는 비스칼라 유형(ARRAYSTRUCT 포함)이 포함될 수 있습니다.

Protobuf 값 사용

Protobuf 형식으로 내보낸 BigQuery 데이터를 사용하여 이제 완전히 형식이 지정된 객체 또는 구조체로 데이터를 작업할 수 있습니다.

다음 코드 샘플은 내보낸 데이터를 처리하거나 사용할 수 있는 몇 가지 방법의 예시를 제공합니다.

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

자바

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )