Esportare i dati come colonne Protobuf

Questo documento descrive come esportare i dati BigQuery come colonne Protocol Buffer (Protobuf) utilizzando le funzioni definite dall'utente (UDF) di BigQuery.

Quando utilizzare le colonne Protobuf

BigQuery offre una serie di funzioni integrate per formattare i dati selezionati. Un'opzione è unire più valori di colonna in un unico valore Protobuf, che presenta i seguenti vantaggi:

  • Sicurezza del tipo di oggetto.
  • Compressione, tempo di trasferimento dei dati e costi migliorati rispetto a JSON.
  • Flessibilità, poiché la maggior parte dei linguaggi di programmazione dispone di librerie per gestire Protobuf.
  • Meno overhead durante la lettura da più colonne e la creazione di un singolo oggetto.

Sebbene anche altri tipi di colonne possano garantire la sicurezza del tipo, l'utilizzo delle colonne Protobuf fornisce un oggetto completamente tipizzato, che può ridurre la quantità di lavoro da svolgere a livello di applicazione o in un'altra parte della pipeline.

Tuttavia, esistono limitazioni per l'esportazione dei dati di BigQuery come colonne Protobuf:

  • Le colonne Protobuf non sono ben indicizzate o filtrate. La ricerca in base ai contenuti delle colonne Protobuf può essere meno efficace.
  • L'ordinamento dei dati in formato Protobuf può essere difficile.

Se queste limitazioni si applicano al tuo flusso di lavoro di esportazione, potresti prendere in considerazione altri metodi per esportare i dati di BigQuery:

  • Utilizza le query pianificate con istanze EXPORT DATA per ordinare i dati di BigQuery esportati in base alla data o all'ora e per pianificare le esportazioni su base ricorrente. BigQuery supporta l'esportazione dei dati nei formati Avro, CSV, JSON e Parquet.
  • Utilizza Dataflow per esportare i dati di BigQuery in formato file Avro o CSV.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per esportare i dati BigQuery come colonne Protobuf, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Creare una funzione definita dall'utente

La procedura seguente mostra come creare una funzione definita dall'utente;utente che converte un tipo di dati STRUCT BigQuery in una colonna Protobuf:

  1. In una riga di comando, clona il repository bigquery-utils.git:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Vai alla cartella di esportazione di Protobuf

    cd bigquery-utils/tools/protobuf_export
    
  3. Aggiungi i file proto alla cartella ./protos.

  4. Installa i pacchetti necessari dal repository GitHub:

    npm install
    
  5. Unisci il pacchetto utilizzando webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. Individua il file pbwrapper.js nella cartella dist/ e copialo in un bucket Cloud Storage.

  7. Crea una UDF che genera una colonna Protobuf dalle colonne BigQuery esistenti:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Sostituisci quanto segue:

    • DATASET_ID: l'ID del set di dati in cui memorizzi la funzione
    • BUCKET_NAME: il nome del bucket Cloud Storage
    • PROTO_PACKAGE: il nome del pacchetto per il file proto
    • PROTO_MESSAGE: il tipo di messaggio per il file proto

Per ulteriori informazioni sull'utilizzo dei pacchetti nel linguaggio proto, consulta Pacchetti.

Formattare le colonne come valori Protobuf

  • Dopo aver creato la FDU, esegui la funzione:

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    Sostituisci quanto segue:

    • DATASET_ID: l'ID del set di dati in cui memorizzi la funzione
    • DATASET_NAME: il nome del set di dati, ad esempio dataset_name.table_name
    • COLUMN_TYPE1: il nome di una colonna. Le colonne possono includere qualsiasi tipo di valore scalare supportato o tipo non scalare, inclusi ARRAY e STRUCT
    • COLUMN_TYPE2: il nome di una colonna. Le colonne possono includere qualsiasi tipo di valore scalare supportato o tipo non scalare, inclusi ARRAY e STRUCT

Lavorare con i valori Protobuf

Con i dati BigQuery esportati nel formato Protobuf, ora puoi lavorare con i dati come oggetto o struct completamente tipizzato.

I seguenti esempi di codice forniscono diversi esempi di modi in cui puoi elaborare o utilizzare i dati esportati:

Vai

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )