E/S BigQuery

Les SDK Dataflow fournissent des transformations de lecture et d'écriture intégrées qui permettent de lire et d'écrire des données dans une table Google BigQuery. Vous pouvez lire une table entière dont vous spécifiez le nom, ou lire des données partielles en utilisant une chaîne de requête.

Spécifier un nom de table BigQuery

Pour lire ou écrire des données dans une table BigQuery, vous devez fournir un nom complet de table BigQuery. Ce nom se compose de trois parties :

  • Un ID de projet : l'ID de votre projet Google Cloud. La valeur par défaut provient de votre objet d'options de pipeline.
  • Un ID d'ensemble de données : l'ID de l'ensemble de données BigQuery. Il est unique au sein d'un projet Cloud spécifique.
  • Un ID de table : un ID de table unique au sein d'un ensemble de données spécifique.

Java

Notez que vous pouvez utiliser BigQueryIO sans indiquer de nom de projet ; s'il est omis, BigQueryIO utilise le projet par défaut de votre objet PipelineOptions.

L'API cliente BigQuery Java utilise un objet de type TableReference pour identifier la table BigQuery cible. Le package BigQueryIO du SDK Dataflow pour Java contient une méthode d'assistance, BigQueryIO.parseTableSpec, que vous pouvez utiliser pour construire un objet TableReference à partir d'un objet String contenant les trois parties du nom de votre table BigQuery.

La plupart du temps, vous n'avez pas besoin d'utiliser explicitement un objet TableReference. En effet, les méthodes de fabrique statiques d'une transformation BigQueryIO prennent le nom de la table en tant que String ; elles utilisent ensuite parseTableSpec en interne pour construire un objet TableReference à partir de l'élément String fourni.

Formats des chaînes de nom de table

Vous pouvez spécifier la table BigQuery cible à l'aide d'une chaîne contenant l'un des formats suivants :

  [project_id]:[dataset_id].[table_id]
  Example: "clouddataflow-readonly:samples.weather_stations"

Java

Vous pouvez également omettre project_id. Si vous omettez project_id, Cloud Dataflow utilise l'ID de projet par défaut de votre objet d'options de pipeline. En Java, l'ID est accessible avec PipelineOptions.getProject.

  [dataset_id].[table_id]
  Example: "samples.weather_stations"

Lignes et schémas de table BigQuery

Java

Les transformations BigQueryIO en lecture et en écriture produisent et consomment des données sous forme de PCollection d'objets TableRow BigQuery. TableRow fait partie de l'API cliente BigQuery Java, dans le package com.google.api.services.bigquery.model.TableRow.

De plus, lorsque vous écrivez des données dans BigQuery, vous devez fournir un objet TableSchema pour les champs que vous souhaitez écrire dans la table cible. Vous devez utiliser les classes TableSchema et TableFieldSchema de BigQuery. Ces classes sont respectivement définies dans les packages com.google.api.services.bigquery.model.TableSchema et com.google.api.services.bigquery.model.TableFieldSchema.

Lire des données depuis BigQuery

Java

Pour lire une table BigQuery, vous devez effectuer apply sur une transformation BigQueryIO.Read. BigQueryIO.Read renvoie un objet PCollection BigQuery TableRow, où chaque élément de PCollection représente une seule ligne de la table.

Vous pouvez lire une table BigQuery entière en spécifiant son nom à BigQueryIO.Read à l'aide de l'opération .from. L'exemple de code suivant montre comment appliquer la transformation BigQueryIO.Read pour lire une table BigQuery entière :

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadWeatherStations")
         .from("clouddataflow-readonly:samples.weather_stations"));

Si vous ne souhaitez pas lire l'ensemble de la table, vous pouvez fournir une chaîne de requête à BigQueryIO.Read en utilisant l'opération .fromQuery. L'exemple de code suivant vous montre comment lire des champs spécifiques d'une table BigQuery grâce à une chaîne de requête :

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]");

Vous pouvez également utiliser le dialecte SQL standard de BigQuery, comme décrit dans l'exemple suivant :

PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read
    .named("ReadYearAndTemp")
    .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
    .usingStandardSql();

Lors de la lecture à partir de BigQuery, les valeurs entières des objets TableRow sont encodées en tant que String afin de correspondre au format JSON exporté par BigQuery.

Lorsque vous appliquez une transformation BigQueryIO.Read en mode de traitement par lots, Dataflow appelle une requête d'exportation BigQuery. Notez que l'utilisation de cette API par Dataflow est soumise aux règles relatives aux quotas et aux tarifs de BigQuery.

Écrire des données dans BigQuery

Java

Pour écrire dans une table BigQuery, vous devez apply une transformation BigQueryIO.Write. Vous devez apply pour effectuer la transformation en PCollection<TableRow>.

