Importa, exporta y modifica datos con Dataflow

Dataflow es un servicio administrado para transformar y enriquecer datos. El conector de Dataflow para Spanner te permite leer datos escribirlos en Spanner en una canalización de Dataflow y y, opcionalmente, transformar o modificar los datos. También puedes crear canalizaciones que transfieren datos entre Spanner y otros productos de Google Cloud.

El conector de Dataflow es el método recomendado para mover datos hacia y desde Spanner de forma masiva, y para Transformaciones en una base de datos que no son compatibles con DML particionado como traslados de tablas, eliminaciones masivas que requieren JOIN, etcétera. En el trabajo con bases de datos individuales, hay otros métodos que puedes usar para importar y exportar datos:

  • Usa la consola de Google Cloud para exportar una base de datos individual de Spanner a Cloud Storage en formato Avro.
  • Usa la consola de Google Cloud para volver a importar una base de datos a Spanner de los archivos que exportaste a Cloud Storage.
  • Usa la API de REST o Google Cloud CLI para ejecutar export o de importación de Spanner a Cloud Storage y viceversa (también con el formato Avro).

El conector de Dataflow para Spanner forma parte del SDK de Java de Apache Beam, que proporciona una API para realizar lo anterior acciones. Para obtener más información sobre algunos de los conceptos que se analizan a continuación, como como objetos de PCollection y transformaciones, consulta la guía de programación de Apache Beam.

Agrega el conector a tu proyecto de Maven

Para agregar el conector de Google Cloud Dataflow a un proyecto de Maven, agrega el artefacto de Maven beam-sdks-java-io-google-cloud-platform a tu archivo pom.xml como una dependencia.

Por ejemplo, si suponemos que tu archivo pom.xml establece beam.version en el número de versión correspondiente, debes agregar la siguiente dependencia:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Lee datos de Spanner

Para leer desde Spanner, aplica la transformación SpannerIO.read(). Configura la lectura con los métodos de la clase SpannerIO.Read. Cuando aplicas la transformación, se muestra un PCollection<Struct>, donde cada elemento de la colección representa una fila individual que muestra la operación de lectura. Puedes leer desde Spanner con y sin un SQL específico para cada consulta, según el resultado que desees.

Cuando aplicas la transformación SpannerIO.read(), se muestra una lectura coherente de los datos mediante una lectura sólida. A menos que especifiques lo contrario, se toma una instantánea del resultado de la lectura en el momento en que comenzó la lectura. Consulta las lecturas para obtener más información. sobre los diferentes tipos de operaciones de lectura que puede realizar Spanner.

Lee datos con una consulta

Para leer un conjunto específico de datos de Spanner, configura la transformación con el método SpannerIO.Read.withQuery() para especificar un SQL para cada búsqueda. Por ejemplo:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Lee datos sin especificar una consulta

Para leer desde una base de datos sin usar una consulta, puedes especificar una tabla con el método SpannerIO.Read.withTable() y especifica una lista de columnas para leer con SpannerIO.Read.withColumns() . Por ejemplo:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Para limitar las filas leídas, puedes especificar un conjunto de claves primarias para leer usando el SpannerIO.Read.withKeySet().

También puedes leer una tabla con un índice secundario especificado. Al igual que con readUsingIndex() API, el índice debe contener todos los datos que en los resultados de la consulta.

Para ello, especifica la tabla como se muestra en el ejemplo anterior y especifica la índice que contenga los valores de columna deseados con el SpannerIO.Read.withIndex(). El índice debe almacenar todas las columnas que la transformación debe leer. La clave primaria de la tabla base es se almacenan implícitamente. Por ejemplo, para leer la tabla Songs con el índice SongsBySongName, usas el siguiente código:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Controla la obsolescencia de los datos de transacciones

Se garantiza que una transformación se ejecutará en una instantánea de datos coherente. Para controlar la obsolescencia de los datos, usa el método SpannerIO.Read.withTimestampBound(). Consulta transacciones para obtener más información.

Lee desde varias tablas en la misma transacción

Si deseas leer datos de varias tablas en el mismo momento para garantizar la coherencia de los datos, realiza todas las lecturas en una sola transacción. Para ello, aplica una transformación createTransaction() y crea un objeto PCollectionView<Transaction> que luego cree una transacción. La vista resultante se puede pasar a una operación de lectura con SpannerIO.Read.withTransaction().

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Leer datos de todas las tablas disponibles

Puedes leer datos de todas las tablas disponibles en una base de datos de Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Soluciona problemas de consultas no admitidas

El conector de Dataflow solo admite consultas en SQL de Spanner donde el primer operador en el plan de ejecución de consultas es un operador Distribuido Unión. Si intentas leer datos de Spanner usando una consulta y recibirás una excepción que indica que la consulta does not have a DistributedUnion at the root, sigue los pasos de la sección Comprende cómo se ejecuta Spanner consultas para recuperar un plan de ejecución para tu consulta usando el Consola de Google Cloud

Si tu consulta de SQL no es compatible, simplifícala a una consulta que tenga una unión distribuida como el primer operador en el plan de ejecución de consultas. Quita funciones agregadas, así como los operadores DISTINCT, GROUP BY y ORDER, ya que son los operadores que tienen más probabilidades de impedir que la consulta funcione.

Crea mutaciones para una escritura

Usa la clase Mutation newInsertOrUpdateBuilder() en lugar del método Método newInsertBuilder() a menos que sea absolutamente necesario para las canalizaciones de Java. Para las canalizaciones de Python, usa SpannerInsertOrUpdate() en lugar de SpannerInsert(). Dataflow proporciona garantías al menos una vez, lo que significa que la mutación podría escribirse varias veces. Como resultado, solo INSERT de mutaciones podrían generar com.google.cloud.spanner.SpannerException: ALREADY_EXISTS errores que causan que la canalización falle. Para evitar este error, usa el INSERT_OR_UPDATE mutación, que agrega una nueva fila o actualiza los valores de la columna si la fila ya existe. La mutación INSERT_OR_UPDATE se puede aplicar más de una vez.

Escribe en Spanner y transforma datos

Puedes escribir datos en Spanner con la con una transformación SpannerIO.write() para ejecutar una de mutaciones de las filas de entrada. Los grupos de conectores de Dataflow mutaciones en lotes para lograr una mayor eficiencia.

En el siguiente ejemplo, se muestra cómo aplicar una transformación de escritura a una PCollection de mutaciones:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Si una transformación se detiene inesperadamente antes de completarse, las mutaciones que ya no se revertirán.

Aplicar grupos de mutaciones de forma atómica

Puedes usar la clase MutationGroup para asegurarte de que un grupo de mutaciones se apliquen de forma atómica. Se garantiza que las mutaciones en un MutationGroup se enviarán en la misma transacción, pero se puede volver a intentar la transacción.

Los grupos de mutación tienen mejor rendimiento cuando se usan para agrupar mutaciones que afectan a los datos almacenados en un espacio clave. Debido a que Spanner entre los datos de la tabla superior y la secundaria en la tabla superior, los datos siempre esté cerca en el espacio clave. Te recomendamos estructurar tu grupo de mutaciones para que contenga una mutación que se aplique a una tabla superior y mutaciones adicionales que se apliquen a tablas secundarias, o para que todas sus mutaciones modifiquen datos que estén cerca en el espacio clave. Para obtener más información sobre cómo Spanner almacena Datos de la tabla secundaria, consulta Esquema y modelo de datos. Si no organizas tus grupos de mutación en las jerarquías de tablas recomendadas o si los datos a las que se accede no está muy cerca en el espacio clave, Spanner podría deben realizar confirmaciones en dos fases, lo que ralentizará el rendimiento. Para obtener más información, consulta Compensaciones de la localidad.

Para usar MutationGroup, compila una transformación SpannerIO.write() y llama al método SpannerIO.Write.grouped(), que muestra una transformación que luego puedes aplicar a una PCollection de objetos MutationGroup.

Cuando creas un MutationGroup, la primera mutación de la lista se convierte en la principal. Si tu grupo de mutaciones afecta tanto a una tabla superior como a una secundaria, la mutación principal debe ser una mutación a la tabla superior. De lo contrario, puedes usar cualquier mutación como la mutación principal. El conector de Dataflow usa la mutación principal para determinar los límites de partición a fin de agrupar eficazmente las mutaciones.

Por ejemplo, imagina que tu aplicación supervisa el comportamiento y marca el comportamiento del usuario problemático para su revisión. Para cada comportamiento marcado, deseas actualizar la tabla Users con el fin de bloquear el acceso del usuario a su aplicación, y también debes registrar el incidente en la tabla PendingReviews. Para asegurarte de que ambas tablas se actualicen de manera atómica, usa un MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Cuando creas un grupo de mutaciones, la primera mutación proporcionada como argumento se convierte en la mutación principal. En este caso, las dos tablas no están relacionadas, por lo que no hay una mutación principal clara. Seleccionamos userMutation como principal porque la colocamos primero. Aplicar las dos mutaciones por separado sería más rápido, pero no garantizaría la atomicidad, por lo que el grupo de mutación es la mejor opción en esta situación.

¿Qué sigue?