Importar, exportar y modificar datos con Dataflow

En esta página se describe cómo usar el conector Dataflow para Spanner con el fin de importar, exportar y modificar datos en bases de datos de Spanner con dialecto GoogleSQL y con dialecto PostgreSQL.

Dataflow es un servicio gestionado para transformar y enriquecer datos. El conector de Dataflow para Spanner te permite leer y escribir datos en Spanner en una canalización de Dataflow, así como transformar o modificar los datos de forma opcional. También puedes crear pipelines que transfieran datos entre Spanner y otros productos deGoogle Cloud .

El conector de Dataflow es el método recomendado para mover datos de forma masiva a Spanner y desde Spanner de forma eficiente. También es el método recomendado para realizar grandes transformaciones en una base de datos que no son compatibles con DML particionado, como los movimientos de tablas y las eliminaciones en bloque que requieren una JOIN. Cuando trabajas con bases de datos individuales, puedes usar otros métodos para importar y exportar datos:

  • Usa la Google Cloud consola para exportar una base de datos individual de Spanner a Cloud Storage en formato Avro.
  • Usa la Google Cloud consola para importar una base de datos a Spanner desde los archivos que has exportado a Cloud Storage.
  • Usa la API REST o Google Cloud CLI para ejecutar tareas de exportación o importación de Spanner a Cloud Storage y viceversa. También puedes usar el formato Avro.

El conector de Dataflow para Spanner forma parte del SDK de Apache Beam para Java y proporciona una API para realizar las acciones anteriores. Para obtener más información sobre algunos de los conceptos que se explican en esta página, como los objetos PCollection y las transformaciones, consulta la guía de programación de Apache Beam.

Añadir el conector a tu proyecto de Maven

Para añadir el conector Google Cloud Dataflow a un proyecto de Maven, añade el artefacto de Maven beam-sdks-java-io-google-cloud-platform a tu archivo pom.xml como dependencia.

Por ejemplo, si tu archivo pom.xml asigna a beam.version el número de versión adecuado, añadirías la siguiente dependencia:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Leer datos de Spanner

Para leer datos de Spanner, aplica la transformación SpannerIO.read. Configura la lectura con los métodos de la clase SpannerIO.Read. Al aplicar la transformación, se devuelve un PCollection<Struct>, donde cada elemento de la colección representa una fila individual devuelta por la operación de lectura. Puedes leer datos de Spanner con o sin una consulta de SQL específica, en función del resultado que necesites.

Al aplicar la transformación SpannerIO.read, se devuelve una vista coherente de los datos realizando una lectura sólida. A menos que especifiques lo contrario, el resultado de la lectura se obtiene en el momento en el que se inicia la tarea. Consulta lecturas para obtener más información sobre los diferentes tipos de lecturas que puede realizar Spanner.

Leer datos mediante una consulta

Para leer un conjunto de datos específico de Spanner, configura la transformación mediante el método SpannerIO.Read.withQuery para especificar una consulta de SQL. Por ejemplo:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Leer datos sin especificar una consulta

Para leer datos de una base de datos sin usar una consulta, puede especificar el nombre de una tabla con el método SpannerIO.Read.withTable y especificar una lista de columnas que se van a leer con el método SpannerIO.Read.withColumns. Por ejemplo:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Para limitar las filas leídas, puedes especificar un conjunto de claves principales que se leerán mediante el método SpannerIO.Read.withKeySet.

También puedes leer una tabla usando un índice secundario específico. Al igual que con la llamada a la API readUsingIndex, el índice debe contener todos los datos que aparecen en los resultados de la consulta.

Para ello, especifique la tabla como se muestra en el ejemplo anterior y especifique el índice que contiene los valores de columna necesarios mediante el método SpannerIO.Read.withIndex. El índice debe almacenar todas las columnas que la transformación necesite leer. La clave principal de la tabla base se almacena implícitamente. Por ejemplo, para leer la tabla Songs usando el índice SongsBySongName, usa el siguiente código:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Controlar la antigüedad de los datos de transacciones

Se garantiza que las transformaciones se ejecutarán en una captura de datos uniforme. Para controlar la obsolescencia de los datos, usa el método SpannerIO.Read.withTimestampBound. Consulta más información sobre las transacciones.

Leer datos de varias tablas en la misma transacción

Si deseas leer datos de varias tablas a la vez para asegurarte de que son uniformes, puedes realizar todas las lecturas en una sola transacción. Para ello, aplica una transformación createTransaction, lo que crea un objeto PCollectionView<Transaction> que, a su vez, crea una transacción. La vista resultante se puede transferir a una operación de lectura mediante SpannerIO.Read.withTransaction.

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Leer datos de todas las tablas disponibles

Puedes leer datos de todas las tablas disponibles en una base de datos de Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Solucionar problemas con consultas no admitidas

