Exporte dados como colunas Protobuf

Este documento descreve como pode exportar dados do BigQuery como colunas de Protocol Buffers (Protobuf) através de funções definidas pelo utilizador (UDFs) do BigQuery.

Quando usar colunas Protobuf

O BigQuery oferece várias funções incorporadas para formatar os dados selecionados. Uma opção é unir vários valores de colunas num único valor Protobuf, o que tem as seguintes vantagens:

  • Segurança do tipo de objeto.
  • Compressão, tempo de transferência de dados e custo melhorados em comparação com o JSON.
  • Flexibilidade, uma vez que a maioria das linguagens de programação tem bibliotecas para processar o Protobuf.
  • Menos sobrecarga ao ler de várias colunas e criar um único objeto.

Embora outros tipos de colunas também possam oferecer segurança de tipos, a utilização de colunas Protobuf oferece um objeto totalmente tipado, o que pode reduzir a quantidade de trabalho que tem de ser feito na camada de aplicação ou noutra parte do pipeline.

No entanto, existem limitações à exportação de dados do BigQuery como colunas Protobuf:

  • As colunas Protobuf não estão bem indexadas nem filtradas. A pesquisa pelo conteúdo das colunas Protobuf pode ser menos eficaz.
  • A ordenação de dados no formato Protobuf pode ser difícil.

Se estas limitações se aplicarem ao fluxo de trabalho de exportação, pode considerar outros métodos de exportação de dados do BigQuery:

  • Use consultas agendadas com declarações EXPORT DATA para ordenar os dados exportados do BigQuery por data ou hora e para agendar exportações recorrentes. O BigQuery suporta a exportação de dados para os formatos Avro, CSV, JSON e Parquet.
  • Use o Dataflow para exportar dados do BigQuery no formato de ficheiro Avro ou CSV.

Funções necessárias

Para obter as autorizações de que precisa para exportar dados do BigQuery como colunas Protobuf, peça ao seu administrador que lhe conceda as seguintes funções de IAM no seu projeto:

Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

Crie uma FDU

Crie uma UDF que converta um tipo de dados do BigQuery STRUCT numa coluna Protobuf:

  1. Numa linha de comandos, clone o repositório bigquery-utils.git:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Navegue para a pasta de exportação do Protobuf:

    cd bigquery-utils/tools/protobuf_export
    
  3. Use o comando cp ou o explorador de ficheiros do seu sistema operativo para copiar o ficheiro proto para a pasta secundária ./protos.

    Já existe um ficheiro proto de exemplo com o nome dummy.proto na pasta ./protos.

  4. Instale os pacotes necessários a partir do repositório do GitHub:

    npm install
    
  5. Agrupe o pacote com o webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. Localize o ficheiro pbwrapper.js na pasta secundária ./dist e, de seguida, carregue o ficheiro para um contentor do Cloud Storage.

  7. Aceda à página do BigQuery.

    Aceda ao BigQuery

  8. Com o editor de consultas, crie uma UDF denominada toMyProtoMessage que crie uma coluna Protobuf a partir de colunas de tabelas do BigQuery existentes:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Substitua o seguinte:

    • DATASET_ID: o ID do conjunto de dados que vai conter a FDU.
    • INPUT_FIELDS: os campos usados no tipo de mensagem proto para o ficheiro proto, no formato field_name_1 field_type_1 [, field_name_2 field_type_2, ...].

      Tem de traduzir todos os campos de tipo de mensagem que usam carateres de sublinhado para usar, em alternativa, o formato camel case. Por exemplo, se o tipo de mensagem for semelhante ao seguinte, o valor dos campos de entrada tem de ser itemId int64, itemDescription string:

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME: o nome do contentor do Cloud Storage que contém o ficheiro pbwrapper.js.

    • PROTO_PACKAGE: o pacote para o ficheiro proto.

    • PROTO_MESSAGE: o tipo de mensagem para o ficheiro proto.

    Por exemplo, se usar o ficheiro dummy.proto fornecido, a declaração CREATE FUNCTION tem o seguinte aspeto:

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

Formate colunas como valores Protobuf

Execute a FDU para formatar as colunas da tabela do BigQuery como valores Protobuf:toMyProtoMessage

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

Substitua o seguinte:

  • UDF_DATASET_ID: o ID do conjunto de dados que contém a FDU.
  • INPUT_COLUMNS: os nomes das colunas a formatar como um valor Protobuf, no formato column_name_1 [, column_name_2, ...]. As colunas podem ser de qualquer tipo de valor escalar suportado ou tipo não escalar, incluindo ARRAY e STRUCT. As colunas de entrada têm de corresponder ao tipo e ao número dos campos do tipo de mensagem proto.
  • PROJECT_ID: o ID do projeto que contém a tabela. Pode ignorar a identificação do projeto se o conjunto de dados estiver no seu projeto atual.
  • DATASET_ID: o ID do conjunto de dados que contém a tabela.
  • TABLE_NAME: o nome da tabela que contém as colunas a formatar.

Por exemplo, se usar uma toMyProtoMessage UDF baseada em dummy.proto, a seguinte declaração SELECT funciona:

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

Trabalhe com valores Protobuf

Com os dados do BigQuery exportados no formato Protobuf, pode agora trabalhar com os dados como um objeto ou uma estrutura totalmente tipados.

Os exemplos de código seguintes oferecem vários exemplos de formas de processar ou trabalhar com os dados exportados:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )