Écrire des données depuis Dataflow vers BigQuery

Ce document explique comment écrire des données depuis Dataflow vers BigQuery à l'aide du connecteur d'E/S BigQuery d'Apache Beam.

Le connecteur d'E/S BigQuery est disponible dans le SDK Apache Beam. Nous vous recommandons d'utiliser la dernière version du SDK. Pour en savoir plus, consultez la page SDK Apache Beam 2.x.

Python est également compatible avec Python.

Présentation

Le connecteur d'E/S BigQuery accepte les méthodes suivantes pour écrire dans BigQuery :

  • STORAGE_WRITE_API Dans ce mode, le connecteur effectue des écritures directes sur l'espace de stockage BigQuery, à l'aide de l'API BigQuery Storage Write. L'API Storage Write combine l'ingestion par flux et le chargement par lot dans une seule API hautes performances. Ce mode garantit une sémantique de type "exactement une fois".
  • STORAGE_API_AT_LEAST_ONCE Ce mode utilise également l'API Storage Write, mais fournit une sémantique de type "au moins une fois". Ce mode permet de réduire la latence pour la plupart des pipelines. Cependant, les écritures en double sont possibles.
  • FILE_LOADS Dans ce mode, le connecteur écrit les données d'entrée dans les fichiers de préproduction dans Cloud Storage. Il exécute ensuite une tâche de chargement BigQuery pour charger les données dans BigQuery. Ce mode est utilisé par défaut pour les PCollections limitées, que l'on retrouve le plus souvent dans les pipelines de traitement par lot.
  • STREAMING_INSERTS Dans ce mode, le connecteur utilise l'ancienne API de streaming. Ce mode est utilisé par défaut pour les PCollections illimitées, mais il n'est pas recommandé pour les nouveaux projets.

Lorsque vous choisissez une méthode d'écriture, tenez compte des points suivants :

  • Pour les tâches de traitement par flux, envisagez d'utiliser STORAGE_WRITE_API ou STORAGE_API_AT_LEAST_ONCE, car ces modes écrivent directement dans l'espace de stockage BigQuery, sans utiliser de fichiers de préproduction intermédiaires.
  • Si vous exécutez le pipeline en mode de traitement par flux de type "au moins une fois", définissez le mode d'écriture sur STORAGE_API_AT_LEAST_ONCE. Ce paramètre est plus efficace et correspond à la sémantique du mode de traitement par flux "au moins une fois".
  • Les chargements de fichiers et l'API Storage Write ont des quotas et limites différents.
  • Les tâches de chargement utilisent le pool d'emplacements BigQuery partagé ou des emplacements réservés. Pour utiliser des emplacements réservés, exécutez la tâche de chargement dans un projet avec une attribution de réservation de type PIPELINE. Les tâches de chargement sont gratuites si vous utilisez le pool d'emplacements BigQuery partagé. Cependant, BigQuery ne garantit pas la capacité disponible du pool partagé. Pour en savoir plus, consultez la page Présentation des réservations.

Parallélisme

  • Pour FILE_LOADS et STORAGE_WRITE_API dans les pipelines de traitement par flux, le connecteur segmente les données en un certain nombre de fichiers ou de flux. En général, nous vous recommandons d'appeler withAutoSharding pour activer la segmentation automatique.

  • Pour FILE_LOADS dans les pipelines de traitement par lot, le connecteur écrit les données dans des fichiers partitionnés, qui sont ensuite chargés dans BigQuery en parallèle.

  • Pour STORAGE_WRITE_API dans les pipelines de traitement par lot, chaque nœud de calcul crée un ou plusieurs flux à écrire dans BigQuery, déterminé par le nombre total de segments.

  • Pour STORAGE_API_AT_LEAST_ONCE, il existe un seul flux d'écriture par défaut. Plusieurs nœuds de calcul s'ajoutent à ce flux.

Performances

Le tableau suivant présente les métriques de performances de diverses options de lecture d'E/S BigQuery. Les charges de travail ont été exécutées sur un nœud de calcul e2-standard2 à l'aide du SDK Apache Beam 2.49.0 pour Java. Elles n'ont pas utilisé l'exécuteur v2.

100 millions d'enregistrements | 1 ko | 1 colonne Débit (octets) Débit (éléments)
Écriture du stockage 55 Mbit/s 54 000 éléments par seconde
Charge Avro 78 Mbit/s 77 000 éléments par seconde
Charge Json 54 Mbit/s 53 000 éléments par seconde

