Ce document explique comment écrire des données depuis Dataflow vers BigQuery à l'aide du connecteur d'E/S BigQuery d'Apache Beam.
Le connecteur d'E/S BigQuery est disponible dans le SDK Apache Beam. Nous vous recommandons d'utiliser la dernière version du SDK. Pour en savoir plus, consultez la page SDK Apache Beam 2.x.
Python est également compatible avec Python.
Présentation
Le connecteur d'E/S BigQuery accepte les méthodes suivantes pour écrire dans BigQuery :
STORAGE_WRITE_API
Dans ce mode, le connecteur effectue des écritures directes sur l'espace de stockage BigQuery, à l'aide de l'API BigQuery Storage Write. L'API Storage Write combine l'ingestion par flux et le chargement par lot dans une seule API hautes performances. Ce mode garantit une sémantique de type "exactement une fois".STORAGE_API_AT_LEAST_ONCE
Ce mode utilise également l'API Storage Write, mais fournit une sémantique de type "au moins une fois". Ce mode permet de réduire la latence pour la plupart des pipelines. Cependant, les écritures en double sont possibles.FILE_LOADS
Dans ce mode, le connecteur écrit les données d'entrée dans les fichiers de préproduction dans Cloud Storage. Il exécute ensuite une tâche de chargement BigQuery pour charger les données dans BigQuery. Ce mode est utilisé par défaut pour lesPCollections
limitées, que l'on retrouve le plus souvent dans les pipelines de traitement par lot.STREAMING_INSERTS
Dans ce mode, le connecteur utilise l'ancienne API de streaming. Ce mode est utilisé par défaut pour lesPCollections
illimitées, mais il n'est pas recommandé pour les nouveaux projets.
Lorsque vous choisissez une méthode d'écriture, tenez compte des points suivants :
- Pour les tâches de traitement par flux, envisagez d'utiliser
STORAGE_WRITE_API
ouSTORAGE_API_AT_LEAST_ONCE
, car ces modes écrivent directement dans l'espace de stockage BigQuery, sans utiliser de fichiers de préproduction intermédiaires. - Si vous exécutez le pipeline en mode de traitement par flux de type "au moins une fois", définissez le mode d'écriture sur
STORAGE_API_AT_LEAST_ONCE
. Ce paramètre est plus efficace et correspond à la sémantique du mode de traitement par flux "au moins une fois". - Les chargements de fichiers et l'API Storage Write ont des quotas et limites différents.
- Les tâches de chargement utilisent le pool d'emplacements BigQuery partagé ou des emplacements réservés. Pour utiliser des emplacements réservés, exécutez la tâche de chargement dans un projet avec une attribution de réservation de type
PIPELINE
. Les tâches de chargement sont gratuites si vous utilisez le pool d'emplacements BigQuery partagé. Cependant, BigQuery ne garantit pas la capacité disponible du pool partagé. Pour en savoir plus, consultez la page Présentation des réservations.
Parallélisme
Pour
FILE_LOADS
etSTORAGE_WRITE_API
dans les pipelines de traitement par flux, le connecteur segmente les données en un certain nombre de fichiers ou de flux. En général, nous vous recommandons d'appelerwithAutoSharding
pour activer la segmentation automatique.Pour
FILE_LOADS
dans les pipelines de traitement par lot, le connecteur écrit les données dans des fichiers partitionnés, qui sont ensuite chargés dans BigQuery en parallèle.Pour
STORAGE_WRITE_API
dans les pipelines de traitement par lot, chaque nœud de calcul crée un ou plusieurs flux à écrire dans BigQuery, déterminé par le nombre total de segments.Pour
STORAGE_API_AT_LEAST_ONCE
, il existe un seul flux d'écriture par défaut. Plusieurs nœuds de calcul s'ajoutent à ce flux.
Performances
Le tableau suivant présente les métriques de performances de diverses options de lecture d'E/S BigQuery. Les charges de travail ont été exécutées sur un nœud de calcul e2-standard2
à l'aide du SDK Apache Beam 2.49.0 pour Java. Elles n'ont pas utilisé l'exécuteur v2.
100 millions d'enregistrements | 1 ko | 1 colonne | Débit (octets) | Débit (éléments) |
---|---|---|
Écriture du stockage | 55 Mbit/s | 54 000 éléments par seconde |
Charge Avro | 78 Mbit/s | 77 000 éléments par seconde |
Charge Json | 54 Mbit/s | 53 000 éléments par seconde |
Ces métriques sont basées sur des pipelines de traitement par lot simples. Elles ont pour but de comparer les performances entre les connecteurs d'E/S et ne sont pas nécessairement représentatives des pipelines réels. Les performances des pipelines Dataflow sont complexes et dépendent du type de machine virtuelle, des données traitées, des performances des sources et des récepteurs externes, ainsi que du code utilisateur. Les métriques sont basées sur l'exécution du SDK Java et ne sont pas représentatives des caractéristiques de performances des SDK d'autres langages. Pour en savoir plus, consultez la page Performances d'E/S Beam.
Bonnes pratiques
Cette section décrit les bonnes pratiques pour écrire dans BigQuery à partir de Dataflow.
Éléments généraux à prendre en compte
L'API Storage Write est soumise à des limites de quota. Le connecteur gère ces limites pour la plupart des pipelines. Toutefois, certains scénarios peuvent épuiser les flux d'API Storage Write disponibles. Par exemple, ce problème peut survenir dans un pipeline qui utilise la segmentation automatique et l'autoscaling avec un grand nombre de destinations, en particulier dans les tâches de longue durée avec des charges de travail très variables. Si ce problème se produit, envisagez d'utiliser
STORAGE_WRITE_API_AT_LEAST_ONCE
pour éviter le problème.Utilisez les métriques Google Cloud pour surveiller l'utilisation du quota de l'API Storage Write.
Lorsque vous utilisez des chargements de fichiers, Avro est généralement plus performant que JSON. Pour utiliser Avro, appelez
withAvroFormatFunction
.Par défaut, les tâches de chargement s'exécutent dans le même projet que la tâche Dataflow. Pour spécifier un autre projet, appelez
withLoadJobProjectId
.Lorsque vous utilisez le SDK Java, envisagez de créer une classe qui représente le schéma de la table BigQuery. Appelez ensuite
useBeamSchema
dans votre pipeline pour permettre la conversion automatique entre les types Apache BeamRow
et BigQueryTableRow
. Pour obtenir un exemple de classe de schéma, consultez la sectionExampleModel.java
.Si vous chargez des tables avec des schémas complexes contenant des milliers de champs, envisagez d'appeler
withMaxBytesPerPartition
pour définir une taille maximale inférieure pour chaque tâche de chargement.
Pipelines en streaming
Les recommandations suivantes s'appliquent aux pipelines de traitement par flux.
Pour les pipelines de traitement par flux, nous vous recommandons d'utiliser l'API Storage Write (
STORAGE_WRITE_API
ouSTORAGE_API_AT_LEAST_ONCE
).Un pipeline de traitement par flux peut utiliser des chargements de fichiers, mais cette approche présente des inconvénients :
- Il nécessite un fenêtrage pour écrire les fichiers. Vous ne pouvez pas utiliser la fenêtre globale.
- BigQuery charge les fichiers de la façon la plus optimale possible lorsque vous utilisez le pool d'emplacements partagés. Il peut s'écouler un certain temps entre le moment où un enregistrement est écrit et celui où il est disponible dans BigQuery.
- Si une tâche de chargement échoue (par exemple en raison de données incorrectes ou d'une incompatibilité de schéma), l'ensemble du pipeline échoue.
Pensez à utiliser
STORAGE_WRITE_API_AT_LEAST_ONCE
lorsque cela est possible. Cela peut entraîner l'écriture d'enregistrements en double dans BigQuery, mais reste moins coûteux et plus évolutif queSTORAGE_WRITE_API
.En général, évitez d'utiliser
STREAMING_INSERTS
. Les insertions en flux continu sont plus coûteuses que l'API Storage Write et sont moins performantes.La segmentation des données peut améliorer les performances des pipelines de traitement par flux. Pour la plupart des pipelines, la segmentation automatique est un bon point de départ. Vous pouvez néanmoins ajuster la segmentation comme suit :
- Pour
STORAGE_WRITE_API
, appelezwithNumStorageWriteApiStreams
pour définir le nombre de flux d'écriture. - Pour
FILE_LOADS
, appelezwithNumFileShards
pour définir le nombre de partitions de fichiers.
- Pour
Si vous utilisez des insertions en flux continu, nous vous recommandons de définir
retryTransientErrors
comme stratégie de nouvelle tentative.
Pipelines par lots
Les recommandations suivantes s'appliquent aux pipelines de traitement par lot.
Pour la plupart des pipelines par lots volumineux, nous vous recommandons d'essayer d'abord
FILE_LOADS
. Un pipeline par lots peut utiliserSTORAGE_WRITE_API
, mais il est susceptible de dépasser les limites de quota à grande échelle (plus de 1 000 processeurs virtuels) ou si des pipelines simultanés sont en cours d'exécution. Apache Beam ne limite pas le nombre maximal de flux d'écriture pour les tâches par lotSTORAGE_WRITE_API
. Par conséquent, la tâche atteint les limites de l'API BigQuery Storage.Lorsque vous utilisez
FILE_LOADS
, vous pouvez épuiser le pool d'emplacements BigQuery partagé ou votre pool d'emplacements réservés. Si vous rencontrez ce type de défaillance, essayez les approches suivantes:- Réduisez le nombre maximal ou la taille des nœuds de calcul pour la tâche.
- Acheter d'autres emplacements réservés.
- Envisagez d'utiliser
STORAGE_WRITE_API
.
Les pipelines de petite à moyenne taille (moins de 1 000 processeurs virtuels) peuvent tirer parti de l'utilisation de
STORAGE_WRITE_API
. Pour ces tâches plus petites, envisagez d'utiliserSTORAGE_WRITE_API
si vous souhaitez une file d'attente de lettres mortes ou lorsque le pool d'emplacements partagésFILE_LOADS
n'est pas suffisant.Si vous pouvez tolérer des données en double, envisagez d'utiliser
STORAGE_WRITE_API_AT_LEAST_ONCE
. Ce mode peut entraîner l'écriture d'enregistrements en double dans BigQuery, mais peut s'avérer moins coûteux que l'optionSTORAGE_WRITE_API
.Les différents modes d'écriture peuvent fonctionner différemment en fonction des caractéristiques de votre pipeline. Faites des tests pour trouver le mode d'écriture le mieux adapté à votre charge de travail.
Gérer les erreurs au niveau des lignes
Cette section explique comment gérer les erreurs qui peuvent se produire au niveau des lignes, par exemple en raison d'une saisie incorrecte des données d'entrée ou d'incohérences de schéma.
Pour l'API Storage Write, toutes les lignes qui ne peuvent pas être écrites sont placées dans un PCollection
distinct. Pour obtenir cette collection, appelez getFailedStorageApiInserts
sur l'objet WriteResult
. Pour obtenir un exemple de cette approche, consultez la section Insérer des données en flux continu dans BigQuery.
Il est recommandé d'envoyer les erreurs à une file d'attente ou une table de lettres mortes pour un traitement ultérieur. Pour en savoir plus sur ce modèle, consultez la section Modèle de lettre morte BigQueryIO
.
Pour FILE_LOADS
, si une erreur se produit lors du chargement des données, la tâche de chargement échoue et le pipeline génère une exception d'exécution. Vous pouvez afficher l'erreur dans les journaux Dataflow ou consulter l'historique des tâches BigQuery.
Le connecteur d'E/S ne renvoie pas d'informations sur les lignes individuelles ayant échoué.
Pour en savoir plus sur le dépannage des erreurs, consultez la section Erreurs de connecteur BigQuery.
Exemples
Les exemples suivants montrent comment utiliser Dataflow pour écrire dans BigQuery.
Écrire dans une table existante
L'exemple suivant crée un pipeline par lots qui écrit un PCollection<MyData>
dans BigQuery, où MyData
est un type de données personnalisées.
La méthode BigQueryIO.write()
renvoie un type BigQueryIO.Write<T>
, qui permet de configurer l'opération d'écriture. Pour en savoir plus, consultez la section Écrire dans une table dans la documentation Apache Beam. Cet exemple de code écrit dans une table existante (CREATE_NEVER
) et ajoute les nouvelles lignes à la table (WRITE_APPEND
).
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Écrire dans une table nouvelle ou existante
L'exemple suivant crée une table si la table de destination n'existe pas, en définissant la disposition de création sur CREATE_IF_NEEDED
. Lorsque vous utilisez cette option, vous devez fournir un schéma de table. Le connecteur utilise ce schéma s'il crée une table.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Insérer des données en flux continu dans BigQuery
L'exemple suivant montre comment diffuser des données en flux continu à l'aide de la sémantique de type "exactement une fois", en définissant le mode d'écriture sur STORAGE_WRITE_API
.
Tous les pipelines de traitement par flux ne nécessitent pas une sémantique de type "exactement une fois". Par exemple, vous pouvez peut-être supprimer manuellement les doublons dans la table de destination. Si la possibilité d'enregistrements en double est acceptable pour votre scénario, envisagez d'utiliser la sémantique de type "au moins une fois" en définissant la méthode d'écriture sur STORAGE_API_AT_LEAST_ONCE
. Cette méthode est généralement plus efficace et permet de réduire la latence pour la plupart des pipelines.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Étapes suivantes
- Obtenez plus d'informations sur le connecteur d'E/S BigQuery dans la documentation Apache Beam.
- Consultez la page Insérer des données en flux continu dans BigQuery à l'aide de l'API Storage Write (article de blog).