Dataflow est un service géré permettant de transformer et d'enrichir des données. Le connecteur Dataflow pour Spanner vous permet de lire des données depuis et écrire des données dans Spanner dans un pipeline Dataflow, et éventuellement transformer ou modifier les données. Vous pouvez aussi créer des pipelines qui transfèrent des données entre Spanner et d'autres produits Google Cloud.
Le connecteur Dataflow est la méthode recommandée pour optimiser déplacer des données vers et depuis Spanner de façon groupée, des transformations en base de données qui ne sont pas compatibles avec le LMD partitionné, tels que les déplacements de tableau, les suppressions groupées nécessitant une jointure, etc. Lorsque vous travaillez avec des bases de données individuelles, il existe d'autres méthodes que vous pouvez utiliser pour importer et exporter des données:
- Utiliser la console Google Cloud pour exporter une base de données individuelle depuis Spanner vers Cloud Storage au format Avro.
- Utiliser la console Google Cloud pour import une base de données dans Spanner à partir des fichiers que vous avez exportés vers Cloud Storage.
- Utilisez l'API REST ou la Google Cloud CLI pour exécuter une exportation ou import des jobs de Spanner vers Cloud Storage et inversement au format Avro).
Le connecteur Dataflow pour Spanner fait partie Le SDK Java Apache Beam, qui fournit une API pour effectuer les opérations ci-dessus actions. Pour en savoir plus sur certains des concepts abordés ci-dessous, tels que en tant qu'objets PCollection et transformations, consultez le guide de programmation d'Apache Beam.
Ajouter le connecteur à votre projet Maven
Pour ajouter le connecteur Google Cloud Dataflow à un projet Maven, ajoutez l'artefact Maven beam-sdks-java-io-google-cloud-platform
au fichier pom.xml
, sous la forme d'une dépendance :
Par exemple, en supposant que votre fichier pom.xml
définit la valeur de beam.version
sur le numéro de version approprié, vous ajouteriez alors la dépendance suivante :
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Lire les données de Spanner
Pour lire des données issues de Spanner, appliquez la transformation SpannerIO.read().
Configurez la lecture à l'aide des méthodes de la classe SpannerIO.Read
.
L'application de la transformation renvoie une classe PCollection<Struct>
où chaque élément représente une ligne individuelle affichée par l'opération de lecture. Vous pouvez lire les données issues de Spanner avec ou sans requête SQL spécifique, selon le résultat que vous souhaitez.
L'application de la transformation SpannerIO.read()
renvoie une vue cohérente des données en effectuant une lecture forte. Sauf indication contraire de votre part, le résultat de la lecture est instantané par rapport au moment où vous démarrez la lecture. Consultez la section concernant les lectures pour en savoir plus sur les différents types de lectures que Spanner peut effectuer.
Lire des données à l'aide d'une requête
Pour lire un ensemble spécifique de données de Spanner, configurez la transformation
à l'aide de la méthode SpannerIO.Read.withQuery()
pour spécifier un code SQL ;
requête. Exemple :
Lire des données sans spécifier de requête
Pour lire les données d'une base de données sans utiliser de requête, vous pouvez spécifier une table à l'aide de la méthode SpannerIO.Read.withTable() et spécifiez un liste de colonnes à lire à l'aide de la méthode SpannerIO.Read.withColumns() . Exemple :
GoogleSQL
PostgreSQL
Pour limiter le nombre de lignes lues, vous pouvez spécifier un ensemble de clés primaires à lire à l'aide de la méthode SpannerIO.Read.withKeySet().
Vous pouvez également lire une table à l'aide d'un index secondaire spécifié. Comme pour appel d'API readUsingIndex(), l'index doit contenir toutes les données qui apparaît dans les résultats de la requête.
Pour ce faire, spécifiez le tableau comme indiqué dans l'exemple précédent, puis l'index contenant les valeurs de colonne souhaitées à l'aide de la méthode SpannerIO.Read.withIndex()
. L'index doit stocker l'intégralité
les colonnes que la transformation doit lire. La clé primaire de la table de base est
stockées implicitement. Par exemple, pour lire la table Songs
à l'aide de l'index SongsBySongName
, utilisez le code suivant :
GoogleSQL
PostgreSQL
Contrôler l'obsolescence des données de transaction
Vous avez la garantie qu'une transformation s'exécute sur un instantané cohérent de données. Pour contrôler l'obsolescence des données, appelez la méthode SpannerIO.Read.withTimestampBound()
Consultez la page des transactions pour en savoir plus.
Lire à partir de plusieurs tables dans la même transaction
Si vous souhaitez lire les données issues de plusieurs tables au même moment afin d'assurer la cohérence des données, utilisez une seule transaction. Pour ce faire, appliquez une transformation createTransaction()
qui crée un objet PCollectionView<Transaction>
qui, à son tour, crée une transaction. La vue résultante peut être transmise à une opération de lecture à l'aide de SpannerIO.Read.withTransaction()
.
GoogleSQL
PostgreSQL
Lire les données de toutes les tables disponibles
Vous pouvez lire les données de toutes les tables disponibles dans une base de données Spanner.
GoogleSQL
PostgreSQL
Résoudre les problèmes de requêtes non compatibles
Le connecteur Dataflow n'est compatible qu'avec les requêtes SQL Spanner dans lesquelles le premier opérateur du plan d'exécution de la requête est une union distribuée. Si vous tentez de lire des données de Spanner à l'aide d'une requête
vous obtenez une exception indiquant que la requête does not have a DistributedUnion at
the root
, suivez les étapes de la section Comprendre le fonctionnement de Spanner
requêtes afin de récupérer un plan d'exécution pour votre requête à l'aide de
console Google Cloud.
Si la requête SQL n'est pas compatible, simplifiez-la par une requête dont l'union distribuée est le premier opérateur dans son plan d'exécution. Supprimez les fonctions d'agrégat, les jointures de table ainsi que les opérateurs DISTINCT
, GROUP BY
et ORDER
. Ce sont les opérateurs les plus susceptibles d'empêcher le fonctionnement de la requête.
Créer des mutations pour une écriture
Utiliser la classe Mutation
newInsertOrUpdateBuilder()
au lieu de
Méthode newInsertBuilder()
sauf si cela est absolument nécessaire pour les pipelines Java. Pour les pipelines Python, utilisez
SpannerInsertOrUpdate()
au lieu de
SpannerInsert()
Dataflow offre
au moins une fois, ce qui signifie que la mutation peut être écrite
plusieurs fois. Par conséquent, INSERT
seules les mutations peuvent générer
com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
erreurs à l'origine de
l'échec du pipeline. Pour éviter cette erreur, utilisez le INSERT_OR_UPDATE
.
, qui ajoute une ligne ou met à jour les valeurs des colonnes si la ligne
existe déjà. La mutation INSERT_OR_UPDATE
peut être appliquée plusieurs fois.
Écrire dans Spanner et transformer des données
Vous pouvez écrire des données dans Spanner à l'aide du connecteur Dataflow en appliquant une transformation SpannerIO.write()
pour exécuter une collection de mutations de ligne d'entrée. Les groupes de connecteurs Dataflow
en lots pour plus d'efficacité.
L'exemple suivant montre comment appliquer une transformation d'écriture à une collection PCollection
de mutations :
GoogleSQL
PostgreSQL
Si une transformation s'arrête de manière inattendue avant la fin, les mutations qui ont déjà appliquée ne feront pas l'objet d'un rollback.
Appliquer des groupes de mutations de manière atomique
Vous pouvez utiliser la classe MutationGroup
pour vous assurer qu'un groupe de mutations est appliqué de manière atomique. Vous avez la garantie que les mutations d'un MutationGroup
sont envoyées dans la même transaction, mais la transaction est susceptible d'être relancée.
Les groupes de mutations fonctionnent mieux lorsqu'ils sont utilisés pour regrouper les mutations qui affectent les données stockées les unes près des autres dans l'espace clé. Étant donné que Spanner intercale les données des tables parentes et enfants dans la table parente, ces données sont toujours proches dans l'espace clé. Nous vous recommandons de structurer votre groupe de mutations de sorte qu'il contienne une mutation appliquée à une table parente et d'autres mutations appliquées aux tables enfants, ou que toutes ses mutations modifient des données proches les unes des autres dans l'espace clé. Pour savoir comment Spanner stocke les ressources données des tables enfants, consultez la section Schéma et modèle de données. Si vous n'organisez pas groupes de mutations autour des hiérarchies de tables recommandées, ou si les données ne sont pas proches les uns des autres dans l'espace clé, Spanner peut effectuer des commits en deux phases, ce qui ralentit les performances. Pour en savoir plus, consultez la section concernant les compromis de localité.
Pour utiliser la classe MutationGroup
, créez une transformation SpannerIO.write()
et appelez la méthode SpannerIO.Write.grouped()
, qui affiche une transformation que vous pouvez ensuite appliquer à une collection PCollection
d'objets MutationGroup
.
Lors de la création d'un objet MutationGroup
, la première mutation répertoriée devient la mutation principale. Si votre groupe de mutations affecte à la fois une table parente et une table enfant, la mutation principale devrait être une mutation à la table parente. Sinon, vous pouvez utiliser n'importe quelle mutation comme mutation principale. Le connecteur Dataflow utilise la mutation principale pour identifier les limites de la partition, afin de regrouper efficacement les mutations.
Par exemple, imaginez que votre application surveille le comportement et signale un comportement utilisateur problématique à examiner. Pour chaque comportement signalé, vous souhaitez mettre à jour la table Users
pour bloquer l'accès de l'utilisateur à votre application. Vous devez également enregistrer l'incident dans la table PendingReviews
. Pour vous assurer que les deux tables sont mises à jour de manière atomique, utilisez un MutationGroup
:
GoogleSQL
PostgreSQL
Lorsque vous créez un groupe de mutations, la première mutation fournie comme argument devient la mutation principale. Dans ce cas, les deux tables ne sont pas liées, la mutation principale n'est donc pas clairement définie. Nous avons choisi userMutation
comme mutation principale en la plaçant en premier. L'application des deux mutations séparément serait plus rapide, mais ne garantirait pas l'atomicité. Le groupe de mutations est donc le meilleur choix dans cette situation.
Étapes suivantes
- Découvrez comment concevoir un pipeline de données Apache Beam.
- Exporter et import des bases de données Spanner dans la console Google Cloud à l'aide de Dataflow ;