Exportar dados como colunas Protobuf

Neste documento, descrevemos como exportar dados do BigQuery como colunas de buffers de protocolo (Protobuf) usando funções definidas pelo usuário (UDFs, na sigla em inglês) do BigQuery.

Quando usar as colunas do Protobuf

O BigQuery oferece várias funções integradas para formatar os dados selecionados. Uma opção é mesclar vários valores de coluna em um único valor de Protobuf, o que tem os seguintes benefícios:

  • Segurança de tipo de objeto
  • Compactação, tempo de transferência de dados e custo aprimorados em comparação com o JSON.
  • Flexibilidade, já que a maioria das linguagens de programação tem bibliotecas para lidar com Protobuf.
  • Menos sobrecarga ao ler de várias colunas e criar um único objeto.

Enquanto outros tipos de coluna também podem fornecer segurança de tipo, o uso de colunas Protobuf fornece um objeto totalmente tipado, o que pode reduzir a quantidade de trabalho que precisa ser feita na camada do aplicativo ou em outra parte do pipeline.

No entanto, há limitações para exportar dados do BigQuery como colunas Protobuf:

  • As colunas protobuf não são bem indexadas ou filtradas. Pesquisar pelo conteúdo das colunas do Protobuf pode ser menos eficaz.
  • Classificar dados no formato Protobuf pode ser difícil.

Se essas limitações se aplicarem ao seu fluxo de trabalho de exportação, considere outros métodos de exportação de dados do BigQuery:

  • Use o Dataflow para exportar dados do BigQuery nos formatos de arquivo Avro ou CSV.
  • Use consultas programadas para classificar os dados exportados do BigQuery por data ou hora e programar exportações de maneira recorrente.

Funções exigidas

Para conseguir as permissões necessárias para exportar dados do BigQuery como colunas do Protobuf, peça ao seu administrador para conceder a você os seguintes papéis do IAM no seu projeto:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Criar uma UDF

O procedimento a seguir mostra como criar uma função definida pelo usuário que converte um tipo de dados STRUCT do BigQuery em uma coluna Protobuf:

  1. Em uma linha de comando, clone o repositório bigquery-utils.git:

    $ git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Navegue até a pasta de exportação do Protobuf.

    $ cd bigquery-utils/tools/protobuf_export
    
  3. Adicione seus arquivos proto à pasta ./protos.

  4. Instale os pacotes necessários no repositório do GitHub:

    $ npm install
    
  5. Agrupe o pacote usando o webpack:

    $ npx webpack --config webpack.config.js --stats-error-details
    
  6. Localize o arquivo pbwrapper.js na pasta dist/ e copie-o para um bucket do Cloud Storage.

  7. Crie uma UDF que crie uma coluna Protobuf com base nas colunas atuais do BigQuery:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Substitua:

    • DATASET_ID: o ID do conjunto de dados em que você armazena sua função
    • BUCKET_NAME: o nome do bucket do Cloud Storage
    • PROTO_PACKAGE: o nome do pacote do arquivo proto.
    • PROTO_MESSAGE: o tipo de mensagem do arquivo proto.

Para saber mais sobre como usar pacotes na linguagem proto, consulte Pacotes.

Formatar colunas como valores Protobuf

  • Depois de criar a UDF, execute a função:

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    Substitua:

    • DATASET_ID: o ID do conjunto de dados em que você armazena sua função
    • DATASET_NAME: o nome do conjunto de dados, por exemplo, dataset_name.table_name
    • COLUMN_TYPE1: o nome de uma coluna. As colunas podem incluir qualquer tipo de valor escalar ou não escalar compatível, incluindo ARRAY e STRUCT.
    • COLUMN_TYPE2: o nome de uma coluna. As colunas podem incluir qualquer tipo de valor escalar ou não escalar compatível, incluindo ARRAY e STRUCT.

Trabalhar com valores Protobuf

Com os dados do BigQuery exportados no formato Protobuf, agora é possível trabalhar com seus dados como um objeto ou estrutura totalmente tipado.

Os exemplos de código a seguir fornecem vários exemplos de maneiras de processar ou trabalhar com os dados exportados:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )