Mengekspor data sebagai kolom Protobuf

Dokumen ini menjelaskan cara mengekspor data BigQuery sebagai kolom Buffering Protokol (Protobuf) menggunakan fungsi yang ditentukan pengguna (UDF) BigQuery.

Kapan kolom Protobuf digunakan

BigQuery menawarkan sejumlah fungsi bawaan untuk memformat data yang dipilih. Salah satu opsinya adalah menggabungkan beberapa nilai kolom menjadi satu nilai Protobuf, yang memiliki manfaat berikut:

  • Keamanan jenis objek.
  • Kompresi, waktu transfer data, dan biaya yang lebih baik dibandingkan dengan JSON.
  • Fleksibilitas, karena sebagian besar bahasa pemrograman memiliki library untuk menangani Protobuf.
  • Lebih sedikit overhead saat membaca dari beberapa kolom dan membangun satu objek.

Meskipun jenis kolom lainnya juga dapat memberikan keamanan jenis, penggunaan kolom Protobuf akan menyediakan objek fully-typed, yang dapat mengurangi jumlah pekerjaan yang perlu dilakukan pada lapisan aplikasi atau di bagian lain dari pipeline.

Namun, ada batasan untuk mengekspor data BigQuery sebagai kolom Protobuf:

  • Kolom protobuf tidak diindeks atau difilter dengan baik. Mencari berdasarkan isi kolom Protobuf bisa menjadi kurang efektif.
  • Mengurutkan data dalam format Protobuf bisa jadi sulit.

Jika batasan ini berlaku untuk alur kerja ekspor Anda, Anda dapat mempertimbangkan metode lain untuk mengekspor data BigQuery:

  • Gunakan kueri terjadwal dengan pernyataan EXPORT DATA untuk mengurutkan data BigQuery yang diekspor menurut tanggal atau waktu, dan untuk menjadwalkan ekspor secara berulang. BigQuery mendukung ekspor data ke dalam format Avro, CSV, JSON, dan Parquet.
  • Gunakan Dataflow untuk mengekspor data BigQuery dalam format file Avro atau CSV.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk mengekspor data BigQuery sebagai kolom Protobuf, minta administrator Anda untuk memberi Anda peran IAM berikut di project Anda:

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Membuat UDF

Prosedur berikut menunjukkan cara membuat fungsi yang ditentukan pengguna yang mengonversi jenis data STRUCT BigQuery menjadi kolom Protobuf:

  1. Pada command line, clone repositori bigquery-utils.git:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Buka folder ekspor Protobuf

    cd bigquery-utils/tools/protobuf_export
    
  3. Tambahkan file proto Anda ke folder ./protos.

  4. Instal paket yang diperlukan dari repositori GitHub:

    npm install
    
  5. Paketkan paket menggunakan webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. Temukan file pbwrapper.js di folder dist/ Anda, lalu salin file tersebut ke bucket Cloud Storage.

  7. Buat UDF yang membangun kolom Protobuf dari kolom BigQuery yang ada:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Ganti kode berikut:

    • DATASET_ID: ID set data tempat Anda menyimpan fungsi
    • BUCKET_NAME: nama bucket Cloud Storage Anda
    • PROTO_PACKAGE: nama paket untuk file proto Anda
    • PROTO_MESSAGE: jenis pesan untuk file proto Anda

Untuk mengetahui informasi selengkapnya tentang penggunaan paket dalam bahasa proto, lihat Paket.

Memformat kolom sebagai nilai Protobuf

  • Setelah Anda membuat UDF, jalankan fungsi:

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    Ganti kode berikut:

    • DATASET_ID: ID set data tempat Anda menyimpan fungsi
    • DATASET_NAME: nama set data Anda—misalnya, dataset_name.table_name
    • COLUMN_TYPE1: nama kolom. Kolom dapat berisi jenis nilai skalar yang didukung atau jenis non-skalar, termasuk ARRAY dan STRUCT
    • COLUMN_TYPE2: nama kolom. Kolom dapat berisi jenis nilai skalar yang didukung atau jenis non-skalar, termasuk ARRAY dan STRUCT

Menggunakan nilai Protobuf

Dengan data BigQuery yang diekspor dalam format Protobuf, Anda kini dapat menggunakan data Anda sebagai objek atau struct yang memiliki jenis lengkap.

Contoh kode berikut memberikan beberapa contoh cara untuk memproses atau menggunakan data yang diekspor:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )