Importer, exporter et modifier des données à l'aide de Dataflow

Dataflow est un service géré permettant de transformer et d'enrichir des données. Le connecteur Dataflow pour Spanner vous permet de lire et d'écrire des données dans Spanner dans un pipeline Dataflow, en transformant ou en modifiant les données de manière facultative. Vous pouvez également créer des pipelines qui transfèrent des données entre Spanner et d'autres produits Google Cloud.

Le connecteur Dataflow est la méthode recommandée pour déplacer efficacement des données vers et depuis Spanner de manière groupée et pour effectuer des transformations importantes dans une base de données non compatibles avec le LMD partitionné, telles que les déplacements de tables, les suppressions groupées nécessitant une jointure, etc. Lorsque vous travaillez avec des bases de données individuelles, d'autres méthodes vous permettent d'importer et d'exporter des données:

  • Utilisez la console Google Cloud pour exporter une base de données individuelle de Spanner vers Cloud Storage au format Avro.
  • Utilisez la console Google Cloud pour import une base de données dans Spanner à partir des fichiers exportés vers Cloud Storage.
  • Utilisez l'API REST ou Google Cloud CLI pour exécuter des tâches d'exportation ou d'import depuis Spanner vers Cloud Storage, et inversement (également au format Avro).

Le connecteur Dataflow pour Spanner fait partie du SDK Java Apache Beam, et fournit une API pour effectuer les actions ci-dessus. Pour en savoir plus sur certains des concepts abordés ci-dessous, tels que les objets PCollection et les transformations, consultez le guide de programmation d'Apache Beam.

Ajouter le connecteur à votre projet Maven

Pour ajouter le connecteur Google Cloud Dataflow à un projet Maven, ajoutez l'artefact Maven beam-sdks-java-io-google-cloud-platform au fichier pom.xml, sous la forme d'une dépendance :

Par exemple, en supposant que votre fichier pom.xml définit la valeur de beam.version sur le numéro de version approprié, vous ajouteriez alors la dépendance suivante :

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Lire des données de Spanner

Pour lire des données à partir de Spanner, appliquez la transformation SpannerIO.read(). Configurez la lecture à l'aide des méthodes de la classe SpannerIO.Read. L'application de la transformation renvoie une classe PCollection<Struct> où chaque élément représente une ligne individuelle affichée par l'opération de lecture. Vous pouvez lire des données dans Spanner avec et sans requête SQL spécifique, en fonction du résultat souhaité.

L'application de la transformation SpannerIO.read() renvoie une vue cohérente des données en effectuant une lecture forte. Sauf indication contraire de votre part, le résultat de la lecture est instantané par rapport au moment où vous démarrez la lecture. Consultez la section Lectures pour plus d'informations sur les différents types de lectures que Spanner peut effectuer.

Lire des données à l'aide d'une requête

Pour lire un ensemble spécifique de données à partir de Spanner, configurez la transformation à l'aide de la méthode SpannerIO.Read.withQuery() pour spécifier une requête SQL. Exemple :

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Lire les données sans spécifier de requête

Pour lire des données dans une base de données sans utiliser de requête, vous pouvez spécifier un nom de table à l'aide de la méthode SpannerIO.Read.withTable() et une liste de colonnes à lire avec la méthode SpannerIO.Read.withColumns(). Exemple :

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Pour limiter le nombre de lignes lues, vous pouvez spécifier un ensemble de clés primaires à lire à l'aide de la méthode SpannerIO.Read.withKeySet().

Vous pouvez également lire une table à l'aide d'un index secondaire spécifié. Comme pour l'appel d'API readUsingIndex(), l'index doit contenir toutes les données qui apparaissent dans les résultats de la requête.

Pour ce faire, spécifiez la table comme indiqué dans l'exemple précédent, ainsi que l'index qui contient les valeurs de colonne souhaitées à l'aide de la méthode SpannerIO.Read.withIndex(). L'index doit stocker toutes les colonnes que la transformation doit lire. La clé primaire de la table de base est stockée implicitement. Par exemple, pour lire la table Songs à l'aide de l'index SongsBySongName, utilisez le code suivant:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Contrôler l'obsolescence des données de transaction

Vous avez la garantie qu'une transformation s'exécute sur un instantané cohérent de données. Pour contrôler l'obsolescence des données, appelez la méthode SpannerIO.Read.withTimestampBound() Consultez la page des transactions pour en savoir plus.

Lire les données de plusieurs tables dans la même transaction

Si vous souhaitez lire les données issues de plusieurs tables au même moment afin d'assurer la cohérence des données, utilisez une seule transaction. Pour ce faire, appliquez une transformation createTransaction() qui crée un objet PCollectionView<Transaction> qui, à son tour, crée une transaction. La vue résultante peut être transmise à une opération de lecture à l'aide de SpannerIO.Read.withTransaction().

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Lire les données de toutes les tables disponibles

Vous pouvez lire les données de toutes les tables disponibles dans une base de données Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Résoudre les problèmes liés aux requêtes non compatibles

