E/S BigQuery

Les SDK Dataflow fournissent des transformations de lecture et d'écriture intégrées qui permettent de lire et d'écrire des données dans une table Google BigQuery. Vous pouvez lire une table entière dont vous spécifiez le nom, ou lire des données partielles en utilisant une chaîne de requête.

Spécifier un nom de table BigQuery

Pour lire ou écrire des données dans une table BigQuery, vous devez fournir un nom complet de table BigQuery. Ce nom se compose de trois parties :

  • Un ID de projet : l'ID de votre projet Google Cloud. La valeur par défaut provient de votre objet d'options de pipeline.
  • Un ID d'ensemble de données : l'ID de l'ensemble de données BigQuery. Il est unique au sein d'un projet Cloud spécifique.
  • Un ID de table : un ID de table unique au sein d'un ensemble de données spécifique.

Java

Notez que vous pouvez utiliser BigQueryIO sans fournir de nom de projet. Si ce paramètre est omis, BigQueryIO utilise le projet par défaut de votre objet PipelineOptions.

L'API cliente BigQuery Java exploite un objet de type TableReference pour identifier la table BigQuery cible. Le package BigQueryIO du SDK Dataflow pour Java comprend une méthode d'assistance, BigQueryIO.parseTableSpec, qui vous permet de concevoir un objet TableReference à partir d'une chaîne String contenant les trois parties de votre nom de table BigQuery.

La plupart du temps, vous n'avez pas besoin d'utiliser explicitement un objet TableReference. Les méthodes de fabrication statiques relatives à une transformation BigQueryIO exploitent le nom de table en tant que chaîne String. Elles utilisent ensuite parseTableSpec en interne pour créer un objet TableReference à partir de la chaîne String fournie.

Formats des chaînes de nom de table

Vous pouvez spécifier la table BigQuery cible à l'aide d'une chaîne contenant l'un des formats suivants :

  [project_id]:[dataset_id].[table_id]
  Example: "clouddataflow-readonly:samples.weather_stations"

Java

Vous pouvez également omettre project_id. Si vous omettez project_id, Cloud Dataflow utilise l'ID de projet par défaut de votre objet d'options de pipeline. En Java, l'ID est accessible avec PipelineOptions.getProject.

  [dataset_id].[table_id]
  Example: "samples.weather_stations"

Lignes et schémas de table BigQuery

Java

Les transformations BigQueryIO de lecture et d'écriture produisent et consomment des données en tant que PCollection d'objets BigQuery TableRow. TableRow est inclus dans l'API cliente BigQuery Java, dans le package com.google.api.services.bigquery.model.TableRow.

De plus, lorsque vous écrivez des données dans BigQuery, vous devez spécifier un objet TableSchema pour les champs que vous souhaitez écrire dans la table cible. Vous devez alors utiliser les classes TableSchema et TableFieldSchema de BigQuery. Elles sont respectivement définies dans les packages com.google.api.services.bigquery.model.TableSchema et com.google.api.services.bigquery.model.TableFieldSchema.

Lire des données depuis BigQuery

Java

Pour lire des données depuis une table BigQuery, vous devez appliquer une transformation BigQueryIO.Read à l'aide de la méthode apply. La transformation BigQueryIO.Read renvoie une PCollection d'objets BigQuery TableRow. Chaque élément de la PCollection représente une ligne de la table.

Vous pouvez lire une table BigQuery entière en spécifiant son nom à BigQueryIO.Read à l'aide de l'opération .from. L'exemple de code suivant vous montre comment appliquer la transformation BigQueryIO.Read pour lire la totalité d'une table BigQuery :

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadWeatherStations")
         .from("clouddataflow-readonly:samples.weather_stations"));

Si vous ne souhaitez pas lire la totalité de la table, vous pouvez fournir une chaîne de requête à BigQueryIO.Read à l'aide de l'opération .fromQuery. L'exemple de code suivant vous montre comment lire des champs spécifiques d'une table BigQuery grâce à une chaîne de requête :

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]");

Vous pouvez également utiliser le dialecte SQL standard de BigQuery, comme décrit dans l'exemple suivant :

PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read
    .named("ReadYearAndTemp")
    .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
    .usingStandardSql();

Lorsque vous lisez des données depuis BigQuery, notez que les valeurs entières des objets TableRow sont encodées en tant que chaînes String afin de respecter le format JSON exporté par BigQuery.

Lorsque vous appliquez une transformation BigQueryIO.Read en mode de traitement par lots, Dataflow appelle une requête d'exportation BigQuery. Notez que l'utilisation de cette API par Dataflow est soumise aux règles relatives aux quotas et aux tarifs de BigQuery.

Écrire des données dans BigQuery

Java

Pour écrire des données dans une table BigQuery, vous devez appliquer une transformation BigQueryIO.Write à l'aide de la méthode apply. Cette transformation doit être appliquée à un objet PCollection<TableRow> à l'aide de la méthode apply.

