Charger et diffuser des données par lot avec l'API BigQuery Storage Write

L'API BigQuery Storage Write est une API d'ingestion de données unifiée pour BigQuery. Elle combine l'ingestion par flux et le chargement par lot dans une seule API hautes performances. Vous pouvez utiliser l'API Storage Write pour diffuser des enregistrements dans BigQuery en temps réel ou pour traiter par lot un grand nombre d'enregistrements et effectuer leur commit en une seule opération atomique.

Avantages de l'utilisation de l'API Storage Write

Sémantique de diffusion "exactement une fois". L'API Storage Write est compatible avec la sémantique "exactement une fois" grâce à l'utilisation de décalages de flux. Contrairement à la méthode tabledata.insertAll, l'API Storage Write n'écrit jamais deux messages avec le même décalage dans un flux, si le client fournit des décalages de flux lors de l'ajout d'enregistrements.

Transactions au niveau du flux. Vous pouvez écrire des données dans un flux et les valider en tant que transaction unique. Si l'opération de commit échoue, vous pouvez retenter l'opération en toute sécurité.

Transactions dans plusieurs flux. Plusieurs nœuds de calcul peuvent créer leurs propres flux pour traiter les données de manière indépendante. Lorsque tous les nœuds de calcul sont terminés, vous pouvez valider tous les flux en tant que transaction.

Protocole efficace. L'API Storage Write est plus efficace que l'ancienne méthode insertAll, car elle utilise le streaming gRPC plutôt que REST via HTTP. L'API Storage Write est également compatible avec les formats binaires sous la forme de tampons de protocole, qui constituent un format de communication plus efficace que JSON. Les requêtes d'écriture sont asynchrones avec un ordre garanti.

Détection des mises à jour de schéma. Si le schéma de la table sous-jacente change alors que le client est en streaming, l'API Storage Write informe le client. Le client peut choisir de se reconnecter à l'aide du schéma mis à jour ou de continuer à écrire sur la connexion existante.

Réduction des coûts L'API Storage Write a un coût nettement inférieur à celui de l'ancienne API de diffusion par flux insertAll. En outre, vous pouvez ingérer jusqu'à 2 To par mois gratuitement.

Autorisations requises

Pour utiliser l'API Storage Write, vous devez disposer des autorisations bigquery.tables.updateData.

Les rôles IAM (Identity and Access Management) suivants incluent les autorisations bigquery.tables.updateData :

  • bigquery.dataEditor
  • bigquery.dataOwner
  • bigquery.admin

Pour en savoir plus sur les rôles et les autorisations IAM dans BigQuery, consultez la page Rôles prédéfinis et autorisations.

Présentation de l'API Storage Write

L'abstraction principale dans l'API Storage Write est un flux. Un flux écrit des données dans une table BigQuery. Plusieurs flux peuvent écrire simultanément dans la même table.

Diffusion par défaut

L'API Storage Write fournit un flux par défaut conçu pour les scénarios dans lesquels vous recevez des données en continu. Il présente les caractéristiques suivantes :

  • Les données écrites dans le flux par défaut sont disponibles immédiatement pour la requête.
  • Le flux par défaut accepte la sémantique de type "au moins une fois".
  • Vous n'avez pas besoin de créer explicitement le flux par défaut.

Si vous effectuez une migration à partir de l'ancienne API tabledata.insertall, envisagez d'utiliser le flux par défaut. Il présente une sémantique d'écriture similaire, avec une plus grande résilience des données et moins de restrictions de scaling.

Pour plus d'informations et un exemple de code, consultez la section Utiliser le flux par défaut pour la sémantique de type "au moins une fois".

Flux créés par l'application

Vous pouvez créer explicitement un flux si vous avez besoin de l'un des comportements suivants :

  • Commits au niveau du flux pour les charges de travail par lot.
  • Sémantique d'écriture de type "exactement une fois" grâce à l'utilisation de décalages de flux.