Le connecteur Dataflow n'est compatible qu'avec les requêtes SQL Spanner dans lesquelles le premier opérateur du plan d'exécution de la requête est une union distribuée. Si vous tentez de lire des données à partir de Spanner à l'aide d'une requête et que vous obtenez une exception indiquant la requête does not have a DistributedUnion at the root, suivez les étapes de la section Comprendre comment Spanner exécute les requêtes pour récupérer un plan d'exécution pour votre requête à l'aide de la console Google Cloud.

Si la requête SQL n'est pas compatible, simplifiez-la par une requête dont l'union distribuée est le premier opérateur dans son plan d'exécution. Supprimez les fonctions d'agrégat, les jointures de table ainsi que les opérateurs DISTINCT, GROUP BY et ORDER. Ce sont les opérateurs les plus susceptibles d'empêcher le fonctionnement de la requête.

Créer des mutations pour une écriture

Utilisez la méthode newInsertOrUpdateBuilder() de la classe Mutation au lieu de la méthode newInsertBuilder(), sauf si cela est absolument nécessaire pour les pipelines Java. Pour les pipelines Python, utilisez SpannerInsertOrUpdate() au lieu de SpannerInsert(). Dataflow offre des garanties de type "au moins une fois", ce qui signifie que la mutation peut être écrite plusieurs fois. Par conséquent, seules les mutations INSERT peuvent générer des erreurs com.google.cloud.spanner.SpannerException: ALREADY_EXISTS qui entraînent l'échec du pipeline. Pour éviter cette erreur, utilisez plutôt la mutation INSERT_OR_UPDATE, qui ajoute une nouvelle ligne ou met à jour les valeurs de colonne si la ligne existe déjà. La mutation INSERT_OR_UPDATE peut être appliquée plusieurs fois.

Écrire dans Spanner et transformer des données

Vous pouvez écrire des données dans Spanner avec le connecteur Dataflow. Pour ce faire, utilisez une transformation SpannerIO.write() pour exécuter une collection de mutations de ligne d'entrée. Le connecteur Dataflow regroupe les mutations en lots pour plus d'efficacité.

L'exemple suivant montre comment appliquer une transformation d'écriture à une collection PCollection de mutations :

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Si une transformation s'arrête de manière inattendue avant la fin, les mutations déjà appliquées ne sont pas annulées.

Appliquer des groupes de mutations de manière atomique

Vous pouvez utiliser la classe MutationGroup pour vous assurer qu'un groupe de mutations est appliqué de manière atomique. Vous avez la garantie que les mutations d'un MutationGroup sont envoyées dans la même transaction, mais la transaction est susceptible d'être relancée.

Les groupes de mutations fonctionnent mieux lorsqu'ils sont utilisés pour regrouper les mutations qui affectent les données stockées les unes près des autres dans l'espace clé. Étant donné que Spanner entrelace les données des tables parentes et enfants dans la table parente, ces données sont toujours proches les unes des autres dans l'espace clé. Nous vous recommandons de structurer votre groupe de mutations de sorte qu'il contienne une mutation appliquée à une table parente et d'autres mutations appliquées aux tables enfants, ou que toutes ses mutations modifient des données proches les unes des autres dans l'espace clé. Pour savoir comment Spanner stocke les données des tables parentes et enfants, consultez la section Schéma et modèle de données. Si vous n'organisez pas vos groupes de mutations autour des hiérarchies de tables recommandées, ou si les données consultées ne sont pas proches dans l'espace clé, Spanner peut avoir besoin d'effectuer des commits en deux phases, ce qui ralentit les performances. Pour en savoir plus, consultez la section concernant les compromis de localité.

Pour utiliser la classe MutationGroup, créez une transformation SpannerIO.write() et appelez la méthode SpannerIO.Write.grouped(), qui affiche une transformation que vous pouvez ensuite appliquer à une collection PCollection d'objets MutationGroup.

Lors de la création d'un objet MutationGroup, la première mutation répertoriée devient la mutation principale. Si votre groupe de mutations affecte à la fois une table parente et une table enfant, la mutation principale devrait être une mutation à la table parente. Sinon, vous pouvez utiliser n'importe quelle mutation comme mutation principale. Le connecteur Dataflow utilise la mutation principale pour identifier les limites de la partition, afin de regrouper efficacement les mutations.

Par exemple, imaginez que votre application surveille le comportement et signale un comportement utilisateur problématique à examiner. Pour chaque comportement signalé, vous souhaitez mettre à jour la table Users pour bloquer l'accès de l'utilisateur à votre application. Vous devez également enregistrer l'incident dans la table PendingReviews. Pour vous assurer que les deux tables sont mises à jour de manière atomique, utilisez un MutationGroup :

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Lorsque vous créez un groupe de mutations, la première mutation fournie comme argument devient la mutation principale. Dans ce cas, les deux tables ne sont pas liées, la mutation principale n'est donc pas clairement définie. Nous avons choisi userMutation comme mutation principale en la plaçant en premier. L'application des deux mutations séparément serait plus rapide, mais ne garantirait pas l'atomicité. Le groupe de mutations est donc le meilleur choix dans cette situation.

Étapes suivantes