En général, vous devez effectuer une autre transformation, telle que ParDo, pour mettre en forme vos données de sortie en une collection d'objets BigQuery TableRow.

Lorsque vous créez une transformation BigQueryIO.Write, vous devez fournir certaines informations supplémentaires en fonction de la table cible. Voici les éléments à spécifier en plus du nom de table :

  • La propriété CreateDisposition de la table cible. CreateDisposition spécifie si la table cible doit exister ou si elle peut être créée par l'opération d'écriture.
  • La propriété WriteDisposition de la table cible. WriteDisposition spécifie si les données que vous écrivez remplaceront une table existante, ajouteront des lignes à une table existante, ou seront simplement écrites dans une table vide.

En outre, si votre opération d'écriture crée une table BigQuery, vous devez fournir les informations de schéma relatives à la table cible. Dans ce cas, vous devez inclure un objet TableSchema avec votre opération d'écriture.

CreateDisposition

La propriété CreateDisposition détermine si votre opération d'écriture BigQuery doit créer une table lorsque la table cible n'existe pas. Vous pouvez spécifier CreateDisposition lors de la création de votre transformation BigQueryIO.Write en appelant la méthode .withCreateDisposition.

CreateDisposition est une énumération enum qui accepte les valeurs suivantes :

  • BigQueryIO.Write.CreateDisposition.CREATE_NEVER : spécifie qu'une table ne doit jamais être créée. Si la table cible n'existe pas, l'opération d'écriture échoue.
  • BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED : spécifie que l'opération d'écriture doit créer une table s'il n'en existe aucune. Si vous utilisez cette valeur, vous devez également fournir un schéma de table à l'aide de l'opération .withSchema. CREATE_IF_NEEDED est le comportement par défaut.

Notez que si vous spécifiez CREATE_IF_NEEDED pour la propriété CreateDisposition, mais que vous ne fournissez pas de schéma TableSchema, la transformation peut échouer au moment de l'exécution et afficher une exception java.lang.IllegalArgumentException si la table cible n'existe pas.

WriteDisposition

La propriété WriteDisposition contrôle la façon dont votre opération d'écriture BigQuery s'applique à une table existante. Vous pouvez spécifier WriteDisposition lors de la création de votre transformation BigQueryIO.Write en appelant la méthode .withWriteDisposition.

WriteDisposition est une énumération enum qui accepte les valeurs suivantes :

  • BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE : spécifie que l'opération d'écriture doit remplacer une table existante. Toutes les lignes existantes de la table cible sont supprimées et les nouvelles lignes sont ajoutées à la table.
  • BigQueryIO.Write.WriteDisposition.WRITE_APPEND : spécifie que l'opération d'écriture doit ajouter les lignes à la fin de la table existante.
  • BigQueryIO.Write.WriteDisposition.WRITE_EMPTY : spécifie que l'opération d'écriture doit échouer au moment de l'exécution si la table cible n'est pas vide. WRITE_EMPTY est le comportement par défaut.

Lorsque vous utilisez WRITE_EMPTY avec la propriété WriteDisposition, notez que la vérification d'état (vide ou non) de la table cible peut se produire bien avant l'opération d'écriture réelle. En outre, cette vérification ne garantit pas que votre pipeline disposera d'un accès exclusif à la table. Si deux programmes exécutés simultanément tentent d'écrire des données dans la même table de sortie avec WriteDisposition défini sur WRITE_EMPTY, les deux opérations peuvent réussir.

Créer un schéma de table pour écrire des données dans une nouvelle table

Si votre opération d'écriture BigQuery crée une table, vous devez fournir les informations relatives au schéma en créant un objet TableSchema. Vous pouvez transmettre le schéma TableSchema à l'aide de l'opération .withSchema lors de la création de votre transformation BigQueryIO.Write.

Un objet TableSchema contient des informations sur chaque champ de la table, grâce à des objets de type TableFieldSchema. Pour créer un schéma TableSchema, vous devez d'abord établir une liste List des champs de la table. Vous la transmettez ensuite à l'aide de l'opération .setFields lors de la création du schéma TableSchema.

L'exemple de code suivant vous montre comment créer un schéma TableSchema pour une table avec deux champs de type String :

  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("source").setType("STRING"));
  fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

Appliquer une transformation BigQueryIO.Write

L'exemple de code suivant vous montre comment appliquer (à l'aide de la méthode apply) une transformation BigQueryIO.Write pour écrire un objet PCollection<TableRow> dans une table BigQuery. L'opération d'écriture crée une table si nécessaire. Si la table existe déjà, celle-ci est remplacée.

  PCollection<TableRow> quotes = ...;

  quotes.apply(BigQueryIO.Write
      .named("Write")
      .to("my-project:output.output_table")
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Apache Beam est une marque commerciale d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.
Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.