Lorsque vous créez un flux, vous spécifiez un mode. Le mode contrôle à quel moment les données écrites dans le flux deviennent visibles dans BigQuery pour lecture.

  • Mode en attente : les enregistrements sont mis en mémoire tampon dans un état en attente jusqu'à ce que vous effectuiez un commit du flux. Lorsque vous effectuez le commit d'un flux, toutes les données en attente deviennent disponibles pour lecture. Le commit est une opération atomique. Utilisez ce mode pour les charges de travail par lot, plutôt que les tâches de chargement BigQuery. Pour en savoir plus, consultez la page Charger des données par lot à l'aide de l'API Storage Write.

  • Mode commit : les enregistrements sont disponibles pour lecture immédiate dès que vous les écrivez dans le flux. Utilisez ce mode pour les charges de travail par flux qui nécessitent une latence minimale en lecture. Le flux par défaut utilise toujours le mode commit. Vous pouvez créer un flux en mode commit si vous avez besoin d'une sémantique de type "exactement une fois". Pour plus d'informations, consultez la section Utiliser le mode commit pour la sémantique de type "exactement une fois".

  • Mode de mise en mémoire tampon Le mode de mise en mémoire tampon est un mode avancé conçu pour le connecteur d'E/S BigQuery Apache Beam. Ce mode fournit des commits au niveau des lignes. Les enregistrements sont mis en mémoire tampon jusqu'au commit des lignes lors du vidage du flux. En règle générale, les applications ne doivent pas utiliser ce mode. Si vous souhaitez que des petits lots soient affichés ensemble, utilisez le mode commit et envoyez chaque lot en une seule requête.

Flux de l'API

Cette section décrit le flux global de l'API. La séquence d'appels dépend du mode de traitement par flux :

Flux par défaut :

  1. AppendRows (boucle)

Mode en attente :

  1. CreateWriteStream
  2. AppendRows (boucle)
  3. FinalizeWriteStream
  4. BatchCommitWriteStreams

Mode commit :

  1. CreateWriteStream
  2. AppendRows (boucle)
  3. FinalizeWriteStream (facultatif)

Mode de mise en mémoire tampon :

  1. CreateWriteStream
  2. AppendRows ⇒ FlushRows (boucle)
  3. FinalizeWriteStream (facultatif)

La méthode AppendRows ajoute un ou plusieurs enregistrements au flux. Le premier appel à AppendRows doit contenir un nom de flux avec le schéma de données, spécifié en tant que DescriptorProto. Nous vous recommandons d'envoyer un lot de lignes dans chaque appel AppendRows. N'envoyez pas une ligne à la fois.

Le message de tampon de protocole ne peut pas utiliser de spécificateur package, et tous les types imbriqués ou d'énumération doivent être définis dans le message racine de premier niveau. Les références à des messages externes ne sont pas autorisées. Pour obtenir un exemple, consultez la page sample_data.proto. Les clients Java et Go acceptent les tampons de protocole arbitraires, car la bibliothèque cliente normalise le schéma du tampon de protocole.

La méthode FinalizeWriteStream finalise le flux afin qu'aucune nouvelle donnée ne puisse être ajoutée. Cette méthode est requise pour le mode en attente et facultative pour les modes commit et de mise en mémoire tampon. Le flux par défaut n'est pas compatible avec cette méthode.

Si une erreur se produit, l'élément google.rpc.Status renvoyé peut inclure une erreur StorageError dans les détails d'erreur. Consultez le StorageErrorCode pour trouver le type d'erreur spécifique. Pour en savoir plus sur le modèle d'erreur des API Google, consultez la page Erreurs.

Connexions

L'API Storage Write est une API gRPC qui utilise des connexions bidirectionnelles. La méthode AppendRows crée une connexion à un flux. Vous pouvez ouvrir plusieurs connexions sur le flux par défaut. Les flux créés par une application ne peuvent avoir qu'une seule connexion active. Nous vous recommandons de limiter le nombre de connexions actives.

En règle générale, une seule connexion accepte un débit d'au moins 1 Mo/s. La limite supérieure dépend de plusieurs facteurs, tels que la bande passante réseau, le schéma des données et la charge du serveur, mais elle peut dépasser 10 Mo/s. Si vous avez besoin de plus de débit, créez davantage de connexions.

BigQuery ferme la connexion gRPC si elle reste inactive trop longtemps. Dans ce cas, le code de réponse est HTTP 409. La connexion gRPC peut également être fermée en cas de redémarrage du serveur ou pour d'autres raisons. Si une erreur de connexion se produit, créez une connexion. Les bibliothèques clientes Java et Go se reconnectent automatiquement si la connexion est fermée.

Compatibilité avec les bibliothèques clientes

Vous pouvez utiliser l'API Storage Write en appelant directement l'API gRPC ou en utilisant l'une des bibliothèques clientes disponibles pour Java, Python et Go. En général, nous vous recommandons d'utiliser une bibliothèque cliente, car elle fournit une interface de programmation plus simple et gère le RPC de streaming bidirectionnel sous-jacent pour vous.

Client Java

La bibliothèque cliente Java fournit deux objets "writer" :

  • StreamWriter : accepte les données au format de tampon de protocole.

  • JsonStreamWriter : accepte les données au format JSON et les convertit en tampons de protocole avant de les envoyer via le réseau. La fonction JsonStreamWriter est également compatible avec les mises à jour automatiques de schéma. Si le schéma de la table change, le rédacteur se reconnecte automatiquement au nouveau schéma, ce qui permet au client d'envoyer des données à l'aide du nouveau schéma.

Le modèle de programmation est similaire pour les deux objets "writer". La principale différence réside dans la mise en forme de la charge utile.

L'objet rédacteur gère une connexion à l'API Storage Write. L'objet rédacteur nettoie automatiquement les requêtes, ajoute les en-têtes de routage régionaux aux requêtes, puis se reconnecte après les erreurs de connexion. Si vous utilisez directement l'API gRPC, vous devez gérer ces détails.

Client Python

Le client Python est un client de niveau inférieur qui encapsule l'API gRPC. Pour utiliser ce client, vous devez envoyer les données sous forme de tampons de protocole, comme décrit dans le flux d'API.

Pour en savoir plus sur l'utilisation des tampons de protocole avec Python, consultez le tutoriel sur les principes de base des tampons de protocole dans Python.

Conversions de types de données

Le tableau suivant indique les types de tampons de protocole acceptés pour chaque type de données BigQuery :

Type de données BigQuery Types de tampons de protocole compatibles
BOOL bool, int32, int64, uint32, uint64
BYTES bytes
DATE int32 (rôle à privilégier), int64

La valeur représente le nombre de jours écoulés depuis l'époque Unix (1970-01-01). La plage valide est comprise entre -719162` (0001-01-01) à 2932896 (9999-12-31).

DATETIME, TIME string

La valeur doit être une valeur littérale DATETIME ou TIME.

int64

Utilisez la classe CivilTimeEncoder pour effectuer la conversion.

FLOAT double, float
GEOGRAPHY string

Cette valeur correspond à une géométrie au format WKT ou GeoJson.

INTEGER int32, int64, uint32, enum
JSON (bêta) string
NUMERIC, BIGNUMERIC int32, int64, uint32, uint64, double, float, string
bytes

Utilisez la classe BigDecimalByteStringEncoder pour effectuer la conversion.

STRING string, enum
TIME string

La valeur doit être une valeur littérale TIME.

TIMESTAMP int64 (rôle à privilégier), int32, uint32

La valeur est donnée en microsecondes depuis l'époque Unix (1970-01-01).

REPEATED FIELD array

Un type de tableau dans le fichier proto correspond à un champ répété dans BigQuery.

RECORD message

Un type de message imbriqué dans le fichier proto correspond à un champ d'enregistrement dans BigQuery.

Limites du langage de manipulation de données (LMD)

Vous ne pouvez pas utiliser les instructions UPDATE, DELETE ou MERGE pour modifier les lignes qui ont été écrites dans une table par l'API BigQuery Storage Write au cours des 30 dernières minutes. Vous pouvez utiliser ces instructions pour modifier toutes les autres lignes.

Quotas de l'API Storage Write

Pour en savoir plus sur les quotas et limites de l'API Storage Write, consultez la page Requêtes de l'API BigQuery Storage Write.

Tarifs de l'API Storage Write

Pour connaître les tarifs, consultez la section Tarifs de l'ingestion de données.

Étape suivante