En général, vous devez effectuer une autre transformation, telle que ParDo, pour mettre en forme vos données de sortie en une collection d'objets BigQuery TableRow.

Lorsque vous créez une transformation BigQueryIO.Write, vous devez fournir des informations supplémentaires basées sur la table cible. Voici les éléments à spécifier en plus du nom de table :

  • La table cible est définie sur CreateDisposition. CreateDisposition indique si la table cible doit exister ou peut être créée par l'opération d'écriture.
  • La table cible est définie sur WriteDisposition. WriteDisposition spécifie si les données que vous écrivez remplacent une table existante, ajoutent des lignes à une table existante ou écrivent uniquement dans une table vide.

En outre, si votre opération d'écriture crée une table BigQuery, vous devez fournir les informations de schéma relatives à la table cible. Dans ce cas, vous devez inclure un objet TableSchema dans votre opération d'écriture.

CreateDisposition

La propriété CreateDisposition détermine si votre opération d'écriture BigQuery doit créer une table au cas où la table cible n'existe pas. Vous pouvez spécifier le CreateDisposition lorsque vous créez votre transformation BigQueryIO.Write en appelant la méthode .withCreateDisposition.

CreateDisposition est un enum avec les valeurs valides suivantes :

  • BigQueryIO.Write.CreateDisposition.CREATE_NEVER : indique qu'une table ne doit jamais être créée. Si la table cible n'existe pas, l'opération d'écriture échoue.
  • BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED : indique que l'opération d'écriture doit créer une table s'il n'en existe aucune. Si vous utilisez cette valeur, vous devez également fournir un schéma de table à l'aide de l'opération .withSchema. CREATE_IF_NEEDED est le comportement par défaut.

Notez que si vous spécifiez CREATE_IF_NEEDED en tant que CreateDisposition et que vous n'indiquez pas de TableSchema, la transformation peut échouer au moment de l'exécution avec une erreur java.lang.IllegalArgumentException si la table cible n'existe pas.

WriteDisposition

Le paramètre WriteDisposition contrôle la manière dont votre opération d'écriture BigQuery s'applique à une table existante. Vous pouvez spécifier le WriteDisposition lorsque vous créez votre transformation BigQueryIO.Write en appelant la méthode .withWriteDisposition.

WriteDisposition est un enum avec les valeurs valides suivantes :

  • BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE : indique que l'opération d'écriture doit remplacer une table existante. Toutes les lignes existantes de la table cible sont supprimées et les nouvelles lignes sont ajoutées à la table.
  • BigQueryIO.Write.WriteDisposition.WRITE_APPEND : indique que l'opération d'écriture doit ajouter les lignes à la fin de la table existante.
  • BigQueryIO.Write.WriteDisposition.WRITE_EMPTY : indique que l'opération d'écriture doit échouer lors de l'exécution si la table cible n'est pas vide. WRITE_EMPTY est le comportement par défaut.

Lorsque vous utilisez WRITE_EMPTY pour WriteDisposition, notez que la vérification d'état (vide ou non) de la table cible peut se produire bien avant l'opération d'écriture réelle. En outre, cette vérification ne garantit pas que votre pipeline disposera d'un accès exclusif à la table. Si deux programmes exécutés simultanément tentent d'écrire des données dans la même table de sortie avec un WriteDisposition de WRITE_EMPTY, les deux peuvent réussir.

Créer un schéma de table pour écrire des données dans une nouvelle table

Si votre opération d'écriture BigQuery crée une table, vous devez fournir les informations relatives au schéma Pour fournir les informations du schéma, créez un objet TableSchema. Vous pouvez transmettre TableSchema à l'aide de l'opération .withSchema lorsque vous créez votre transformation BigQueryIO.Write.

Un objet TableSchema contient des informations sur chaque champ de la table, à l'aide d'objets de type TableFieldSchema. Vous construisez un TableSchema en créant d'abord un List des champs de la table. Vous transmettez ensuite la liste à l'aide de l'opération .setFields lors de la construction du TableSchema.

L'exemple de code suivant montre comment créer un TableSchema pour une table avec deux champs de type String :

  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("source").setType("STRING"));
  fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

Appliquer une transformation BigQueryIO.Write

L'exemple de code suivant montre comment effectuer apply sur une transformation BigQueryIO.Write pour écrire un PCollection<TableRow> dans une table BigQuery. L'opération d'écriture crée une table si nécessaire. Si la table existe déjà, celle-ci est remplacée.

  PCollection<TableRow> quotes = ...;

  quotes.apply(BigQueryIO.Write
      .named("Write")
      .to("my-project:output.output_table")
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Apache Beam™ est une marque commerciale d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.