Exporta datos como columnas de Protobuf

En este documento, se describe cómo puedes exportar datos de BigQuery como columnas de búferes de protocolo (Protobuf) mediante funciones definidas por el usuario (UDF) de BigQuery.

Cuándo usar columnas de Protobuf

BigQuery ofrece una variedad de funciones integradas para dar formato a los datos seleccionados. Una opción es combinar varios valores de columna en un solo valor de Protobuf, lo que brinda los siguientes beneficios:

  • Seguridad de tipo de objeto
  • Compresión mejorada, tiempo de transferencia de datos y costo en comparación con JSON
  • Flexibilidad, ya que la mayoría de los lenguajes de programación tienen bibliotecas para manejar Protobuf
  • Menos sobrecarga cuando se lee desde varias columnas y se compila un solo objeto

Si bien otros tipos de columna también pueden proporcionar seguridad de tipo, el uso de columnas de Protobuf proporciona un objeto de tipo completo, lo que puede reducir la cantidad de trabajo que se debe realizar en la capa de la aplicación o en otra parte de la canalización.

Sin embargo, existen limitaciones para exportar datos de BigQuery como columnas de Protobuf:

  • Las columnas de Protobuf no están bien indexadas ni filtradas. La búsqueda mediante el contenido de las columnas de Protobuf puede ser menos eficaz.
  • Ordenar datos en formato Protobuf puede ser difícil.

Si estas limitaciones se aplican a tu flujo de trabajo de exportación, puedes considerar otros métodos para exportar datos de BigQuery:

  • Usa Dataflow para exportar datos de BigQuery en formato de archivo Avro o CSV.
  • Usa las consultas programadas para ordenar los datos exportados de BigQuery por fecha o hora, y programar las exportaciones de forma recurrente.

Roles obligatorios

Para obtener los permisos que necesitas para exportar los datos de BigQuery como columnas de Protobuf, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu proyecto:

Si quieres obtener más información para otorgar roles, consulta Administra el acceso.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Crea una UDF

En el siguiente procedimiento, se muestra cómo puedes crear una función definida por el usuario que convierta un tipo de datos STRUCT de BigQuery en una columna de Protobuf:

  1. En una línea de comandos, clona el repositorio bigquery-utils.git.

    $ git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Navega a la carpeta de exportación de Protobuf.

    $ cd bigquery-utils/tools/protobuf_export
    
  3. Agrega los archivos .proto a la carpeta ./protos.

  4. Instala los paquetes necesarios desde el repositorio de GitHub:

    $ npm install
    
  5. Empaqueta el paquete mediante webpack:

    $ npx webpack --config webpack.config.js --stats-error-details
    
  6. Ubica el archivo pbwrapper.js en tu carpeta dist/ y copia el archivo en un bucket de Cloud Storage.

  7. Crea una UDF que compile una columna Protobuf a partir de tus columnas de BigQuery existentes:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Reemplaza lo siguiente:

    • DATASET_ID: Es el ID del conjunto de datos en el que almacenas tu función
    • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
    • PROTO_PACKAGE: Es el nombre del paquete de tu archivo proto
    • PROTO_MESSAGE: Es el tipo de mensaje para tu archivo proto

Para obtener más información sobre el uso de paquetes en el lenguaje proto, consulta Paquetes.

Dales formato a las columnas como valores de Protobuf

  • Después de crear la UDF, ejecuta la función:

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    Reemplaza lo siguiente:

    • DATASET_ID: Es el ID del conjunto de datos en el que almacenas tu función
    • DATASET_NAME: Es el nombre de tu conjunto de datos, por ejemplo, dataset_name.table_name
    • COLUMN_TYPE1: Es el nombre de una columna. Las columnas pueden incluir cualquier tipo de valor escalar compatible o tipo no escalar, incluidos ARRAY y STRUCT
    • COLUMN_TYPE2: Es el nombre de una columna. Las columnas pueden incluir cualquier tipo de valor escalar compatible o tipo no escalar, incluidos ARRAY y STRUCT

Trabaja con valores de Protobuf

Con tus datos de BigQuery exportados en formato Protobuf, ahora puedes trabajar con tus datos como un objeto o una struct con una definición completa de tipo.

En las siguientes muestras de código, se proporcionan varios ejemplos de formas en las que puedes procesar o trabajar con tus datos exportados:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )