Importer, exporter et modifier des données à l'aide de Dataflow

Dataflow est un service géré permettant de transformer et d'enrichir des données. Le connecteur Dataflow pour Spanner vous permet de lire et d'écrire des données dans Spanner dans un pipeline Dataflow, en transformant ou en modifiant les données (facultatif). Vous pouvez également créer des pipelines qui transfèrent des données entre Spanner et d'autres produits Google Cloud.

Le connecteur Dataflow est la méthode recommandée pour déplacer efficacement des données vers et depuis Spanner de façon groupée, et pour effectuer des transformations importantes dans une base de données qui ne sont pas compatibles avec le LMD partitionné, telles que les déplacements de table, les suppressions groupées nécessitant une jointure, etc. Lorsque vous travaillez avec des bases de données individuelles, vous pouvez utiliser d'autres méthodes pour importer et exporter des données:

  • Utiliser la console Google Cloud pour exporter une base de données individuelle de Spanner vers Cloud Storage au format Avro.
  • Utiliser la console Google Cloud pour import une base de données dans Spanner à partir des fichiers exportés vers Cloud Storage.
  • Utilisez l'API REST ou la Google Cloud CLI pour exécuter des tâches d'exportation ou d'import depuis Spanner vers Cloud Storage, et inversement (également au format Avro).

Le connecteur Dataflow pour Spanner fait partie du SDK Java Apache Beam, et fournit une API permettant d'effectuer les actions ci-dessus. Pour en savoir plus sur certains concepts abordés ci-dessous, tels que les objets PCollection et les transformations, consultez le guide de programmation d'Apache Beam.

Ajouter le connecteur à votre projet Maven

Pour ajouter le connecteur Google Cloud Dataflow à un projet Maven, ajoutez l'artefact Maven beam-sdks-java-io-google-cloud-platform au fichier pom.xml, sous la forme d'une dépendance :

Par exemple, en supposant que votre fichier pom.xml définit la valeur de beam.version sur le numéro de version approprié, vous ajouteriez alors la dépendance suivante :

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Lire les données de Spanner

Pour lire des données issues de Spanner, appliquez la transformation SpannerIO.read(). Configurez la lecture à l'aide des méthodes de la classe SpannerIO.Read. L'application de la transformation renvoie une classe PCollection<Struct> où chaque élément représente une ligne individuelle affichée par l'opération de lecture. Vous pouvez lire des données depuis Spanner avec et sans requête SQL spécifique, en fonction du résultat souhaité.

L'application de la transformation SpannerIO.read() renvoie une vue cohérente des données en effectuant une lecture forte. Sauf indication contraire de votre part, le résultat de la lecture est instantané par rapport au moment où vous démarrez la lecture. Pour en savoir plus sur les différents types de lectures que Spanner peut effectuer, consultez la section Lectures.

Lire des données à l'aide d'une requête

Pour lire un ensemble spécifique de données issues de Spanner, configurez la transformation à l'aide de la méthode SpannerIO.Read.withQuery() pour spécifier une requête SQL. Exemple :

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Lire des données sans spécifier de requête

Pour lire les données d'une base de données sans utiliser de requête, vous pouvez spécifier un nom de table à l'aide de la méthode SpannerIO.Read.withTable() et une liste de colonnes à lire à l'aide de la méthode SpannerIO.Read.withColumns(). Exemple :

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Pour limiter le nombre de lignes lues, vous pouvez spécifier un ensemble de clés primaires à lire à l'aide de la méthode SpannerIO.Read.withKeySet().

Vous pouvez également lire une table à l'aide d'un index secondaire spécifié. Comme pour l'appel d'API readUsingIndex(), l'index doit contenir toutes les données qui apparaissent dans les résultats de la requête.

Pour ce faire, spécifiez la table comme indiqué dans l'exemple précédent et spécifiez l'index qui contient les valeurs de colonne souhaitées à l'aide de la méthode SpannerIO.Read.withIndex(). L'index doit stocker toutes les colonnes que la transformation doit lire. La clé primaire de la table de base est implicitement stockée. Par exemple, pour lire la table Songs à l'aide de l'index SongsBySongName, utilisez le code suivant:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Contrôler l'obsolescence des données de transaction

Vous avez la garantie qu'une transformation s'exécute sur un instantané cohérent de données. Pour contrôler l'obsolescence des données, appelez la méthode SpannerIO.Read.withTimestampBound() Consultez la page des transactions pour en savoir plus.

Lire à partir de plusieurs tables dans la même transaction

Si vous souhaitez lire les données issues de plusieurs tables au même moment afin d'assurer la cohérence des données, utilisez une seule transaction. Pour ce faire, appliquez une transformation createTransaction() qui crée un objet PCollectionView<Transaction> qui, à son tour, crée une transaction. La vue résultante peut être transmise à une opération de lecture à l'aide de SpannerIO.Read.withTransaction().

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Lire les données de toutes les tables disponibles

Vous pouvez lire les données de toutes les tables disponibles dans une base de données Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Résoudre les problèmes de requêtes non compatibles

Le connecteur Dataflow n'est compatible qu'avec les requêtes SQL Spanner dont le premier opérateur du plan d'exécution de la requête est une union distribuée. Si vous tentez de lire des données depuis Spanner à l'aide d'une requête et que vous obtenez une exception indiquant que la requête est does not have a DistributedUnion at the root, suivez les étapes de la section Comprendre comment Spanner exécute les requêtes afin de récupérer un plan d'exécution pour votre requête à l'aide de la console Google Cloud.

Si la requête SQL n'est pas compatible, simplifiez-la par une requête dont l'union distribuée est le premier opérateur dans son plan d'exécution. Supprimez les fonctions d'agrégat, les jointures de table ainsi que les opérateurs DISTINCT, GROUP BY et ORDER. Ce sont les opérateurs les plus susceptibles d'empêcher le fonctionnement de la requête.

Créer des mutations pour une écriture

Utilisez la méthode newInsertOrUpdateBuilder() de la classe Mutation au lieu de la méthode newInsertBuilder(), sauf si cela est absolument nécessaire pour les pipelines Java. Pour les pipelines Python, utilisez SpannerInsertOrUpdate() au lieu de SpannerInsert(). Dataflow fournit la garantie de type "au moins une fois", ce qui signifie que la mutation peut être écrite plusieurs fois. Par conséquent, seules les mutations INSERT peuvent générer des erreurs com.google.cloud.spanner.SpannerException: ALREADY_EXISTS qui entraînent l'échec du pipeline. Pour éviter cette erreur, utilisez plutôt la mutation INSERT_OR_UPDATE, qui ajoute une ligne ou met à jour les valeurs de colonne si la ligne existe déjà. La mutation INSERT_OR_UPDATE peut être appliquée plusieurs fois.

Écrire dans Spanner et transformer des données

Vous pouvez écrire des données dans Spanner à l'aide du connecteur Dataflow en utilisant une transformation SpannerIO.write() pour exécuter une collection de mutations de ligne d'entrée. Le connecteur Dataflow regroupe les mutations en lots pour plus d'efficacité.

L'exemple suivant montre comment appliquer une transformation d'écriture à une collection PCollection de mutations :

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Si une transformation s'arrête de manière inattendue avant la fin, le rollback ne sera pas effectué pour les mutations déjà appliquées.

Appliquer des groupes de mutations de manière atomique

Vous pouvez utiliser la classe MutationGroup pour vous assurer qu'un groupe de mutations est appliqué de manière atomique. Vous avez la garantie que les mutations d'un MutationGroup sont envoyées dans la même transaction, mais la transaction est susceptible d'être relancée.

Les groupes de mutations fonctionnent mieux lorsqu'ils sont utilisés pour regrouper les mutations qui affectent les données stockées les unes près des autres dans l'espace clé. Étant donné que Spanner entrelace les données des tables parentes et enfants dans la table parente, ces données sont toujours proches dans l'espace clé. Nous vous recommandons de structurer votre groupe de mutations de sorte qu'il contienne une mutation appliquée à une table parente et d'autres mutations appliquées aux tables enfants, ou que toutes ses mutations modifient des données proches les unes des autres dans l'espace clé. Pour savoir comment Spanner stocke les données des tables parentes et enfants, consultez la page Schéma et modèle de données. Si vous n'organisez pas vos groupes de mutations autour des hiérarchies de tables recommandées, ou si les données consultées ne sont pas proches les unes des autres dans l'espace clé, Spanner devra peut-être effectuer des commits en deux phases, ce qui ralentira les performances. Pour en savoir plus, consultez la section concernant les compromis de localité.

Pour utiliser la classe MutationGroup, créez une transformation SpannerIO.write() et appelez la méthode SpannerIO.Write.grouped(), qui affiche une transformation que vous pouvez ensuite appliquer à une collection PCollection d'objets MutationGroup.

Lors de la création d'un objet MutationGroup, la première mutation répertoriée devient la mutation principale. Si votre groupe de mutations affecte à la fois une table parente et une table enfant, la mutation principale devrait être une mutation à la table parente. Sinon, vous pouvez utiliser n'importe quelle mutation comme mutation principale. Le connecteur Dataflow utilise la mutation principale pour identifier les limites de la partition, afin de regrouper efficacement les mutations.

Par exemple, imaginez que votre application surveille le comportement et signale un comportement utilisateur problématique à examiner. Pour chaque comportement signalé, vous souhaitez mettre à jour la table Users pour bloquer l'accès de l'utilisateur à votre application. Vous devez également enregistrer l'incident dans la table PendingReviews. Pour vous assurer que les deux tables sont mises à jour de manière atomique, utilisez un MutationGroup :

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Lorsque vous créez un groupe de mutations, la première mutation fournie comme argument devient la mutation principale. Dans ce cas, les deux tables ne sont pas liées, la mutation principale n'est donc pas clairement définie. Nous avons choisi userMutation comme mutation principale en la plaçant en premier. L'application des deux mutations séparément serait plus rapide, mais ne garantirait pas l'atomicité. Le groupe de mutations est donc le meilleur choix dans cette situation.

Étapes suivantes