Lire des données de BigQuery vers Dataflow

Ce document explique comment lire des données de BigQuery vers Dataflow.

Présentation

Dans la plupart des cas, envisagez d'utiliser les E/S gérées pour lire à partir de BigQuery. Les E/S gérées offrent des fonctionnalités telles que les mises à niveau automatiques et une API de configuration cohérente. Lors de la lecture à partir de BigQuery, les E/S gérées effectuent des lectures de table directes, ce qui offre les meilleures performances de lecture.

Si vous avez besoin d'un réglage des performances plus avancé, envisagez d'utiliser le connecteur BigQueryIO. Le connecteur BigQueryIO accepte à la fois les lectures de table directes et la lecture à partir d'ordres d'exportation BigQuery. Il offre également un contrôle plus précis sur la désérialisation des enregistrements de table. Pour en savoir plus, consultez la section Utiliser le connecteur BigQueryIO de ce document.

Projection et filtrage de colonnes

Pour réduire le volume de données que votre pipeline lit à partir de BigQuery, vous pouvez utiliser les techniques suivantes:

  • La projection de colonnes spécifie un sous-ensemble de colonnes à lire à partir de la table. Utilisez la projection de colonnes lorsque votre table comporte un grand nombre de colonnes et que vous n'avez besoin de lire qu'un sous-ensemble d'entre elles.
  • Le filtrage de ligne spécifie un prédicat à appliquer à la table. L'opération de lecture BigQuery ne renvoie que les lignes correspondant au filtre, ce qui peut réduire la quantité totale de données ingérées par le pipeline.

L'exemple suivant lit les colonnes "user_name" et "age" d'une table, et filtre les lignes qui ne correspondent pas au prédicat "age > 18". Cet exemple utilise les E/S gérées.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Lire les données d'un résultat de requête

L'exemple suivant utilise l'E/S gérée pour lire le résultat d'une requête SQL. Il exécute une requête sur un ensemble de données public BigQuery. Vous pouvez également utiliser des requêtes SQL pour lire à partir d'une vue BigQuery ou d'une vue matérialisée.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Utiliser le connecteur BigQueryIO

Le connecteur BigQueryIO est compatible avec les méthodes de sérialisation suivantes:

Le connecteur accepte deux options de lecture des données:

  • Tâche d'exportation Par défaut, le connecteur BigQueryIO exécute un job d'exportation BigQuery qui écrit les données de la table dans Cloud Storage. Le connecteur lit ensuite les données à partir de Cloud Storage.
  • Lectures de table directes Cette option est plus rapide que les jobs d'exportation, car elle utilise l'API BigQuery Storage Read et ignore l'étape d'exportation. Pour utiliser la lecture des tables directe, appelez withMethod(Method.DIRECT_READ) lorsque vous créez le pipeline.

Lorsque vous choisissez l'option à utiliser, tenez compte des points suivants:

  • En règle générale, nous vous recommandons d'utiliser des lectures de table directes. L'API Storage Read est plus adaptée aux pipelines de données que les jobs d'exportation, car elle ne nécessite pas d'étape intermédiaire d'exportation des données.

  • Si vous avez recours aux lectures directes, l'utilisation de l'API Storage Read vous est facturée. Consultez la section Tarifs de l'extraction de données sur la page des tarifs de BigQuery.

  • Aucuns frais supplémentaires ne sont facturés pour les jobs d'exportation. Ils sont toutefois soumis à des limites. Pour les transferts de données volumineux, où la rapidité est une priorité et le coût est ajustable, les lectures directes sont recommandées.

  • L'API Storage Read est soumise à des limites de quota. Utilisez les métriquesGoogle Cloud pour surveiller votre utilisation du quota.

  • Si vous utilisez des jobs d'exportation, définissez l'option de pipeline --tempLocation afin de spécifier un bucket Cloud Storage pour les fichiers exportés.

  • Lorsque vous utilisez l'API Storage Read, des erreurs d'expiration de bail et d'expiration de session peuvent s'afficher dans les journaux, par exemple :

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    Ces erreurs peuvent se produire lorsqu'une opération prend plus de temps que le délai avant expiration, généralement dans les pipelines qui s'exécutent pendant plus de six heures. Pour résoudre ce problème, passez aux exportations de fichiers.

  • Le degré de parallélisme dépend de la méthode de lecture:

    • Lectures directes : le connecteur d'E/S génère un nombre dynamique de flux, en fonction de la taille de la requête d'exportation. Il lit ces flux directement à partir de BigQuery en parallèle.

    • Jobs d'exportation : BigQuery détermine le nombre de fichiers à écrire dans Cloud Storage. Le nombre de fichiers dépend de la requête et du volume de données. Le connecteur d'E/S lit les fichiers exportés en parallèle.