Ces métriques sont basées sur des pipelines de traitement par lot simples. Elles ont pour but de comparer les performances entre les connecteurs d'E/S et ne sont pas nécessairement représentatives des pipelines réels. Les performances des pipelines Dataflow sont complexes et dépendent du type de machine virtuelle, des données traitées, des performances des sources et des récepteurs externes, ainsi que du code utilisateur. Les métriques sont basées sur l'exécution du SDK Java et ne sont pas représentatives des caractéristiques de performances des SDK d'autres langages. Pour en savoir plus, consultez la page Performances d'E/S Beam.

Bonnes pratiques

Cette section décrit les bonnes pratiques pour écrire dans BigQuery à partir de Dataflow.

Éléments généraux à prendre en compte

  • L'API Storage Write est soumise à des limites de quota. Le connecteur gère ces limites pour la plupart des pipelines. Toutefois, certains scénarios peuvent épuiser les flux d'API Storage Write disponibles. Par exemple, ce problème peut survenir dans un pipeline qui utilise la segmentation automatique et l'autoscaling avec un grand nombre de destinations, en particulier dans les tâches de longue durée avec des charges de travail très variables. Si ce problème se produit, envisagez d'utiliser STORAGE_WRITE_API_AT_LEAST_ONCE pour éviter le problème.

  • Utilisez les métriques Google Cloud pour surveiller l'utilisation du quota de l'API Storage Write.

  • Lorsque vous utilisez des chargements de fichiers, Avro est généralement plus performant que JSON. Pour utiliser Avro, appelez withAvroFormatFunction.

  • Par défaut, les tâches de chargement s'exécutent dans le même projet que la tâche Dataflow. Pour spécifier un autre projet, appelez withLoadJobProjectId.

  • Lorsque vous utilisez le SDK Java, envisagez de créer une classe qui représente le schéma de la table BigQuery. Appelez ensuite useBeamSchema dans votre pipeline pour permettre la conversion automatique entre les types Apache Beam Row et BigQuery TableRow. Pour obtenir un exemple de classe de schéma, consultez la section ExampleModel.java.

  • Si vous chargez des tables avec des schémas complexes contenant des milliers de champs, envisagez d'appeler withMaxBytesPerPartition pour définir une taille maximale inférieure pour chaque tâche de chargement.

Pipelines de traitement par flux

Les recommandations suivantes s'appliquent aux pipelines de traitement par flux.

  • Pour les pipelines de traitement par flux, nous vous recommandons d'utiliser l'API Storage Write (STORAGE_WRITE_API ou STORAGE_API_AT_LEAST_ONCE).

  • Un pipeline de traitement par flux peut utiliser des chargements de fichiers, mais cette approche présente des inconvénients :

    • Il nécessite un fenêtrage pour écrire les fichiers. Vous ne pouvez pas utiliser la fenêtre globale.
    • BigQuery charge les fichiers de la façon la plus optimale possible lorsque vous utilisez le pool d'emplacements partagés. Il peut s'écouler un certain temps entre le moment où un enregistrement est écrit et celui où il est disponible dans BigQuery.
    • Si une tâche de chargement échoue (par exemple en raison de données incorrectes ou d'une incompatibilité de schéma), l'ensemble du pipeline échoue.
  • Pensez à utiliser STORAGE_WRITE_API_AT_LEAST_ONCE lorsque cela est possible. Cela peut entraîner l'écriture d'enregistrements en double dans BigQuery, mais reste moins coûteux et plus évolutif que STORAGE_WRITE_API.

  • En général, évitez d'utiliser STREAMING_INSERTS. Les insertions en flux continu sont plus coûteuses que l'API Storage Write et sont moins performantes.

  • La segmentation des données peut améliorer les performances des pipelines de traitement par flux. Pour la plupart des pipelines, la segmentation automatique est un bon point de départ. Vous pouvez néanmoins ajuster la segmentation comme suit :

  • Si vous utilisez des insertions en flux continu, nous vous recommandons de définir retryTransientErrors comme stratégie de nouvelle tentative.

Pipelines par lots

Les recommandations suivantes s'appliquent aux pipelines de traitement par lot.

  • Pour la plupart des pipelines par lots volumineux, nous vous recommandons d'essayer d'abord FILE_LOADS. Un pipeline par lots peut utiliser STORAGE_WRITE_API, mais il est susceptible de dépasser les limites de quota à grande échelle (plus de 1 000 processeurs virtuels) ou si des pipelines simultanés sont en cours d'exécution. Apache Beam ne limite pas le nombre maximal de flux d'écriture pour les tâches par lot STORAGE_WRITE_API. Par conséquent, la tâche atteint les limites de l'API BigQuery Storage.

  • Lorsque vous utilisez FILE_LOADS, vous pouvez épuiser le pool d'emplacements BigQuery partagé ou votre pool d'emplacements réservés. Si vous rencontrez ce type de défaillance, essayez les approches suivantes:

    • Réduisez le nombre maximal ou la taille des nœuds de calcul pour la tâche.
    • Acheter d'autres emplacements réservés.
    • Envisagez d'utiliser STORAGE_WRITE_API.
  • Les pipelines de petite à moyenne taille (moins de 1 000 processeurs virtuels) peuvent tirer parti de l'utilisation de STORAGE_WRITE_API. Pour ces tâches plus petites, envisagez d'utiliser STORAGE_WRITE_API si vous souhaitez une file d'attente de lettres mortes ou lorsque le pool d'emplacements partagés FILE_LOADS n'est pas suffisant.

  • Si vous pouvez tolérer des données en double, envisagez d'utiliser STORAGE_WRITE_API_AT_LEAST_ONCE. Ce mode peut entraîner l'écriture d'enregistrements en double dans BigQuery, mais peut s'avérer moins coûteux que l'option STORAGE_WRITE_API.

  • Les différents modes d'écriture peuvent fonctionner différemment en fonction des caractéristiques de votre pipeline. Faites des tests pour trouver le mode d'écriture le mieux adapté à votre charge de travail.

Gérer les erreurs au niveau des lignes

Cette section explique comment gérer les erreurs qui peuvent se produire au niveau des lignes, par exemple en raison d'une saisie incorrecte des données d'entrée ou d'incohérences de schéma.

Pour l'API Storage Write, toutes les lignes qui ne peuvent pas être écrites sont placées dans un PCollection distinct. Pour obtenir cette collection, appelez getFailedStorageApiInserts sur l'objet WriteResult. Pour obtenir un exemple de cette approche, consultez la section Insérer des données en flux continu dans BigQuery.

Il est recommandé d'envoyer les erreurs à une file d'attente ou une table de lettres mortes pour un traitement ultérieur. Pour en savoir plus sur ce modèle, consultez la section Modèle de lettre morte BigQueryIO.

Pour FILE_LOADS, si une erreur se produit lors du chargement des données, la tâche de chargement échoue et le pipeline génère une exception d'exécution. Vous pouvez afficher l'erreur dans les journaux Dataflow ou consulter l'historique des tâches BigQuery. Le connecteur d'E/S ne renvoie pas d'informations sur les lignes individuelles ayant échoué.

Pour en savoir plus sur le dépannage des erreurs, consultez la section Erreurs de connecteur BigQuery.

Examples

Les exemples suivants montrent comment utiliser Dataflow pour écrire dans BigQuery.

Écrire dans une table existante

L'exemple suivant crée un pipeline par lots qui écrit un PCollection<MyData> dans BigQuery, où MyData est un type de données personnalisées.

La méthode BigQueryIO.write() renvoie un type BigQueryIO.Write<T>, qui permet de configurer l'opération d'écriture. Pour en savoir plus, consultez la section Écrire dans une table dans la documentation Apache Beam. Cet exemple de code écrit dans une table existante (CREATE_NEVER) et ajoute les nouvelles lignes à la table (WRITE_APPEND).

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour un environnement de développement local.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Écrire dans une table nouvelle ou existante

L'exemple suivant crée une table si la table de destination n'existe pas, en définissant la disposition de création sur CREATE_IF_NEEDED. Lorsque vous utilisez cette option, vous devez fournir un schéma de table. Le connecteur utilise ce schéma s'il crée une table.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour un environnement de développement local.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Insérer des données en flux continu dans BigQuery

L'exemple suivant montre comment diffuser des données en flux continu à l'aide de la sémantique de type "exactement une fois", en définissant le mode d'écriture sur STORAGE_WRITE_API.

Tous les pipelines de traitement par flux ne nécessitent pas une sémantique de type "exactement une fois". Par exemple, vous pouvez peut-être supprimer manuellement les doublons dans la table de destination. Si la possibilité d'enregistrements en double est acceptable pour votre scénario, envisagez d'utiliser une sémantique de type "au moins une fois" en définissant la méthode d'écriture sur STORAGE_API_AT_LEAST_ONCE. Cette méthode est généralement plus efficace et permet de réduire la latence pour la plupart des pipelines.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour un environnement de développement local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

Étapes suivantes