Exporter des données sous forme de colonnes Protobuf

Ce document explique comment exporter des données BigQuery sous forme de colonnes Protocol Buffers (Protobuf) en utilisants des UDF (fonctions définies par l'utilisateur) BigQuery.

Quand utiliser les colonnes Protobuf ?

BigQuery propose un certain nombre de fonctions intégrées pour formater les données sélectionnées. Une option consiste à fusionner plusieurs valeurs de colonne en une seule valeur Protobuf, ce qui présente les avantages suivants :

  • Sûreté du typage d'objet.
  • Amélioration de la compression, du temps de transfert des données et des coûts par rapport à JSON.
  • Flexibilité, car la plupart des langages de programmation disposent de bibliothèques permettant de gérer le format Protobuf.
  • Coûts réduits lors de la lecture de plusieurs colonnes pour créer un seul objet.

Bien que d'autres types de colonnes puissent également assurer la sûreté du typage, les colonnes Protobuf fournissent un objet entièrement typé, ce qui peut réduire la quantité de travail à effectuer sur la couche d'application ou dans une autre partie du pipeline.

Toutefois, l'exportation de données BigQuery sous forme de colonnes Protobuf est limitée :

  • Les colonnes Protobuf ne sont pas bien indexées ni filtrées. La recherche par contenu dans des colonnes Protobuf peut s'avérer moins efficace.
  • Le tri des données au format Protobuf peut s'avérer difficile.

Si ces limites s'appliquent à votre workflow d'exportation, vous pouvez envisager d'autres méthodes d'exportation de données BigQuery :

  • Utilisez Dataflow pour exporter des données BigQuery aux formats de fichier Avro ou CSV.
  • Utilisez des requêtes programmées pour trier vos données BigQuery exportées par date ou heure, et pour planifier des exportations de manière récurrente.

Rôles requis

Pour obtenir les autorisations nécessaires pour exporter des données BigQuery en tant que colonnes Protobuf, demandez à votre administrateur de vous accorder les rôles IAM suivants sur votre projet :

Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Créer une UDF (fonction définie par l'utilisateur)

La procédure suivante montre comment créer une fonction définie par l'utilisateur qui convertit un type de données BigQuery STRUCT en une colonne Protobuf :

  1. Dans une ligne de commande, clonez le dépôt bigquery-utils.git :

    $ git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Accéder au dossier d'exportation Protobuf

    $ cd bigquery-utils/tools/protobuf_export
    
  3. Ajoutez vos fichiers proto au dossier ./protos.

  4. Installez les packages nécessaires à partir du dépôt GitHub :

    $ npm install
    
  5. Regroupez le package à l'aide de webpack :

    $ npx webpack --config webpack.config.js --stats-error-details
    
  6. Recherchez le fichier pbwrapper.js dans votre dossier dist/ et copiez-le dans un bucket Cloud Storage.

  7. Créez une fonction définie par l'utilisateur qui crée une colonne Protobuf à partir de vos colonnes BigQuery existantes :

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<field_1 TYPE1, field_2 TYPE2>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Remplacez les éléments suivants :

    • DATASET_ID : ID de l'ensemble de données dans lequel vous stockez votre fonction
    • BUCKET_NAME : nom de votre bucket Cloud Storage
    • PROTO_PACKAGE : nom de package de votre fichier proto
    • PROTO_MESSAGE : type de message de votre fichier proto

Pour plus d'informations sur l'utilisation des packages dans le langage proto, consultez la page Packages.

Formater les colonnes sous forme de valeurs Protobuf

  • Après avoir créé la fonction définie par l'utilisateur, exécutez-la :

    SELECT
      DATASET_ID.toMyProtoMessage(STRUCT(COLUMN_TYPE1, COLUMN_TYPE2)) AS protoResult
    FROM
      `DATASET_NAME`
    LIMIT
      100;
    

    Remplacez les éléments suivants :

    • DATASET_ID : ID de l'ensemble de données dans lequel vous stockez votre fonction
    • DATASET_NAME : nom de votre ensemble de données, par exemple dataset_name.table_name.
    • COLUMN_TYPE1 : nom d'une colonne. Les colonnes peuvent inclure tout type de valeur scalaire ou non scalaire compatible, y compris ARRAY et STRUCT.
    • COLUMN_TYPE2 : nom d'une colonne. Les colonnes peuvent inclure tout type de valeur scalaire ou non scalaire compatible, y compris ARRAY et STRUCT.

Utiliser des valeurs Protobuf

Vos données BigQuery étant exportées au format Protobuf, vous pouvez maintenant utiliser vos données en tant que structure ou objet entièrement typé.

Les exemples de code suivants fournissent plusieurs exemples de traitement ou d'utilisation des données exportées :

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )