Ce document explique comment lire des données de BigQuery vers Dataflow à l'aide du connecteur d'E/S BigQuery d'Apache Beam.
Présentation
Le connecteur d'E/S BigQuery accepte deux options de lecture à partir de BigQuery :
- Lectures de table directes. Cette option est la plus rapide, car elle utilise l'API BigQuery Storage Read.
- Jobs d'exportation. Avec cette option, BigQuery exécute un job d'exportation qui écrit les données de la table dans Cloud Storage. Le connecteur lit ensuite les données exportées à partir de Cloud Storage. Cette option est moins efficace, car elle nécessite l'étape d'exportation.
Les jobs d'exportation sont l'option par défaut. Pour spécifier des lectures directes, appelez la méthode withMethod(Method.DIRECT_READ)
.
Le connecteur sérialise les données de la table dans un objet PCollection
. Chaque élément présent dans PCollection
représente une seule ligne de table. Le connecteur est compatible avec les méthodes de sérialisation suivantes :
- Lire les données sous forme d'enregistrements au format Avro. Cette méthode permet de fournir une fonction qui analyse les enregistrements Avro dans un type de données personnalisé.
- Lire les données sous forme d'objets
TableRow
. Cette méthode est pratique car elle ne nécessite pas de type de données personnalisé. Cependant, elle est généralement moins performante que la lecture d'enregistrements au format Avro.
Parallélisme
Le parallélisme dans ce connecteur dépend de la méthode de lecture :
Lectures directes : le connecteur d'E/S génère un nombre dynamique de flux, en fonction de la taille de la requête d'exportation. Il lit ces flux directement à partir de BigQuery en parallèle.
Jobs d'exportation : BigQuery détermine le nombre de fichiers à écrire dans Cloud Storage. Le nombre de fichiers dépend de la requête et du volume de données. Le connecteur d'E/S lit les fichiers exportés en parallèle.
Performances
Le tableau suivant présente les métriques de performances de diverses options de lecture d'E/S BigQuery. Les charges de travail ont été exécutées sur un nœud de calcul e2-standard2
à l'aide du SDK Apache Beam 2.49.0 pour Java. Elles n'ont pas utilisé l'exécuteur v2.
100 millions d'enregistrements | 1 ko | 1 colonne | Débit (octets) | Débit (éléments) |
---|---|---|
Lecture du stockage | 120 Mbit/s | 88 000 éléments par seconde |
Exportation Avro | 105 Mbit/s | 78 000 éléments par seconde |
Exportation JSON | 110 Mbit/s | 81 000 éléments par seconde |
Ces métriques sont basées sur des pipelines de traitement par lot simples. Elles ont pour but de comparer les performances entre les connecteurs d'E/S et ne sont pas nécessairement représentatives des pipelines réels. Les performances des pipelines Dataflow sont complexes et dépendent du type de machine virtuelle, des données traitées, des performances des sources et des récepteurs externes, ainsi que du code utilisateur. Les métriques sont basées sur l'exécution du SDK Java et ne sont pas représentatives des caractéristiques de performances des SDK d'autres langages. Pour en savoir plus, consultez la page Performances d'E/S Beam.
Bonnes pratiques
En général, nous vous recommandons d'utiliser des lectures de tables directes (
Method.DIRECT_READ
). L'API Storage Read est plus adaptée aux pipelines de données que les jobs d'exportation, car elle ne nécessite pas d'étape intermédiaire d'exportation des données.Si vous avez recours aux lectures directes, l'utilisation de l'API Storage Read vous est facturée. Consultez la section Tarifs de l'extraction de données sur la page des tarifs de BigQuery.
Aucuns frais supplémentaires ne sont facturés pour les jobs d'exportation. Ils sont toutefois soumis à des limites. Pour les transferts de données volumineux, où la rapidité est une priorité et le coût est ajustable, les lectures directes sont recommandées.
L'API Storage Read est soumise à des limites de quota. Utilisez les métriques Google Cloud pour surveiller votre utilisation du quota.
Lorsque vous utilisez l'API Storage Read, des erreurs d'expiration de bail et d'expiration de session peuvent s'afficher dans les journaux, par exemple :
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
`
Ces erreurs peuvent se produire lorsqu'une opération prend plus de temps que le délai avant expiration, généralement dans les pipelines qui s'exécutent pendant plus de six heures. Pour résoudre ce problème, passez aux exportations de fichiers.
Lorsque vous utilisez le SDK Java, envisagez de créer une classe qui représente le schéma de la table BigQuery. Appelez ensuite
useBeamSchema
dans votre pipeline pour permettre la conversion automatique entre les types Apache BeamRow
et BigQueryTableRow
. Pour obtenir un exemple de classe de schéma, consultez la sectionExampleModel.java
.
Exemples
Les exemples de code de cette section utilisent des lectures directes de table.
Pour utiliser un job d'exportation, omettez l'appel à withMethod
ou spécifiez Method.EXPORT
. Définissez ensuite l'option de pipeline --tempLocation
afin de spécifier un bucket Cloud Storage pour les fichiers exportés.
Ces exemples de code partent du principe que la table source contient les colonnes suivantes :
name
(chaîne)age
(entier)
Spécifié en tant que fichier de schéma JSON :
[
{"name":"user_name","type":"STRING","mode":"REQUIRED"},
{"name":"age","type":"INTEGER","mode":"REQUIRED"}
]
Lire des enregistrements au format Avro
Pour lire des données BigQuery dans des enregistrements au format Avro, utilisez la méthode read(SerializableFunction)
. Cette méthode utilise une fonction définie par l'application qui analyse les objets SchemaAndRecord
et renvoie un type de données personnalisé. La sortie du connecteur est une PCollection
de votre type de données personnalisé.
Le code suivant lit un PCollection<MyData>
à partir d'une table BigQuery, où MyData
est une classe définie par une application.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
La méthode read
utilise une interface SerializableFunction<SchemaAndRecord, T>
, qui définit une fonction permettant de convertir des enregistrements Avro en classe de données personnalisée. Dans l'exemple de code précédent, la méthode MyData.apply
implémente cette fonction de conversion. L'exemple de fonction analyse les champs name
et age
de l'enregistrement Avro et renvoie une instance MyData
.
Pour spécifier la table BigQuery à lire, appelez la méthode from
, comme indiqué dans l'exemple précédent. Pour en savoir plus, consultez la section Noms de tables dans la documentation du connecteur d'E/S BigQuery.
Lire des objets TableRow
La méthode readTableRows
lit les données BigQuery dans une PCollection
d'objets TableRow
. Chaque TableRow
est un mappage de paires clé/valeur qui contient une seule ligne de données de table. Spécifiez la table BigQuery à lire en appelant la méthode from
.
Le code suivant lit un PCollection<TableRows>
à partir d'une table BigQuery.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Cet exemple montre également comment accéder aux valeurs à partir du dictionnaire TableRow
.
Les valeurs entières sont encodées sous forme de chaînes afin de respecter le format JSON exporté par BigQuery.
Projection et filtrage de colonnes
Lorsque vous utilisez des lectures directes (Method.DIRECT_READ
), vous pouvez rendre les opérations de lecture plus efficaces en réduisant la quantité de données lues depuis BigQuery et envoyées sur le réseau.
- Projection de colonne : appelez
withSelectedFields
pour lire un sous-ensemble de colonnes de la table. Cela permet des lectures efficaces lorsque les tables contiennent de nombreuses colonnes. - Filtrage de ligne : appelez
withRowRestriction
pour spécifier un prédicat qui filtre les données côté serveur.
Les prédicats de filtre doivent être déterministes, et l'agrégation n'est pas disponible.
L'exemple suivant projette les colonnes "user_name"
et "age"
, et filtre les lignes qui ne correspondent pas au prédicat "age > 18"
.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Lire les données d'un résultat de requête
Les exemples précédents montrent comment lire les lignes d'une table. Vous pouvez également lire les résultats d'une requête SQL en appelant fromQuery
. Cette approche déplace une partie du travail de calcul dans BigQuery. Vous pouvez également utiliser cette méthode pour lire à partir d'une vue BigQuery ou une vue matérialisée en exécutant une requête sur la vue.
L'exemple suivant exécute une requête sur un ensemble de données public BigQuery et lit les résultats. Une fois le pipeline exécuté, vous pouvez voir le job de requête dans votre historique des jobs BigQuery.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Étape suivante
- Consultez la documentation sur le connecteur d'E/S BigQuery.
- Consultez la liste des modèles fournis par Google.