Esporta i dati come colonne Protobuf

Questo documento descrive come esportare i dati BigQuery come protocollo Memorizza le colonne (Protobuf) utilizzando BigQuery definito dall'utente (UDF).

Quando utilizzare le colonne Protobuf

BigQuery offre una serie di funzioni integrate per formattare la selezione e i dati di Google Cloud. Un'opzione è unire più valori di colonna in un unico valore Protobuf, che presenta i seguenti vantaggi:

  • Sicurezza del tipo di oggetto.
  • Compressione, tempo di trasferimento dati e costi migliorati rispetto a JSON.
  • Flessibilità, poiché la maggior parte dei linguaggi di programmazione dispone di librerie per gestire Protobuf.
  • Meno overhead durante la lettura da più colonne e la creazione di un singolo oggetto.

Anche altri tipi di colonna possono garantire la sicurezza dei tipi, utilizzando le colonne Protobuf fornisce un oggetto completamente digitato, il che può ridurre la quantità di lavoro richiesta a livello di applicazione o in un'altra parte della pipeline.

Tuttavia, esistono limitazioni per esportare i dati di BigQuery Colonne protobuf:

  • Le colonne protobuf non sono ben indicizzate o filtrate. La ricerca in base ai contenuti le colonne Protobuf possono essere meno efficaci.
  • Ordinare i dati in formato Protobuf può essere difficile.

Se queste limitazioni si applicano al flusso di lavoro di esportazione, puoi prendere in considerazione altri per esportare i dati di BigQuery:

  • Utilizza Dataflow per l'esportazione Dati BigQuery in formati file Avro o CSV.
  • Utilizza le query pianificate per ordinare di BigQuery esportati per data o ora e per pianificare le esportazioni su base ricorrente.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per esportare i dati BigQuery come colonne Protobuf, chiedi all'amministratore di concederti i seguenti ruoli IAM sul tuo progetto:

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite la ruoli o altri ruoli predefiniti ruoli.

Crea una funzione definita dall'utente

La procedura seguente mostra come creare una funzione definita dall'utente che converte un tipo di dati STRUCT di BigQuery in una colonna Protobuf:

  1. In una riga di comando, clona il repository bigquery-utils.git:

    $ git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Vai alla cartella di esportazione Protobuf

    $ cd bigquery-utils/tools/protobuf_export
    
  3. Aggiungi i file proto alla cartella ./protos.

  4. Installa i pacchetti necessari dal repository GitHub:

    $ npm install
    
  5. Raggruppa il pacchetto utilizzando webpack:

    $ npx webpack --config webpack.config.js --stats-error-details
    
  6. Individua il file pbwrapper.js nella cartella dist/ e copialo in un nel bucket Cloud Storage.

  7. Crea una funzione definita dall'utente che crea una colonna Protobuf dal colonne BigQuery esistenti:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Sostituisci quanto segue:

    • DATASET_ID: l'ID del set di dati in cui archivi la funzione
    • BUCKET_NAME: il nome del tuo bucket Cloud Storage
    • PROTO_PACKAGE: nome del pacchetto del file di protocollo
    • PROTO_MESSAGE: il tipo di messaggio per il file proto

Per ulteriori informazioni sull'uso dei pacchetti nel linguaggio proto, consulta la sezione Pacchetti.

Formatta le colonne come valori Protobuf

  • Dopo aver creato la funzione definita dall'utente, esegui la funzione:

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    Sostituisci quanto segue:

    • DATASET_ID: l'ID del set di dati in cui archivi la funzione
    • DATASET_NAME: il nome del set di dati, ad esempio dataset_name.table_name
    • COLUMN_TYPE1: il nome di una colonna. Le colonne possono includere qualsiasi tipo di valore scalabile supportato o non scalabile, tra cui ARRAY e STRUCT
    • COLUMN_TYPE2: il nome di una colonna. Le colonne possono includere qualsiasi tipo di valore scalabile supportato o non scalabile, tra cui ARRAY e STRUCT

Lavorare con i valori Protobuf

Con i tuoi dati BigQuery esportati nel formato Protobuf, puoi ora puoi lavorare con i tuoi dati come struct o oggetto completamente digitato.

I seguenti esempi di codice forniscono diversi esempi di modalità di elaborazione o lavorare con i dati esportati:

Vai

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )