Daten als Protobuf-Spalten exportieren

In diesem Dokument wird beschrieben, wie Sie BigQuery-Daten als Protokollpufferspalten (Protobuf) mit benutzerdefinierten BigQuery-Funktionen (UDFs) exportieren können.

Wann werden Protobuf-Spalten verwendet?

BigQuery bietet eine Reihe von integrierten Funktionen zum Formatieren ausgewählter Daten. Eine Möglichkeit besteht darin, mehrere Spaltenwerte in einem einzigen Protobuf-Wert zusammenzuführen. Dies hat folgende Vorteile:

  • Sicherheit des Objekttyps.
  • Verbesserte Komprimierung, Datenübertragungszeit und Kosten im Vergleich zu JSON.
  • Flexibilität, da die meisten Programmiersprachen Bibliotheken für die Verarbeitung von Protobuf haben.
  • Weniger Aufwand beim Lesen aus mehreren Spalten und Erstellen eines einzelnen Objekts.

Andere Spaltentypen können auch Typensicherheit bieten. Bei Verwendung von Protobuf-Spalten steht jedoch ein vollständig typisiertes Objekt zur Verfügung, wodurch der Arbeitsaufwand für die Anwendungsebene oder einen anderen Teil der Pipeline reduziert werden kann.

Es gibt jedoch Einschränkungen beim Exportieren von BigQuery-Daten als Protobuf-Spalten:

  • Protobuf-Spalten sind nicht gut indexiert oder gefiltert. Die Suche nach dem Inhalt der Protobuf-Spalten kann weniger effektiv sein.
  • Das Sortieren von Daten im Protobuf-Format kann schwierig sein.

Wenn diese Einschränkungen für Ihren Exportworkflow gelten, können Sie andere Methoden zum Exportieren von BigQuery-Daten in Betracht ziehen:

  • Mit Dataflow können Sie BigQuery-Daten entweder im Avro- oder CSV-Dateiformat exportieren.
  • Verwenden Sie geplante Abfragen, um Ihre exportierten BigQuery-Daten nach Datum oder Uhrzeit zu sortieren und Exporte regelmäßig zu planen.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Exportieren von BigQuery-Daten als Protobuf-Spalten benötigen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

UDF erstellen

Im Folgenden wird gezeigt, wie Sie eine benutzerdefinierte Funktion erstellen, die den BigQuery-Datentyp STRUCT in eine Protobuf-Spalte konvertiert:

  1. Klonen Sie in einer Befehlszeile das Repository bigquery-utils.git:

    $ git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Rufen Sie den Protobuf-Exportordner auf.

    $ cd bigquery-utils/tools/protobuf_export
    
  3. Fügen Sie Ihre Proto-Dateien zum Ordner ./protos hinzu.

  4. Installieren Sie die erforderlichen Pakete aus dem GitHub-Repository:

    $ npm install
    
  5. Verpacken Sie das Paket mit Webpack:

    $ npx webpack --config webpack.config.js --stats-error-details
    
  6. Suchen Sie in Ihrem dist/-Ordner die Datei pbwrapper.js und kopieren Sie die Datei in einen Cloud Storage-Bucket.

  7. Erstellen Sie eine UDF, die aus Ihren vorhandenen BigQuery-Spalten eine Protobuf-Spalte erstellt:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Ersetzen Sie Folgendes:

    • DATASET_ID: die ID des Datasets, in dem Sie die Funktion speichern
    • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
    • PROTO_PACKAGE: der Paketname für Ihre Proto-Datei
    • PROTO_MESSAGE: der Nachrichtentyp für Ihre Proto-Datei

Weitere Informationen zur Verwendung von Paketen in der Proto-Sprache finden Sie unter Pakete.

Spalten als Protobuf-Werte formatieren

  • Führen Sie nach dem Erstellen der UDF die Funktion aus:

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    Ersetzen Sie Folgendes:

    • DATASET_ID: die ID des Datasets, in dem Sie die Funktion speichern
    • DATASET_NAME: der Name Ihres Datasets, z. B. dataset_name.table_name
    • COLUMN_TYPE1: der Name einer Spalte. Spalten können alle unterstützten skalaren Werttypen oder nicht skalaren Typen enthalten, einschließlich ARRAY und STRUCT.
    • COLUMN_TYPE2: der Name einer Spalte. Spalten können alle unterstützten skalaren Werttypen oder nicht skalaren Typen enthalten, einschließlich ARRAY und STRUCT.

Mit Protobuf-Werten arbeiten

Wenn Ihre BigQuery-Daten im Protobuf-Format exportiert wurden, können Sie jetzt mit Ihren Daten als vollständig typisiertes Objekt oder Struct arbeiten.

Die folgenden Codebeispiele enthalten mehrere Beispiele für die Verarbeitung oder Verwendung der exportierten Daten:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )