Importer, exporter et modifier des données à l'aide de Dataflow

Dataflow est un service géré permettant de transformer et d'enrichir des données. Le connecteur Dataflow pour Spanner vous permet de lire des données depuis et écrire des données dans Spanner dans un pipeline Dataflow, et éventuellement transformer ou modifier les données. Vous pouvez aussi créer des pipelines qui transfèrent des données entre Spanner et d'autres produits Google Cloud.

Le connecteur Dataflow est la méthode recommandée pour optimiser déplacer des données vers et depuis Spanner de façon groupée, des transformations en base de données qui ne sont pas compatibles avec le LMD partitionné, tels que les déplacements de tableau, les suppressions groupées nécessitant une jointure, etc. Lorsque vous travaillez avec des bases de données individuelles, il existe d'autres méthodes que vous pouvez utiliser pour importer et exporter des données:

  • Utiliser la console Google Cloud pour exporter une base de données individuelle depuis Spanner vers Cloud Storage au format Avro.
  • Utiliser la console Google Cloud pour réimporter une base de données dans Spanner à partir des fichiers que vous avez exportés vers Cloud Storage.
  • Utilisez l'API REST ou la Google Cloud CLI pour exécuter une exportation ou Importer des jobs de Spanner vers Cloud Storage et inversement au format Avro).

Le connecteur Dataflow pour Spanner fait partie Le SDK Java Apache Beam, qui fournit une API pour effectuer les opérations ci-dessus actions. Pour en savoir plus sur certains des concepts abordés ci-dessous, tels que en tant qu'objets PCollection et transformations, consultez le guide de programmation d'Apache Beam.

Ajouter le connecteur à votre projet Maven

Pour ajouter le connecteur Google Cloud Dataflow à un projet Maven, ajoutez l'artefact Maven beam-sdks-java-io-google-cloud-platform au fichier pom.xml, sous la forme d'une dépendance :

Par exemple, en supposant que votre fichier pom.xml définit la valeur de beam.version sur le numéro de version approprié, vous ajouteriez alors la dépendance suivante :

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Lire les données de Spanner

Pour lire des données issues de Spanner, appliquez la transformation SpannerIO.read(). Configurez la lecture à l'aide des méthodes de la classe SpannerIO.Read. L'application de la transformation renvoie une classe PCollection<Struct> où chaque élément représente une ligne individuelle affichée par l'opération de lecture. Vous pouvez lire des données depuis Spanner avec et sans une requête SQL spécifique. en fonction du résultat souhaité.

L'application de la transformation SpannerIO.read() renvoie une vue cohérente des données en effectuant une lecture forte. Sauf indication contraire de votre part, le résultat de la lecture est instantané par rapport au moment où vous démarrez la lecture. Voir les lectures pour plus d'informations des informations sur les différents types de lectures que Spanner peut effectuer.

Lire des données à l'aide d'une requête

Pour lire un ensemble spécifique de données de Spanner, configurez la transformation à l'aide de la méthode SpannerIO.Read.withQuery() pour spécifier un code SQL ; requête. Exemple :

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Lire des données sans spécifier de requête

Pour lire les données d'une base de données sans utiliser de requête, vous pouvez spécifier une table à l'aide de la méthode SpannerIO.Read.withTable() et spécifiez un liste de colonnes à lire à l'aide de la méthode SpannerIO.Read.withColumns() . Exemple :

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Pour limiter le nombre de lignes lues, vous pouvez spécifier un ensemble de clés primaires à lire à l'aide de la méthode SpannerIO.Read.withKeySet().

Vous pouvez également lire une table à l'aide d'un index secondaire spécifié. Comme pour appel d'API readUsingIndex(), l'index doit contenir toutes les données qui apparaît dans les résultats de la requête.

Pour ce faire, spécifiez la table comme dans l'exemple précédent et indiquez la index contenant les valeurs de colonne souhaitées à l'aide de la propriété SpannerIO.Read.withIndex(). L'index doit stocker l'intégralité les colonnes que la transformation doit lire. La clé primaire de la table de base est stockées implicitement. Par exemple, pour lire la table Songs à l'aide de l'index SongsBySongName, vous utilisez code suivant:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Contrôler l'obsolescence des données de transaction

Vous avez la garantie qu'une transformation s'exécute sur un instantané cohérent de données. Pour contrôler l'obsolescence des données, appelez la méthode SpannerIO.Read.withTimestampBound() Consultez la page des transactions pour en savoir plus.

Lire à partir de plusieurs tables dans la même transaction

Si vous souhaitez lire les données issues de plusieurs tables au même moment afin d'assurer la cohérence des données, utilisez une seule transaction. Pour ce faire, appliquez une transformation createTransaction() qui crée un objet PCollectionView<Transaction> qui, à son tour, crée une transaction. La vue résultante peut être transmise à une opération de lecture à l'aide de SpannerIO.Read.withTransaction().

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Lire les données de toutes les tables disponibles

Vous pouvez lire les données de toutes les tables disponibles dans une base de données Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Résoudre les problèmes de requêtes non compatibles

