Lire des données de BigQuery vers Dataflow

Ce document explique comment lire des données de BigQuery vers Dataflow à l'aide du connecteur d'E/S BigQuery d'Apache Beam.

Présentation

Le connecteur d'E/S BigQuery accepte deux options de lecture à partir de BigQuery :

  • Lectures de table directes. Cette option est la plus rapide, car elle utilise l'API BigQuery Storage Read.
  • Jobs d'exportation. Avec cette option, BigQuery exécute un job d'exportation qui écrit les données de la table dans Cloud Storage. Le connecteur lit ensuite les données exportées à partir de Cloud Storage. Cette option est moins efficace, car elle nécessite l'étape d'exportation.

Les jobs d'exportation sont l'option par défaut. Pour spécifier des lectures directes, appelez la méthode withMethod(Method.DIRECT_READ).

Le connecteur sérialise les données de la table dans un objet PCollection. Chaque élément présent dans PCollection représente une seule ligne de table. Le connecteur est compatible avec les méthodes de sérialisation suivantes :

Parallélisme

Le parallélisme dans ce connecteur dépend de la méthode de lecture :

  • Lectures directes : le connecteur d'E/S génère un nombre dynamique de flux, en fonction de la taille de la requête d'exportation. Il lit ces flux directement à partir de BigQuery en parallèle.

  • Jobs d'exportation : BigQuery détermine le nombre de fichiers à écrire dans Cloud Storage. Le nombre de fichiers dépend de la requête et du volume de données. Le connecteur d'E/S lit les fichiers exportés en parallèle.

Performances

Le tableau suivant présente les métriques de performances de diverses options de lecture d'E/S BigQuery. Les charges de travail ont été exécutées sur un nœud de calcul e2-standard2 à l'aide du SDK Apache Beam 2.49.0 pour Java. Elles n'ont pas utilisé l'exécuteur v2.

100 millions d'enregistrements | 1 ko | 1 colonne Débit (octets) Débit (éléments)
Lecture du stockage 120 Mbit/s 88 000 éléments par seconde
Exportation Avro 105 Mbit/s 78 000 éléments par seconde
Exportation JSON 110 Mbit/s 81 000 éléments par seconde

Ces métriques sont basées sur des pipelines de traitement par lot simples. Elles ont pour but de comparer les performances entre les connecteurs d'E/S et ne sont pas nécessairement représentatives des pipelines réels. Les performances des pipelines Dataflow sont complexes et dépendent du type de machine virtuelle, des données traitées, des performances des sources et des récepteurs externes, ainsi que du code utilisateur. Les métriques sont basées sur l'exécution du SDK Java et ne sont pas représentatives des caractéristiques de performances des SDK d'autres langages. Pour en savoir plus, consultez la page Performances d'E/S Beam.

Bonnes pratiques

  • En général, nous vous recommandons d'utiliser des lectures de tables directes (Method.DIRECT_READ). L'API Storage Read est plus adaptée aux pipelines de données que les jobs d'exportation, car elle ne nécessite pas d'étape intermédiaire d'exportation des données.

  • Si vous avez recours aux lectures directes, l'utilisation de l'API Storage Read vous est facturée. Consultez la section Tarifs de l'extraction de données sur la page des tarifs de BigQuery.

  • Aucuns frais supplémentaires ne sont facturés pour les jobs d'exportation. Ils sont toutefois soumis à des limites. Pour les transferts de données volumineux, où la rapidité est une priorité et le coût est ajustable, les lectures directes sont recommandées.

  • L'API Storage Read est soumise à des limites de quota. Utilisez les métriques Google Cloud pour surveiller votre utilisation du quota.

  • Lorsque vous utilisez l'API Storage Read, des erreurs d'expiration de bail et d'expiration de session peuvent s'afficher dans les journaux, par exemple :

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    Ces erreurs peuvent se produire lorsqu'une opération prend plus de temps que le délai avant expiration, généralement dans les pipelines qui s'exécutent pendant plus de six heures. Pour résoudre ce problème, passez aux exportations de fichiers.

Examples

Les exemples de code de cette section utilisent des lectures directes de table.

Pour utiliser un job d'exportation, omettez l'appel à withMethod ou spécifiez Method.EXPORT. Définissez ensuite l'option de pipeline --tempLocation afin de spécifier un bucket Cloud Storage pour les fichiers exportés.

Ces exemples de code partent du principe que la table source contient les colonnes suivantes :

  • name (chaîne)
  • age (entier)

Spécifié en tant que fichier de schéma JSON :

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

Lire des enregistrements au format Avro

Pour lire des données BigQuery dans des enregistrements au format Avro, utilisez la méthode read(SerializableFunction). Cette méthode utilise une fonction définie par l'application qui analyse les objets SchemaAndRecord et renvoie un type de données personnalisé. La sortie du connecteur est une PCollection de votre type de données personnalisé.

Le code suivant lit un PCollection<MyData> à partir d'une table BigQuery, où MyData est une classe définie par une application.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

La méthode read utilise une interface SerializableFunction<SchemaAndRecord, T>, qui définit une fonction permettant de convertir des enregistrements Avro en classe de données personnalisée. Dans l'exemple de code précédent, la méthode MyData.apply implémente cette fonction de conversion. L'exemple de fonction analyse les champs name et age de l'enregistrement Avro et renvoie une instance MyData.

Pour spécifier la table BigQuery à lire, appelez la méthode from, comme indiqué dans l'exemple précédent. Pour en savoir plus, consultez la section Noms de tables dans la documentation du connecteur d'E/S BigQuery.

Lire des objets TableRow

La méthode readTableRows lit les données BigQuery dans une PCollection d'objets TableRow. Chaque TableRow est un mappage de paires clé/valeur qui contient une seule ligne de données de table. Spécifiez la table BigQuery à lire en appelant la méthode from.

Le code suivant lit un PCollection<TableRows> à partir d'une table BigQuery.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Cet exemple montre également comment accéder aux valeurs à partir du dictionnaire TableRow. Les valeurs entières sont encodées sous forme de chaînes afin de respecter le format JSON exporté par BigQuery.

Projection et filtrage de colonnes

Lorsque vous utilisez des lectures directes (Method.DIRECT_READ), vous pouvez rendre les opérations de lecture plus efficaces en réduisant la quantité de données lues depuis BigQuery et envoyées sur le réseau.

  • Projection de colonne : appelez withSelectedFields pour lire un sous-ensemble de colonnes de la table. Cela permet des lectures efficaces lorsque les tables contiennent de nombreuses colonnes.
  • Filtrage de ligne : appelez withRowRestriction pour spécifier un prédicat qui filtre les données côté serveur.

Les prédicats de filtre doivent être déterministes, et l'agrégation n'est pas disponible.

L'exemple suivant projette les colonnes "user_name" et "age", et filtre les lignes qui ne correspondent pas au prédicat "age > 18".

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Lire les données d'un résultat de requête

Les exemples précédents montrent comment lire les lignes d'une table. Vous pouvez également lire les résultats d'une requête SQL en appelant fromQuery. Cette approche déplace une partie du travail de calcul dans BigQuery. Vous pouvez également utiliser cette méthode pour lire à partir d'une vue BigQuery ou une vue matérialisée en exécutant une requête sur la vue.

L'exemple suivant exécute une requête sur un ensemble de données public BigQuery et lit les résultats. Une fois le pipeline exécuté, vous pouvez voir le job de requête dans votre historique des jobs BigQuery.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Étape suivante