Exportar datos como columnas Protobuf

En este documento se describe cómo puede exportar datos de BigQuery como columnas de Protocol Buffers (Protobuf) mediante funciones definidas por el usuario (UDFs) de BigQuery.

Cuándo usar columnas Protobuf

BigQuery ofrece varias funciones integradas para dar formato a los datos seleccionados. Una opción es combinar varios valores de columna en un único valor de Protobuf, lo que ofrece las siguientes ventajas:

  • Seguridad de tipos de objetos.
  • Mejora la compresión, el tiempo de transferencia de datos y los costes en comparación con JSON.
  • Flexibilidad, ya que la mayoría de los lenguajes de programación tienen bibliotecas para gestionar Protobuf.
  • Menos sobrecarga al leer de varias columnas y crear un solo objeto.

Aunque otros tipos de columnas también pueden proporcionar seguridad de tipos, el uso de columnas Protobuf proporciona un objeto con todos los tipos definidos, lo que puede reducir la cantidad de trabajo que se debe realizar en la capa de aplicación o en otra parte de la canalización.

Sin embargo, hay limitaciones a la hora de exportar datos de BigQuery como columnas Protobuf:

  • Las columnas de Protobuf no están bien indexadas ni filtradas. Buscar por el contenido de las columnas Protobuf puede ser menos eficaz.
  • Ordenar datos en formato Protobuf puede ser difícil.

Si estas limitaciones se aplican al flujo de trabajo de exportación, puede plantearse usar otros métodos para exportar datos de BigQuery:

  • Usa consultas programadas con instrucciones EXPORT DATA para ordenar los datos de BigQuery exportados por fecha u hora, y para programar exportaciones periódicas. BigQuery permite exportar datos a los formatos Avro, CSV, JSON y Parquet.
  • Usa Dataflow para exportar datos de BigQuery en formato de archivo Avro o CSV.

Roles obligatorios

Para obtener los permisos que necesitas para exportar datos de BigQuery como columnas Protobuf, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos en tu proyecto:

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Crear una función definida por el usuario

Crea una FDU que convierta un tipo de datos STRUCT de BigQuery en una columna Protobuf:

  1. En una línea de comandos, clona el repositorio bigquery-utils.git:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Ve a la carpeta de exportación de Protobuf:

    cd bigquery-utils/tools/protobuf_export
    
  3. Usa el comando cp o el explorador de archivos de tu sistema operativo para copiar el archivo proto en la subcarpeta ./protos.

    Ya hay un archivo proto de ejemplo llamado dummy.proto en la carpeta ./protos.

  4. Instala los paquetes necesarios del repositorio de GitHub:

    npm install
    
  5. Empaqueta el paquete con webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. Busca el archivo pbwrapper.js en la carpeta secundaria ./dist y, a continuación, súbelo a un segmento de Cloud Storage.

  7. Ve a la página BigQuery.

    Ir a BigQuery

  8. En el editor de consultas, crea una UDF llamada toMyProtoMessage que genere una columna Protobuf a partir de las columnas de una tabla de BigQuery:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Haz los cambios siguientes:

    • DATASET_ID: el ID del conjunto de datos que contendrá la función definida por el usuario.
    • INPUT_FIELDS: los campos que se usan en el tipo de mensaje proto del archivo proto, en el formato field_name_1 field_type_1 [, field_name_2 field_type_2, ...].

      Debes traducir los campos de tipo de mensaje que usen guiones bajos a camel case. Por ejemplo, si el tipo de mensaje es como el siguiente, el valor de los campos de entrada debe ser itemId int64, itemDescription string:

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME: el nombre del segmento de Cloud Storage que contiene el archivo pbwrapper.js.

    • PROTO_PACKAGE: el paquete del archivo proto.

    • PROTO_MESSAGE: el tipo de mensaje del archivo proto.

    Por ejemplo, si usa el archivo dummy.proto proporcionado, la instrucción CREATE FUNCTION tendrá el siguiente aspecto:

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

Dar formato a las columnas como valores de Protobuf

Ejecuta la FDU toMyProtoMessage para dar formato a las columnas de la tabla de BigQuery como valores de Protobuf:

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

Haz los cambios siguientes:

  • UDF_DATASET_ID: el ID del conjunto de datos que contiene la función definida por el usuario.
  • INPUT_COLUMNS: los nombres de las columnas que se van a formatear como valor de Protobuf, en el formato column_name_1 [, column_name_2, ...]. Las columnas pueden ser de cualquier tipo de valor escalar o no escalar admitido, incluidos ARRAY y STRUCT. Las columnas de entrada deben coincidir con el tipo y el número de los campos del tipo de mensaje proto.
  • PROJECT_ID: el ID del proyecto que contiene la tabla. Puedes omitir la identificación del proyecto si el conjunto de datos está en tu proyecto actual.
  • DATASET_ID: el ID del conjunto de datos que contiene la tabla.
  • TABLE_NAME: el nombre de la tabla que contiene las columnas que se van a formatear.

Por ejemplo, si usa una función definida por el usuario toMyProtoMessage basada en dummy.proto, la siguiente instrucción SELECT funciona:

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

Trabajar con valores de Protobuf

Con los datos de BigQuery exportados en formato Protobuf, ahora puede trabajar con los datos como un objeto o struct con tipo completo.

En los siguientes ejemplos de código se muestran varias formas de procesar o trabajar con los datos exportados:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )