Usar el conector de Cloud Dataflow

Cloud Dataflow es un servicio administrado con el fin de transformar y enriquecer datos. El conector de Cloud Dataflow para Cloud Spanner permite leer y escribir datos en Cloud Spanner mediante un flujo de procesamiento de Cloud Dataflow; de forma opcional, se pueden transformar o modificar los datos. También puedes crear canalizaciones que transfieran datos entre Cloud Spanner y otros productos de Google Cloud Platform.

El conector Cloud Dataflow es el método recomendado para mover datos entrantes y salientes de Cloud Spanner de forma masiva.

El conector de Cloud Dataflow para Cloud Spanner forma parte del SDK de Apache Beam Java y proporciona una API para realizar las acciones indicadas anteriormente. Consulta la guía de programación de Apache Beam para obtener más información sobre algunos de los conceptos que se tratan a continuación, como las transformaciones y los objetos PCollection.

Añadir el conector al proyecto Maven

Si quieres añadir el conector de GCP Cloud Dataflow a un proyecto Maven, añade el artefacto Maven al archivo pom.xml como dependencia:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.4.0</version>
</dependency>

Leer datos de Cloud Spanner

Si quieres realizar lecturas en Cloud Spanner, aplica la transformación SpannerIO.read. Puedes configurar la lectura mediante los métodos de la clase SpannerIO.Read. Al aplicar la transformación, se obtiene como resultado un valor PCollection<Struct>, en el que cada elemento de la colección representa una fila individual devuelta por la operación de lectura. Puedes realizar lecturas en Cloud Spanner tanto si cuentas con una consulta SQL específica como si no dispones de ninguna en función del resultado que busques.

La aplicación de la transformación SpannerIO.read() presenta como resultado una vista uniforme de los datos, ya que realiza una lectura a fondo. A menos que especifiques lo contrario, el resultado de la lectura se obtiene en el momento en el que se inicia la tarea. Consulta el apartado de lecturas para obtener más información sobre los diferentes tipos de lecturas que Cloud Spanner puede realizar.

Leer datos mediante una consulta

Si quieres leer un conjunto específico de datos de Cloud Spanner, configura la transformación mediante el método SpannerIO.Read.withQuery() para especificar una consulta SQL. Por ejemplo:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Leer datos sin especificar una consulta

Si quieres realizar una lectura en una base de datos sin utilizar una consulta, puedes especificar un nombre de tabla y una lista de columnas; también puedes leer datos mediante un índice. Si deseas leer las columnas seleccionadas, especifica un nombre de tabla y una lista de columnas cuando crees la transformación mediante SpannerIO.read(). Por ejemplo:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

También puedes leer la tabla mediante un conjunto específico de claves como valores de índice. Si deseas hacerlo, crea la lectura mediante un índice que contenga los valores de clave deseados con el método SpannerIO.Read.withIndex().

Controlar la falta de actualización de los datos de transacción

Se garantiza que las transformaciones se ejecutarán en una captura de datos uniforme. Si quieres controlar la falta de actualización de los datos, puedes usar el método SpannerIO.Read.withTimestampBound(). Consulta el apartado sobre transacciones para obtener más información.

Leer varias tablas en la misma transacción

Si deseas leer datos de varias tablas a la vez para asegurarte de que son uniformes, puedes realizar todas las lecturas en una sola transacción. Puedes hacerlo al aplicar una transformación createTransaction(), lo que crea un objeto PCollectionView<Transaction> que, a su vez, crea una transacción. La vista resultante se puede convertir en una operación de lectura mediante SpannerIO.Read.withTransaction().

SpannerConfig spannerConfig = SpannerConfig.create()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId);
PCollectionView<Transaction> tx = p.apply(
    SpannerIO.createTransaction()
        .withSpannerConfig(spannerConfig)
        .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
    .withTransaction(tx));
PCollection<Struct> albums = p.apply(SpannerIO.read().withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
    .withTransaction(tx));

Leer datos de todas las tablas disponibles

Puedes leer datos de todas las tablas disponibles en una base de datos de Cloud Spanner:

PCollection<Struct> allRecords = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t"
        + ".table_catalog = '' AND t.table_schema = ''")).apply(
    MapElements.into(TypeDescriptor.of(ReadOperation.class))
        .via((SerializableFunction<Struct, ReadOperation>) input -> {
          String tableName = input.getString(0);
          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
        })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Solucionar problemas de consultas no compatibles

El conector de Cloud Dataflow solo es compatible con las consultas SQL de Cloud Spanner, en las que el primer operador del plan de ejecución de consultas es una unión distribuida. Si intentas leer datos de Cloud Spanner mediante una consulta y aparece una excepción que indica esta información sobre la ruta (does not have a DistributedUnion at the root), sigue los pasos del apartado sobre cómo Cloud Spanner ejecuta las consultas para recuperar un plan de ejecución de consultas mediante la consola de GCP.

Si la consulta SQL no es compatible, transfórmala en una consulta más sencilla que tenga una unión distribuida similar al primer operador del plan de ejecución de la consulta. Puedes eliminar funciones añadidas, así como los operadores DISTINCT, GROUP BY y ORDER, ya que son los operadores con más probabilidad de impedir que la consulta funcione.

Crear mutaciones para escrituras

Debes usar el método Mutation de clase newInsertOrUpdateBuilder() en lugar del método newInsertBuilder() a menos que resulte estrictamente necesario. Cloud Dataflow proporciona una garantía at-least-once (por lo menos una vez), lo que significa que puede que la mutación se escriba varias veces. Por este motivo, puede que las mutaciones de inserción generen errores que provoquen fallos en el flujo de procesamiento. Si quieres evitar estos errores, crea mutaciones de inserción o actualización, que se pueden aplicar más de una vez.

Escribir en Cloud Spanner y transformar datos

Puedes escribir datos en Cloud Spanner con el conector de Cloud Dataflow mediante una transformación SpannerIO.write() con el fin de ejecutar una colección de mutaciones de fila de entrada. El conector de Cloud Dataflow agrupa mutaciones en lotes para obtener una mayor eficacia.

Mediante el siguiente ejemplo se muestra cómo aplicar una transformación de escritura a una colección de mutaciones PCollection:

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

Si una transformación se detiene inesperadamente antes de completarse, las mutaciones que ya se han aplicado no se revertirán.

Si importas datos a una nueva tabla Cloud Spanner, puedes conseguir un rendimiento de escritura con una mayor rapidez al eliminar los índices antes de realizar la carga masiva de datos. Restaura los índices cuando finalice la carga.

Aplicar los grupos de mutaciones de forma atómica

Puedes usar la clase MutationGroup para asegurarte de que un grupo de mutaciones se aplica de forma atómica. Se garantiza que las mutaciones de un MutationGroup se enviarán en la misma transacción, pero puede que haga falta volver a intentar realizarla.

Los grupos de mutación funcionan mejor cuando se usan para agrupar mutaciones que afectan a los datos que se almacenan muy cerca unos de otros en el espacio de claves. Debido a que Cloud Spanner intercala los datos de tablas principales y secundarias en la tabla principal, estos datos siempre están muy cerca unos de otros en el espacio de claves. Recomendamos que organices el grupo de mutaciones para que contenga una que se aplique a una tabla principal y otras que se apliquen a las tablas secundarias, o para que todas las mutaciones modifiquen los datos que están muy cerca unos de otros en el espacio de claves. Si quieres obtener más información acerca de cómo Cloud Spanner almacena datos de tablas principales y secundarias, consulta el apartado sobre el esquema y el modelo de datos. Si no organizas los grupos de mutación en torno a las jerarquías de tabla recomendadas, o si los datos a los que se accede no están muy cerca unos de otros en el espacio de claves, Cloud Spanner podría necesitar realizar confirmaciones en dos fases, lo que dará como resultado un rendimiento más lento. Si deseas obtener más información, consulta el apartado sobre intercambios de localidad.

Si quieres usar MutationGroup, crea una transformación SpannerIO.write() y llama al método SpannerIO.Write.grouped(), que dará como resultado una transformación que luego podrás aplicar a una colección PCollection de objetos MutationGroup.

Al crear un grupo MutationGroup, la primera mutación enumerada se convierte en la mutación principal. Si el grupo de mutación afecta tanto a una tabla principal como a una tabla secundaria, la mutación principal debe ser una mutación de la tabla principal. De lo contrario, puedes usar cualquier mutación como la principal. La mutación principal le sirve al conector de Cloud Dataflow para determinar los límites de partición con el fin de combinar las mutaciones por lotes de manera eficaz.

Por ejemplo, imagina que tu aplicación realiza una supervisión y señala el comportamiento problemático del usuario para que se revise. En el caso de cada comportamiento marcado, debes actualizar la tabla Users para bloquear el acceso del usuario a la aplicación y registrar el incidente en la tabla PendingReviews. Si quieres asegurarte de que ambas tablas se actualizan atómicamente, usa un MutationGroup:

PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

Al crear un grupo de mutaciones, la primera mutación usada como argumento se convierte en la mutación principal. En este caso, las dos tablas no están relacionadas, por lo que no hay una mutación principal clara. Hemos seleccionado userMutation como principal al colocarla primero. Aplicar las dos mutaciones por separado sería más rápido, pero no garantizaría la atomicidad, por lo que el grupo de mutación es la mejor opción en esta situación.

Siguientes pasos

Más información sobre cómo diseñar un flujo de procesamiento de datos de Apache Beam

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Cloud Spanner Documentation