Dataflow est un service géré permettant de transformer et d'enrichir des données. Le connecteur Dataflow pour Spanner vous permet de lire des données issues de Spanner et d'écrire des données vers Spanner dans un pipeline Dataflow, en transformant ou en modifiant les données de manière optionnelle. Vous pouvez aussi créer des pipelines qui transfèrent des données entre Spanner et d'autres produits Google Cloud.
Le connecteur Dataflow est la méthode recommandée pour optimiser déplacer des données vers et depuis Spanner de façon groupée, des transformations en base de données non compatibles avec le LMD partitionné ; tels que les déplacements de tableau, les suppressions groupées nécessitant une jointure, etc. Lorsque vous travaillez avec des bases de données individuelles, vous pouvez utiliser d'autres méthodes pour importer et exporter des données :
- Utiliser la console Google Cloud pour exporter une base de données individuelle de Spanner vers Cloud Storage au format Avro.
- Utiliser la console Google Cloud pour import une base de données dans Spanner à partir des fichiers que vous avez exportés vers Cloud Storage.
- Utiliser l'API REST ou Google Cloud CLI pour exécuter des tâches d'exportation ou d'importation depuis Spanner vers Cloud Storage, et inversement (également au format Avro).
Le connecteur Dataflow pour Spanner fait partie Le SDK Java Apache Beam, qui fournit une API pour effectuer les opérations ci-dessus actions. Pour en savoir plus sur certains des concepts abordés ci-dessous, tels que les objets et les transformations PCollection, consultez le guide de programmation d'Apache Beam.
Ajouter le connecteur à votre projet Maven
Pour ajouter le connecteur Google Cloud Dataflow à un projet Maven, ajoutez l'artefact Maven beam-sdks-java-io-google-cloud-platform
au fichier pom.xml
, sous la forme d'une dépendance :
Par exemple, en supposant que votre fichier pom.xml
définit la valeur de beam.version
sur le numéro de version approprié, vous ajouteriez alors la dépendance suivante :
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Lire les données de Spanner
Pour lire des données issues de Spanner, appliquez la transformation SpannerIO.read().
Configurez la lecture à l'aide des méthodes de la classe SpannerIO.Read
.
L'application de la transformation renvoie une classe PCollection<Struct>
où chaque élément représente une ligne individuelle affichée par l'opération de lecture. Vous pouvez lire les données issues de Spanner avec ou sans requête SQL spécifique, selon le résultat que vous souhaitez.
L'application de la transformation SpannerIO.read()
renvoie une vue cohérente des données en effectuant une lecture forte. Sauf indication contraire de votre part, le résultat de la lecture est instantané par rapport au moment où vous démarrez la lecture. Consultez la section concernant les lectures pour en savoir plus sur les différents types de lectures que Spanner peut effectuer.
Lire des données à l'aide d'une requête
Pour lire un ensemble spécifique de données issues de Spanner, configurez la transformation en appelant la méthode SpannerIO.Read.withQuery()
pour spécifier une requête SQL. Exemple :
Lire des données sans spécifier de requête
Pour lire les données d'une base de données sans utiliser de requête, vous pouvez spécifier un nom de table à l'aide de la méthode SpannerIO.Read.withTable() et une liste de colonnes à lire à l'aide de la méthode SpannerIO.Read.withColumns(). Exemple :
GoogleSQL
PostgreSQL
Pour limiter les lignes lues, vous pouvez spécifier un ensemble de clés primaires à lire à l'aide de la méthode SpannerIO.Read.withKeySet().
Vous pouvez également lire une table à l'aide d'un indice secondaire spécifié. Comme pour l'appel d'API readUsingIndex(), l'index doit contenir toutes les données qui apparaissent dans les résultats de la requête.
Pour ce faire, spécifiez le tableau comme indiqué dans l'exemple précédent, puis l'index contenant les valeurs de colonne souhaitées à l'aide de la méthode SpannerIO.Read.withIndex()
. L'index doit stocker toutes les colonnes que la transformation doit lire. La clé primaire de la table de base est
stockées implicitement. Par exemple, pour lire la table Songs
à l'aide de l'index SongsBySongName
, utilisez le code suivant :
GoogleSQL
PostgreSQL
Contrôler l'obsolescence des données de transaction
Vous avez la garantie qu'une transformation s'exécute sur un instantané cohérent de données. Pour contrôler l'obsolescence des données, appelez la méthode SpannerIO.Read.withTimestampBound()
Consultez la page des transactions pour en savoir plus.
Lire à partir de plusieurs tables dans la même transaction
Si vous souhaitez lire les données issues de plusieurs tables au même moment afin d'assurer la cohérence des données, utilisez une seule transaction. Pour ce faire, appliquez une transformation createTransaction()
qui crée un objet PCollectionView<Transaction>
qui, à son tour, crée une transaction. La vue résultante peut être transmise à une opération de lecture à l'aide de SpannerIO.Read.withTransaction()
.
GoogleSQL
PostgreSQL
Lire les données de toutes les tables disponibles
Vous pouvez lire les données de toutes les tables disponibles dans une base de données Spanner.
GoogleSQL
PostgreSQL
Résoudre les problèmes de requêtes non compatibles
Le connecteur Dataflow n'est compatible qu'avec les requêtes SQL Spanner dans lesquelles le premier opérateur du plan d'exécution de la requête est une union distribuée. Si vous tentez de lire des données issues de Spanner avec une requête et que vous obtenez une exception indiquant que la requête does not have a DistributedUnion at
the root
, suivez les étapes de la section Comprendre comment Spanner exécute les requêtes afin de récupérer un plan d'exécution pour la requête à l'aide de la console Google Cloud.
Si la requête SQL n'est pas compatible, simplifiez-la par une requête dont l'union distribuée est le premier opérateur dans son plan d'exécution. Supprimez les fonctions d'agrégat, les jointures de table ainsi que les opérateurs DISTINCT
, GROUP BY
et ORDER
. Ce sont les opérateurs les plus susceptibles d'empêcher le fonctionnement de la requête.
Créer des mutations pour une écriture
Utiliser la classe Mutation
newInsertOrUpdateBuilder()
au lieu de
Méthode newInsertBuilder()
sauf si cela est absolument nécessaire pour les pipelines Java. Pour les pipelines Python, utilisez
SpannerInsertOrUpdate()
au lieu de
SpannerInsert()
Dataflow propose
au moins une fois, ce qui signifie que la mutation peut être écrite
plusieurs fois. Par conséquent, seules les mutations INSERT
peuvent générer des erreurs com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
qui entraînent l'échec du pipeline. Pour éviter cette erreur, utilisez plutôt la mutation INSERT_OR_UPDATE
, qui ajoute une ligne ou met à jour les valeurs de colonne si la ligne existe déjà. La mutation INSERT_OR_UPDATE
peut être appliquée plusieurs fois.
Écrire dans Spanner et transformer des données
Vous pouvez écrire des données dans Spanner à l'aide du connecteur Dataflow en appliquant une transformation SpannerIO.write()
pour exécuter une collection de mutations de ligne d'entrée. Les groupes de connecteurs Dataflow
en lots pour plus d'efficacité.
L'exemple suivant montre comment appliquer une transformation d'écriture à une collection PCollection
de mutations :
GoogleSQL
PostgreSQL
Si une transformation s'arrête de manière inattendue avant la fin, les mutations qui ont déjà été appliquées ne seront pas annulées.
Appliquer des groupes de mutations de manière atomique
Vous pouvez utiliser la classe MutationGroup
pour vous assurer qu'un groupe de mutations est appliqué de manière atomique. Vous avez la garantie que les mutations d'un MutationGroup
sont envoyées dans la même transaction, mais la transaction est susceptible d'être relancée.
Les groupes de mutations fonctionnent mieux lorsqu'ils sont utilisés pour regrouper les mutations qui affectent les données stockées les unes près des autres dans l'espace clé. Étant donné que Spanner entre les données des tables parentes et enfants dans la table parente, ces données sont toujours proches les uns des autres dans l’espace clé. Nous vous recommandons de structurer votre groupe de mutations de sorte qu'il contienne une mutation appliquée à une table parente et d'autres mutations appliquées aux tables enfants, ou que toutes ses mutations modifient des données proches les unes des autres dans l'espace clé. Pour savoir comment Spanner stocke les données des tables parentes et enfants, consultez la section Schéma et modèle de données. Si vous n'organisez pas groupes de mutations autour des hiérarchies de tables recommandées, ou si les données ne sont pas proches les uns des autres dans l'espace clé, Spanner peut effectuer des commits en deux phases, ce qui ralentit les performances. Pour en savoir plus, consultez la section concernant les compromis de localité.
Pour utiliser la classe MutationGroup
, créez une transformation SpannerIO.write()
et appelez la méthode SpannerIO.Write.grouped()
, qui affiche une transformation que vous pouvez ensuite appliquer à une collection PCollection
d'objets MutationGroup
.
Lors de la création d'un objet MutationGroup
, la première mutation répertoriée devient la mutation principale. Si votre groupe de mutations affecte à la fois une table parente et une table enfant, la mutation principale devrait être une mutation à la table parente. Sinon, vous pouvez utiliser n'importe quelle mutation comme mutation principale. Le connecteur Dataflow utilise la mutation principale pour identifier les limites de la partition, afin de regrouper efficacement les mutations.
Par exemple, imaginez que votre application surveille le comportement et signale un comportement utilisateur problématique à examiner. Pour chaque comportement signalé, vous souhaitez mettre à jour la table Users
pour bloquer l'accès de l'utilisateur à votre application. Vous devez également enregistrer l'incident dans la table PendingReviews
. Pour vous assurer que les deux tables sont mises à jour de manière atomique, utilisez un MutationGroup
:
GoogleSQL
PostgreSQL
Lorsque vous créez un groupe de mutations, la première mutation fournie comme argument devient la mutation principale. Dans ce cas, les deux tables ne sont pas liées, la mutation principale n'est donc pas clairement définie. Nous avons choisi userMutation
comme mutation principale en la plaçant en premier. L'application des deux mutations séparément serait plus rapide, mais ne garantirait pas l'atomicité. Le groupe de mutations est donc le meilleur choix dans cette situation.
Étapes suivantes
- Découvrez comment concevoir un pipeline de données Apache Beam.
- Exporter et import des bases de données Spanner dans la console Google Cloud à l'aide de Dataflow ;