Le connecteur Dataflow n'est compatible qu'avec les requêtes SQL Spanner où le premier opérateur du plan d'exécution de la requête est un opérateur Distributed Union. Si vous tentez de lire des données de Spanner à l'aide d'une requête vous obtenez une exception indiquant que la requête does not have a DistributedUnion at the root, suivez les étapes de la section Comprendre le fonctionnement de Spanner requêtes afin de récupérer un plan d'exécution pour votre requête à l'aide de console Google Cloud.

Si la requête SQL n'est pas compatible, simplifiez-la par une requête dont l'union distribuée est le premier opérateur dans son plan d'exécution. Supprimez les fonctions d'agrégat, les jointures de table ainsi que les opérateurs DISTINCT, GROUP BY et ORDER. Ce sont les opérateurs les plus susceptibles d'empêcher le fonctionnement de la requête.

Créer des mutations pour une écriture

Utiliser la classe Mutation newInsertOrUpdateBuilder() au lieu de Méthode newInsertBuilder() sauf si cela est absolument nécessaire pour les pipelines Java. Pour les pipelines Python, utilisez SpannerInsertOrUpdate() au lieu de SpannerInsert(). Dataflow offre au moins une fois, ce qui signifie que la mutation peut être écrite plusieurs fois. Par conséquent, INSERT seules les mutations peuvent générer com.google.cloud.spanner.SpannerException: ALREADY_EXISTS erreurs à l'origine de l'échec du pipeline. Pour éviter cette erreur, utilisez le INSERT_OR_UPDATE. , qui ajoute une ligne ou met à jour les valeurs des colonnes si la ligne existe déjà. La mutation INSERT_OR_UPDATE peut être appliquée plusieurs fois.

Écrire dans Spanner et transformer des données

Vous pouvez écrire des données dans Spanner à l'aide de la Dataflow à l'aide d'une transformation SpannerIO.write() pour exécuter collection de mutations de ligne d'entrée. Les groupes de connecteurs Dataflow en lots pour plus d'efficacité.

L'exemple suivant montre comment appliquer une transformation d'écriture à une collection PCollection de mutations :

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Si une transformation s'arrête de manière inattendue avant la fin, les mutations qui ont déjà appliquée ne feront pas l'objet d'un rollback.

Appliquer des groupes de mutations de manière atomique

Vous pouvez utiliser la classe MutationGroup pour vous assurer qu'un groupe de mutations est appliqué de manière atomique. Vous avez la garantie que les mutations d'un MutationGroup sont envoyées dans la même transaction, mais la transaction est susceptible d'être relancée.

Les groupes de mutations fonctionnent mieux lorsqu'ils sont utilisés pour regrouper les mutations qui affectent les données stockées les unes près des autres dans l'espace clé. Étant donné que Spanner entre les données des tables parentes et enfants dans la table parente, ces données sont toujours proches les uns des autres dans l’espace clé. Nous vous recommandons de structurer votre groupe de mutations de sorte qu'il contienne une mutation appliquée à une table parente et d'autres mutations appliquées aux tables enfants, ou que toutes ses mutations modifient des données proches les unes des autres dans l'espace clé. Pour en savoir plus sur la façon dont Spanner stocke les ressources données des tables enfants, consultez la section Schéma et modèle de données. Si vous n'organisez pas groupes de mutations autour des hiérarchies de tables recommandées, ou si les données ne sont pas proches les unes des autres dans l'espace clé, Spanner peut effectuer des commits en deux phases, ce qui ralentit les performances. Pour en savoir plus, consultez la section concernant les compromis de localité.

Pour utiliser la classe MutationGroup, créez une transformation SpannerIO.write() et appelez la méthode SpannerIO.Write.grouped(), qui affiche une transformation que vous pouvez ensuite appliquer à une collection PCollection d'objets MutationGroup.

Lors de la création d'un objet MutationGroup, la première mutation répertoriée devient la mutation principale. Si votre groupe de mutations affecte à la fois une table parente et une table enfant, la mutation principale devrait être une mutation à la table parente. Sinon, vous pouvez utiliser n'importe quelle mutation comme mutation principale. Le connecteur Dataflow utilise la mutation principale pour identifier les limites de la partition, afin de regrouper efficacement les mutations.

Par exemple, imaginez que votre application surveille le comportement et signale un comportement utilisateur problématique à examiner. Pour chaque comportement signalé, vous souhaitez mettre à jour la table Users pour bloquer l'accès de l'utilisateur à votre application. Vous devez également enregistrer l'incident dans la table PendingReviews. Pour vous assurer que les deux tables sont mises à jour de manière atomique, utilisez un MutationGroup :

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Lorsque vous créez un groupe de mutations, la première mutation fournie comme argument devient la mutation principale. Dans ce cas, les deux tables ne sont pas liées, la mutation principale n'est donc pas clairement définie. Nous avons choisi userMutation comme mutation principale en la plaçant en premier. L'application des deux mutations séparément serait plus rapide, mais ne garantirait pas l'atomicité. Le groupe de mutations est donc le meilleur choix dans cette situation.

Étapes suivantes