Le tableau suivant présente les métriques de performances de diverses options de lecture d'E/S BigQuery. Les charges de travail ont été exécutées sur un nœud de calcul e2-standard2 à l'aide du SDK Apache Beam 2.49.0 pour Java. Elles n'ont pas utilisé l'exécuteur v2.

100 millions d'enregistrements | 1 ko | 1 colonne Débit (octets) Débit (éléments)
Lecture du stockage 120 Mbit/s 88 000 éléments par seconde
Exportation Avro 105 Mbit/s 78 000 éléments par seconde
Exportation JSON 110 Mbit/s 81 000 éléments par seconde

Ces métriques sont basées sur des pipelines de traitement par lot simples. Elles ont pour but de comparer les performances entre les connecteurs d'E/S et ne sont pas nécessairement représentatives des pipelines réels. Les performances des pipelines Dataflow sont complexes et dépendent du type de machine virtuelle, des données traitées, des performances des sources et des récepteurs externes, ainsi que du code utilisateur. Les métriques sont basées sur l'exécution du SDK Java et ne sont pas représentatives des caractéristiques de performances des SDK d'autres langages. Pour en savoir plus, consultez la page Performances d'E/S Beam.

Examples

Les exemples de code suivants utilisent le connecteur BigQueryIO avec des lectures directes de table. Pour utiliser un job d'exportation, omettez l'appel à withMethod.

Lire des enregistrements au format Avro

Cet exemple montre comment utiliser le connecteur BigQueryIO pour lire des enregistrements au format Avro.

Pour lire des données BigQuery dans des enregistrements au format Avro, utilisez la méthode read(SerializableFunction). Cette méthode utilise une fonction définie par l'application qui analyse les objets SchemaAndRecord et renvoie un type de données personnalisé. La sortie du connecteur est une PCollection de votre type de données personnalisé.

Le code suivant lit un PCollection<MyData> à partir d'une table BigQuery, où MyData est une classe définie par une application.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

La méthode read utilise une interface SerializableFunction<SchemaAndRecord, T>, qui définit une fonction permettant de convertir des enregistrements Avro en classe de données personnalisée. Dans l'exemple de code précédent, la méthode MyData.apply implémente cette fonction de conversion. L'exemple de fonction analyse les champs name et age de l'enregistrement Avro et renvoie une instance MyData.

Pour spécifier la table BigQuery à lire, appelez la méthode from, comme indiqué dans l'exemple précédent. Pour en savoir plus, consultez la section Noms de tables dans la documentation du connecteur d'E/S BigQuery.

Lire des objets TableRow

Cet exemple montre comment utiliser le connecteur BigQueryIO pour lire des objets TableRow.

La méthode readTableRows lit les données BigQuery dans une PCollection d'objets TableRow. Chaque TableRow est un mappage de paires clé/valeur qui contient une seule ligne de données de table. Spécifiez la table BigQuery à lire en appelant la méthode from.

Le code suivant lit un PCollection<TableRows> à partir d'une table BigQuery.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Cet exemple montre également comment accéder aux valeurs à partir du dictionnaire TableRow. Les valeurs entières sont encodées sous forme de chaînes afin de respecter le format JSON exporté par BigQuery.

Étape suivante