Utiliser l'ancienne API de diffusion de flux

Ce document explique comment diffuser des données en flux continu dans BigQuery à l'aide de la méthode tabledata.insertAll.

Pour les nouveaux projets, nous vous recommandons d'utiliser l'API BigQuery Storage Write au lieu de la méthode tabledata.insertAll. L'API Storage Write présente des tarifs plus bas et des fonctionnalités plus robustes, y compris une sémantique de diffusion de type "exactement une fois". Si vous migrez un projet existant de la méthode tabledata.insertAll vers l'API Storage Write, nous vous recommandons de sélectionner le flux par défaut. La méthode tabledata.insertAll est toujours entièrement compatible.

Avant de commencer

  1. Assurez-vous de disposer d'un accès en écriture à l'ensemble de données contenant votre table de destination. Pour écrire des données dans une table, vérifiez que celle-ci existe au préalable, sauf si vous utilisez des modèles de tables. Pour en savoir plus, consultez la section Créer des tables automatiquement à l'aide de modèles.

  2. Consultez les règles relatives aux quotas d'insertion en flux continu.

  3. Make sure that billing is enabled for your Google Cloud project.

  4. L'insertion de données en flux continu n'est pas disponible avec la version gratuite. Le message d'erreur suivant s'affiche si vous essayez d'utiliser l'insertion en flux continu sans activer la facturation : BigQuery: Streaming insert is not allowed in the free tier..

  5. Attribuez aux utilisateurs des rôles IAM (Identity and Access Management) incluant les autorisations nécessaires pour effectuer l'ensemble des tâches du présent document.

Autorisations requises

Pour diffuser des données en flux continu dans BigQuery, vous devez disposer des autorisations IAM suivantes :

  • bigquery.tables.updateData (vous permet d'insérer des données dans la table)
  • bigquery.tables.get (vous permet d'obtenir des métadonnées de table)
  • bigquery.datasets.get (vous permet d'obtenir des métadonnées d'ensemble de données)
  • bigquery.tables.create (obligatoire si vous utilisez un modèle de table pour créer automatiquement la table)

Chacun des rôles IAM prédéfinis suivants inclut les autorisations dont vous avez besoin pour transférer des données en streaming dans BigQuery :

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

Pour en savoir plus sur les rôles et les autorisations IAM dans BigQuery, consultez la page Rôles prédéfinis et autorisations.

Insérer des données en flux continu dans BigQuery

C#

Avant d'essayer cet exemple, suivez les instructions de configuration pour C# du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour C#.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Go.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Java.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Node.js.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

Avant d'essayer cet exemple, suivez les instructions de configuration pour PHP du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour PHP.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Python.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration pour Ruby du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Ruby.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Vous n'avez pas besoin de renseigner le champ insertID lorsque vous insérez des lignes. L'exemple suivant montre comment éviter l'envoi de insertID pour chaque ligne lors de l'insertion en flux continu.

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Java.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Python.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Envoyer des données de date et d'heure

Pour les champs de date et d'heure, mettez en forme les données de la méthode tabledata.insertAll comme suit :

Type Format
DATE Une chaîne au format "YYYY-MM-DD"
DATETIME Une chaîne au format "YYYY-MM-DD [HH:MM:SS]"
TIME Une chaîne au format "HH:MM:SS"
TIMESTAMP Nombre de secondes écoulées depuis le 1970-01-01 (époque Unix) ou chaîne sous la forme "YYYY-MM-DD HH:MM[:SS]"

Envoyer des données de plage

Pour les champs de type RANGE<T>, formatez les données de la méthode tabledata.insertAll en tant qu'objet JSON avec deux champs, start et end. Les valeurs manquantes ou NULL pour les champs start et end représentent des limites illimitées. Ces champs doivent avoir le même format JSON compatible de type T, où T peut être défini sur DATE, DATETIME ou TIMESTAMP.

Dans l'exemple suivant, le champ f_range_date représente une colonne RANGE<DATE> dans une table. Une ligne est insérée dans cette colonne à l'aide de l'API tabledata.insertAll.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

Disponibilité des données par flux

Les données sont disponibles pour une analyse en temps réel à l'aide de requêtes GoogleSQL immédiatement après que BigQuery a accusé réception d'une requête tabledata.insertAll.

Les lignes récemment diffusées vers une table partitionnée par date d'ingestion ont temporairement une valeur NULL pour la pseudo-colonne _PARTITIONTIME. Pour ces lignes, BigQuery attribue la valeur non nulle finale de la colonne PARTITIONTIME en arrière-plan, généralement en quelques minutes. Dans de rares cas, cela peut prendre jusqu'à 90 minutes.

Certaines lignes récemment diffusées peuvent ne pas être disponibles pour la copie de table, généralement pendant quelques minutes. Dans de rares cas, cela peut prendre jusqu'à 90 minutes. Pour savoir si les données peuvent être copiées, consultez la réponse tables.get pour une section nommée streamingBuffer. Si la section streamingBuffer est absente, vos données peuvent être copiées. Vous pouvez également utiliser le champ streamingBuffer.oldestEntryTime pour identifier l'âge des enregistrements dans le tampon d'insertion en flux continu.

Déduplication optimale

Lorsque vous fournissez insertId pour une ligne insérée, BigQuery utilise cet ID pour assurer la déduplication optimale pendant une durée maximale d'une minute. Par conséquent, si vous insérez la même ligne en flux continu avec le même insertId plusieurs fois au cours de cette période dans la même table, BigQuery peut dédupliquer le contenu. plusieurs occurrences de cette ligne, en n'en conservant qu'une seule.

Le système s'attend à ce que les lignes fournies avec des insertId identiques soient également identiques. Si deux lignes ont des insertId identiques, le choix de la ligne conservée par BigQuery n'est pas déterministe.

La déduplication est généralement destinée aux scénarios de nouvelle tentative dans un système distribué où il n'existe aucun moyen de déterminer l'état d'une insertion en flux continu en présence de certaines conditions d'erreur, telles que des erreurs réseau entre votre système et BigQuery, ou des erreurs internes à BigQuery. Dans ce cas, lors de la nouvelle tentative d'insertion, utilisez le même identifiant insertId pour le même ensemble de lignes afin que BigQuery puisse tenter de dédupliquer les données. Pour en savoir plus, consultez la section Dépannage pour les insertions en flux continu.

La déduplication optimale proposée par BigQuery ne doit pas être considérée comme un mécanisme permettant de garantir l'absence de doublons dans vos données. De plus, BigQuery peut à tout moment réduire la qualité de la déduplication optimale afin de garantir une plus grande fiabilité et une plus grande disponibilité des données.

Si vous avez des exigences strictes de déduplication pour vos données, sachez que le service Google Cloud Datastore autorise les transactions.

Désactiver la déduplication optimale

Vous pouvez désactiver la déduplication optimale en omettant le champ insertId pour chaque ligne insérée. Il s'agit de la méthode recommandée pour insérer des données.

Apache Beam et Dataflow

Pour empêcher la déduplication optimale lorsque vous utilisez le connecteur d'E/S BigQuery d'Apache Beam pour Java, utilisez la méthode ignoreInsertIds().

Supprimer manuellement les doublons

Pour vous assurer qu'il n'existe aucune ligne en double après l'insertion en flux continu, procédez comme suit :

  1. Ajoutez la valeur insertId en tant que colonne du schéma de table et incluez insertId dans les données de chaque ligne.
  2. Lorsque l'insertion en flux continu est terminée, exécutez la requête suivante pour rechercher les doublons :

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Si le résultat est supérieur à 1, il existe des doublons.
  3. Pour supprimer les lignes en double, exécutez la requête suivante. Spécifiez une table de destination, autorisez les résultats volumineux et désactivez leur regroupement.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Remarques sur la requête de suppression des doublons :

  • La stratégie la plus sûre consiste à cibler une nouvelle table. Vous pouvez également cibler la table source avec la préférence d'écriture WRITE_TRUNCATE.
  • La requête de suppression des doublons ajoute une colonne row_number avec la valeur 1 à la fin du schéma de table. Elle utilise une instruction SELECT * EXCEPT du langage GoogleSQL pour exclure la colonne row_number de la table de destination. Le préfixe #standardSQL active GoogleSQL pour cette requête. Vous pouvez également sélectionner des noms de colonne spécifiques pour omettre cette colonne.
  • Pour interroger des données actives avec doublons supprimés, vous pouvez également créer une vue sur votre table à l'aide de la requête de suppression des doublons. Sachez que le coût de la requête par rapport à la vue est calculé en fonction des colonnes sélectionnées dans votre vue, ce qui peut entraîner de grandes tailles d'octets analysés.

Insérer des données en flux continu dans des tables partitionnées par date

Lorsque vous insérez des données en flux continu dans une table partitionnée par date, chaque partition dispose d'un tampon d'insertion en flux continu. Ce tampon d'insertion en flux continu est conservé lorsque vous effectuez une tâche de chargement, de requête ou de copie qui écrase une partition en définissant la propriété writeDisposition sur WRITE_TRUNCATE. Si vous souhaitez supprimer le tampon d'insertion en flux continu, vérifiez qu'il est vide en appelant tables.get sur la partition.

Partitionnement par date d'ingestion

Lorsque vous insérez du contenu en flux continu dans une table partitionnée par date d'ingestion, BigQuery déduit la partition de destination de l'heure UTC actuelle.

Les données qui arrivent sont temporairement placées dans la partition __UNPARTITIONED__ dans le tampon d'insertion en flux continu. Lorsque les données non partitionnées sont suffisantes, BigQuery partitionne les données dans la partition appropriée. Toutefois, il n'existe aucun contrat de niveau de service pour le délai de transfert des données hors de la partition __UNPARTITIONED__. Les données du tampon peuvent être exclues d'une requête par le filtrage des valeurs NULL de la partition __UNPARTITIONED__ à l'aide de l'une des pseudo-colonnes (_PARTITIONTIME ou _PARTITIONDATE selon le type de données préféré).

Si vous insérez des données en flux continu dans une table partitionnée par jour, vous pouvez remplacer la date déduite en fournissant un décorateur de partition dans la requête insertAll. Incluez le décorateur dans le paramètre tableId. Par exemple, vous avez la possibilité d'insérer des données en flux continu dans la partition correspondant à 2021-03-01 pour la table table1 à l'aide du décorateur de partition suivant :

table1$20210301

Lors de l'insertion en flux continu à l'aide d'un décorateur de partition, vous pouvez insérer des données dans des partitions pour la période comprise entre les 31 derniers jours et les 16 prochains jours par rapport à la date actuelle (heure UTC). Pour écrire sur des partitions pour des dates situées en dehors de ces limites autorisées, utilisez plutôt une tâche de chargement ou de requête, comme décrit dans la section Ajouter ou écraser des données dans une table partitionnée.

L'insertion à l'aide d'un décorateur de partition n'est compatible qu'avec les tables partitionnées quotidiennes. Ce n'est pas compatible avec les tables partitionnées par heure, par mois ou par année.

À des fins de test, vous pouvez utiliser la commande CLI bq insert de l'outil de ligne de commande bq. Par exemple, la commande suivante permet d'insérer en flux continu une seule ligne de la partition pour la date du 1er janvier 2017 ($20170101) dans une table partitionnée nommée mydataset.mytable :

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

Partitionnement par colonnes d'unités de temps

Vous pouvez insérer des données en flux continu dans une table partitionnée en fonction d'une colonne DATE, DATETIME ou TIMESTAMP située dans la période comprise entre les cinq années qui précèdent et l'année qui suit. Les données qui ne s'appliquent pas à cette période sont refusées.

Lorsque les données sont diffusées en continu, elles sont initialement placées dans la partition __UNPARTITIONED__. Lorsque les données non partitionnées sont suffisantes, BigQuery les repartitionne automatiquement en les plaçant dans la partition appropriée. Toutefois, il n'existe aucun contrat de niveau de service pour le délai de transfert des données hors de la partition __UNPARTITIONED__.

  • Remarque : Les partitions quotidiennes sont traitées différemment des partitions horaires, mensuelles et annuelles. Seules les données situées en dehors de la plage de dates (les sept derniers jours au-delà de trois jours) sont extraites dans la partition UNPARTITIONED, en attente de repartitionnement. En revanche, pour la table partitionnée par heure, les données sont toujours extraites vers la partition UNPARTITIONED, puis partitionnées à nouveau.

Créer des tables automatiquement à l'aide de modèles

Les tables créées à partir de modèles fournissent un mécanisme permettant de diviser une table logique en tables plus petites pour créer des ensembles de données plus restreints (par exemple, par ID utilisateur). Les tables créées à partir de modèles présentent un certain nombre de limites, décrites ci-dessous. À la place, les tables partitionnées et les tables en cluster sont les méthodes recommandées pour obtenir ce comportement.

Pour utiliser un modèle de table avec l'API BigQuery, ajoutez un paramètre templateSuffix à la requête insertAll. Pour l'outil de ligne de commande bq, ajoutez l'option template_suffix à la commande insert. Si BigQuery détecte un paramètre templateSuffix ou l'option template_suffix, il traite la table ciblée en tant que modèle de base. Il crée une table qui partage le même schéma que la table ciblée et dont le nom inclut le suffixe spécifié :

<targeted_table_name> + <templateSuffix>

Un modèle de table vous évite de devoir créer chaque table individuellement et de spécifier le schéma de chacune. Il vous suffit de créer un modèle unique et de fournir différents suffixes pour que BigQuery génère les nouvelles tables pour vous. BigQuery place les tables dans les mêmes projet et ensemble de données.

Les tables créées à l'aide de modèles sont généralement disponibles en quelques secondes. Il arrive en de rares occasions que le processus prenne plus de temps.

Modifier le schéma d'un modèle de table

Si vous modifiez le schéma d'un modèle de table, toutes les tables générées utilisent le schéma mis à jour. Les tables existantes ne sont pas concernées, sauf si elles contiennent toujours un tampon d'insertion en flux continu.

Si vous modifiez le schéma d'un modèle de table avec rétrocompatibilité, le schéma des tables existantes qui contiennent toujours un tampon d'insertion en flux continu est également mis à jour. Sans rétrocompatibilité par contre, toutes les données mises en mémoire tampon utilisant l'ancien schéma sont perdues. En outre, vous ne pouvez pas insérer de nouvelles données en flux continu dans des tables existantes qui utilisent l'ancien schéma, désormais incompatible.

Une fois que vous avez modifié le schéma d'un modèle de table, attendez la propagation des modifications avant d'essayer d'insérer de nouvelles données ou d'interroger des tables générées. Les requêtes d'insertion de nouveaux champs devraient aboutir en quelques minutes. Les tentatives d'interrogation des nouveaux champs peuvent prendre plus de temps, jusqu'à 90 minutes.

Pour modifier le schéma d'une table générée, attendez que l'insertion en flux continu à l'aide du modèle de table se termine et que la section des statistiques d'insertion disparaisse de la réponse tables.get() (ce qui indique qu'aucune donnée n'est mise en mémoire tampon dans la table).

Les tables partitionnées et les tables en cluster ne sont pas soumises aux limites mentionnées ci-dessus, et constituent le mécanisme recommandé.

Détails du modèle de table

Valeur du suffixe de modèle
La valeur templateSuffix (ou --template_suffix) ne doit contenir que des lettres (a-z, A-Z), des chiffres (0-9) ou des traits de soulignement (_). La longueur maximale combinée du nom de table et du suffixe de table est de 1 024 caractères.
Quota

Les tables créées à partir de modèles sont soumises aux limites de quotas d'insertions en flux continu. Votre projet peut générer jusqu'à 10 tables par seconde avec des modèles de tables, à l'instar de l'API tables.insert. Ce quota ne s'applique qu'aux tables en cours de création, et non aux tables en cours de modification.

Si votre application doit créer plus de 10 tables par seconde, nous vous recommandons d'utiliser des tables en cluster. Par exemple, vous pouvez placer l'ID de table à cardinalité élevée dans la colonne de clé d'une seule table de clustering.

Durée de vie

La table générée hérite son délai d'expiration de l'ensemble de données. Comme les données insérées en flux continu, les tables générées ne peuvent pas être immédiatement copiées.

Déduplication

La déduplication ne se produit qu'entre des références uniformes à une table de destination. Par exemple, si vous insérez des données en flux continu dans une table générée à l'aide de modèles et d'une commande insertAll standard de façon simultanée, aucune déduplication ne se produit entre les lignes insérées par les modèles et par la commande insertAll standard.

Vues

Le modèle de table et les tables générées ne doivent pas être des vues.

Résoudre les problèmes liés aux insertions en flux continu

Les sections suivantes expliquent comment résoudre les erreurs qui se produisent lorsque vous diffusez des données en flux continu dans BigQuery à l'aide de l'ancienne API de diffusion de flux. Pour en savoir plus sur la résolution des erreurs de quota pour les insertions en flux continu, consultez la section Erreurs de quotas d'insertion en flux continu.

Codes de réponse HTTP d'échec

Si vous recevez un code de réponse HTTP d'échec, par exemple une erreur de réseau, il est impossible de savoir si l'insertion en flux continu a réussi. Si vous essayez simplement de renvoyer la requête, des lignes risquent d'apparaître en double dans votre table. Pour éviter les doublons dans la table, définissez la propriété insertId lors de l'envoi de la requête. BigQuery utilise la propriété insertId pour éliminer les doublons.

Si vous recevez une erreur d'autorisation, une erreur de nom de table non valide ou une erreur de quota dépassé, aucune ligne n'est insérée et l'intégralité de la requête échoue.

Codes de réponse HTTP de réussite

Même si vous recevez un code de réponse HTTP de réussite, vous devez examiner la propriété insertErrors de la réponse pour déterminer si les lignes ont bien été insérées, car il se peut que BigQuery n'ait réussi à insérer les lignes que partiellement. Vous pouvez rencontrer l'un des cas suivants :

  • Toutes les lignes ont bien été insérées. Si la propriété insertErrors est une liste vide, toutes les lignes ont été insérées correctement.
  • Certaines lignes ont bien été insérées. Sauf en cas d'incompatibilité de schémas dans l'une des lignes, les lignes ont été insérées correctement, sauf celles indiquées dans la propriété insertErrors. La propriété errors contient des informations détaillées sur la raison de l'échec de chaque ligne n'ayant pas été insérée. La propriété index indique l'index de ligne de base 0 de la requête à laquelle renvoie l'erreur.
  • Aucune ligne n'a été insérée. Si BigQuery rencontre une incompatibilité de schéma sur certaines des lignes de la requête, aucune des lignes n'est insérée et une entrée insertErrors est renvoyée pour chaque ligne, même celles qui ne présentent pas une incompatibilité de schéma. Les lignes sans incompatibilité de schéma présentent une erreur lorsque la propriété reason est définie sur stopped, et peuvent être renvoyées telles quelles. Les lignes qui ont échoué incluent des informations détaillées sur l'incompatibilité du schéma. Pour en savoir plus sur les types de tampons de protocole compatibles avec chaque type de données BigQuery, consultez la section Conversions de types de données.

Erreurs de métadonnées pour les insertions en flux continu

Étant donné que l'API de flux continu de BigQuery est conçue pour des taux d'insertion élevés, les modifications apportées à l'exposition des métadonnées de la table sous-jacente finissent par être cohérentes, lors de l'interaction avec le système de flux continu. La plupart du temps, les modifications des métadonnées sont propagées en quelques minutes. Les réponses de l'API peuvent cependant refléter un état incohérent de la table pendant cette période.

Scénarios possibles :

  • Modifications du schéma. La modification du schéma d'une table qui a récemment reçu des insertions en flux continu peut causer des réponses avec erreurs d'incompatibilité de schémas, car le système de flux continu peut ne pas relever immédiatement le changement de schéma.
  • Création/Suppression de table. Un flux continu dirigé vers une table qui n'existe pas renverra une variation de réponse notFound. La table créée dans la réponse peut ne pas être immédiatement reconnue par les insertions en flux continu suivantes. De même, supprimer ou recréer une table peut engendrer une période pendant laquelle les insertions en flux continu sont effectivement présentées à l'ancienne table. Les insertions en flux continu peuvent ne pas se trouver dans la nouvelle table.
  • Troncation de table. Le fait de tronquer les données d'une table (par exemple, à l'aide d'une tâche de requête utilisant writeDisposition de WRITE_TRUNCATE) peut également entraîner la suppression des insertions suivantes pendant la période de cohérence.

Données manquantes ou non disponibles

Les insertions en flux continu résident temporairement dans le stockage optimisé en écriture, qui présente des caractéristiques de disponibilité différentes de celles du stockage géré. Certaines opérations dans BigQuery n'interagissent pas avec le stockage optimisé en écriture, comme les tâches de copie de table et les méthodes d'API telles que tabledata.list. De ce fait, les données de flux continu récentes ne seront pas présentes dans la table ou la sortie de destination.