El conector de Dataflow solo admite consultas de Spanner SQL en las que el primer operador del plan de ejecución de la consulta es DistributedUnion. Si intentas leer datos de Spanner mediante una consulta y recibes una excepción que indica que la consulta does not have a DistributedUnion at the root, sigue los pasos que se indican en el artículo Entender cómo ejecuta Spanner las consultas para obtener un plan de ejecución de tu consulta mediante la consola Google Cloud .

Si la consulta SQL no es compatible, transfórmala en una consulta más sencilla que tenga una unión distribuida similar al primer operador del plan de ejecución de la consulta. Elimina las funciones de agregado, las combinaciones de tablas y los operadores DISTINCT, GROUP BY y ORDER, ya que son los que tienen más probabilidades de impedir que funcione la consulta.

Crear mutaciones para una escritura

Usa el método newInsertOrUpdateBuilder de la clase Mutation en lugar del método newInsertBuilder, a menos que sea absolutamente necesario para las canalizaciones de Java. En las canalizaciones de Python, usa SpannerInsertOrUpdate en lugar de SpannerInsert. Dataflow ofrece garantías de al menos una vez, lo que significa que la mutación se puede escribir varias veces. Por lo tanto, solo las mutaciones de INSERT pueden generar errores de com.google.cloud.spanner.SpannerException: ALREADY_EXISTS que provoquen que la canalización falle. Para evitar este error, usa la INSERT_OR_UPDATE mutación, que añade una fila o actualiza los valores de las columnas si la fila ya existe. La mutación INSERT_OR_UPDATE se puede aplicar más de una vez.

Escribir en Spanner y transformar datos

Puede escribir datos en Spanner con el conector de Dataflow mediante una transformación SpannerIO.write para ejecutar una colección de mutaciones de filas de entrada. El conector de Dataflow agrupa las mutaciones en lotes para mejorar la eficiencia.

En el siguiente ejemplo se muestra cómo aplicar una transformación de escritura a un PCollection de mutaciones:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Si una transformación se detiene inesperadamente antes de completarse, las mutaciones que ya se hayan aplicado no se revertirán.

Aplicar grupos de mutaciones de forma atómica

Puedes usar la clase MutationGroup para asegurarte de que un grupo de mutaciones se aplique de forma atómica. Las mutaciones de un MutationGroup se envían en la misma transacción, pero es posible que se vuelva a intentar.

Los grupos de mutación funcionan mejor cuando se usan para agrupar mutaciones que afectan a los datos que se almacenan muy cerca unos de otros en el espacio de claves. Como Spanner entrelaza los datos de la tabla principal y la secundaria en la tabla principal, esos datos siempre están cerca en el espacio de claves. Te recomendamos que estructures tu grupo de mutaciones de forma que contenga una mutación que se aplique a una tabla principal y mutaciones adicionales que se apliquen a tablas secundarias, o bien de forma que todas sus mutaciones modifiquen datos que estén cerca en el espacio de claves. Para obtener más información sobre cómo almacena Spanner los datos de las tablas principales y secundarias, consulta Esquema y modelo de datos. Si no organizas tus grupos de mutación en torno a las jerarquías de tablas recomendadas o si los datos a los que se accede no están cerca en el espacio de claves, es posible que Spanner tenga que realizar confirmaciones en dos fases, lo que ralentizará el rendimiento. Para obtener más información, consulta Compensaciones de localidad.

Para usar MutationGroup, crea una transformación SpannerIO.write y llama al método SpannerIO.Write.grouped, que devuelve una transformación que puedes aplicar a un PCollection de objetos MutationGroup.

Cuando se crea un MutationGroup, la primera mutación de la lista se convierte en la mutación principal. Si el grupo de mutación afecta tanto a una tabla principal como a una tabla secundaria, la mutación principal debe ser una mutación de la tabla principal. De lo contrario, puedes usar cualquier mutación como la principal. El conector de Dataflow usa la mutación principal para determinar los límites de las particiones y, de esta forma, agrupar las mutaciones de forma eficiente.

Por ejemplo, imagina que tu aplicación realiza una supervisión y señala el comportamiento problemático del usuario para que se revise. Por cada comportamiento marcado, debes actualizar la tabla Users para bloquear el acceso del usuario a tu aplicación y registrar el incidente en la tabla PendingReviews. Para asegurarte de que ambas tablas se actualizan de forma atómica, usa un MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Al crear un grupo de mutaciones, la primera mutación usada como argumento se convierte en la mutación principal. En este caso, las dos tablas no están relacionadas, por lo que no hay una mutación principal clara. Hemos seleccionado userMutation como principal colocándolo en primer lugar. Aplicar las dos mutaciones por separado sería más rápido, pero no garantizaría la atomicidad, por lo que el grupo de mutación es la mejor opción en esta situación.

Siguientes pasos