Importar, exportar e modificar dados usando o Dataflow

O Dataflow é um serviço gerenciado para transformar e enriquecer dados. Com o conector do Dataflow para o Spanner, é possível ler os dados e gravar dados no Spanner em um pipeline do Dataflow. transformando ou modificando os dados. Também é possível criar pipelines que transferem dados entre o Spanner e outros produtos do Google Cloud.

O conector do Dataflow é o método recomendado para operações mover dados em massa para dentro e para fora do Spanner e realizar grandes transformações em um banco de dados que não são compatíveis com DML particionada; como transferências de tabelas, exclusões em massa que exigem JOIN e assim por diante. Durante o trabalho com bancos de dados individuais, há outros métodos que você pode usar para importar e exportar dados:

  • Use o console do Google Cloud para exportar um banco de dados individual de Spanner para o Cloud Storage no formato Avro.
  • Use o console do Google Cloud para importar um banco de dados de volta para o o Spanner dos arquivos exportados para o Cloud Storage.
  • Use a API REST ou a Google Cloud CLI para executar os comandos export ou import do Spanner para o Cloud Storage e vice-versa (também usando o formato Avro).

O conector do Dataflow para Spanner faz parte do SDK do Apache Beam para Java (em inglês) e fornece uma API para fazer isso ações. Para mais informações sobre alguns dos conceitos discutidos abaixo, como como objetos e transformações PCollection, consulte o guia de programação do Apache Beam.

Adicionar o conector ao seu projeto do Maven

Para adicionar o conector do Google Cloud Dataflow a um projeto do Maven, adicione o artefato beam-sdks-java-io-google-cloud-platform do Maven ao arquivo pom.xml como uma dependência.

Por exemplo, se o arquivo pom.xml definir beam.version como o número de versão apropriado, adicione a seguinte dependência:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Ler dados do Spanner

Para ler dados do Spanner, aplique a transformação SpannerIO.read(). Configure a leitura usando os métodos na classe SpannerIO.Read. Aplicar a transformação retorna um PCollection<Struct>, em que cada elemento do conjunto representa uma linha individual retornada pela operação de leitura (links em inglês). É possível ler dados do Spanner com e sem um SQL específico consulta, dependendo do resultado desejado.

Aplicar a transformação SpannerIO.read() retorna uma visualização consistente dos dados devido à execução de uma leitura forte. A menos que você especifique o contrário, um snapshot do resultado da leitura será criado no momento em que você a iniciar. Consulte as leituras para saber mais informações sobre os diferentes tipos de leituras que o Spanner pode executar.

Ler dados usando uma consulta

Para ler um conjunto específico de dados do Spanner, configure a transformação usando o método SpannerIO.Read.withQuery() para especificar um objeto consulta. Exemplo:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Ler dados sem especificar uma consulta

Para ler um banco de dados sem usar uma consulta, especifique uma tabela nome usando o método SpannerIO.Read.withTable() e especifique um lista de colunas para leitura usando SpannerIO.Read.withColumns() . Exemplo:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Para limitar as linhas lidas, especifique um conjunto de chaves primárias para leitura usando o SpannerIO.Read.withKeySet().

Você também pode ler uma tabela usando um índice secundário especificado. Assim como acontece com o chamada da API readUsingIndex(), o índice deve conter todos os dados que aparece nos resultados da consulta.

Para fazer isso, especifique a tabela como mostrado no exemplo anterior e especifique os index que contém os valores de coluna desejados usando o SpannerIO.Read.withIndex(). O índice deve armazenar todos das colunas que a transformação precisa ler. A chave primária da tabela base é armazenadas implicitamente. Por exemplo, para ler a tabela Songs usando o índice SongsBySongName, use o seguinte código:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Controlar a desatualização dos dados de transação

Uma transformação será executada em um snapshot consistente de dados. Para controlar a inatividade dos dados, use o método SpannerIO.Read.withTimestampBound() (em inglês). Consulte Transações para mais informações.

Ler de várias tabelas na mesma transação

Se quiser ler dados de várias tabelas ao mesmo tempo para garantir a consistência dos dados, execute todas as leituras em uma única transação. Para isso, aplique uma transformação createTransaction() (em inglês) e crie um objeto PCollectionView<Transaction> que depois gere uma transação. Para transmitir a exibição resultante a uma operação de leitura, use SpannerIO.Read.withTransaction() (em inglês).

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Ler dados de todas as tabelas disponíveis

É possível ler dados de todas as tabelas disponíveis em um banco de dados do Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Resolver problemas de consultas sem suporte

O conector do Dataflow só oferece suporte a consultas SQL do Spanner em que o primeiro operador no plano de execução da consulta é um objeto Distributed União. Se você tentar ler dados do Spanner usando uma consulta Você receber uma exceção informando que a consulta does not have a DistributedUnion at the root, siga as etapas em Entender como o Spanner é executado. consultas para recuperar um plano de execução para sua consulta usando o console do Google Cloud.

Se sua consulta SQL não for compatível, simplifique-a para uma consulta que tenha um "distributed union" como o primeiro operador no plano de execução. Remova funções de agregação, assim como os operadores DISTINCT, GROUP BY e ORDER, porque eles têm maior probabilidade de impedir a execução da consulta.

Criar mutações para uma gravação

Use o método Mutation newInsertOrUpdateBuilder() em vez do método Método newInsertBuilder() a menos que seja absolutamente necessário para pipelines Java. Para pipelines Python, use SpannerInsertOrUpdate() em vez de SpannerInsert(). O Dataflow oferece garante pelo menos uma vez, o que significa que a mutação pode ser escrita várias vezes. Como resultado, somente INSERT mutações podem gerar com.google.cloud.spanner.SpannerException: ALREADY_EXISTS erros que causam que o pipeline falhe. Para evitar esse erro, use o INSERT_OR_UPDATE mutação, que adiciona uma nova linha ou atualiza os valores da coluna se a linha já existe. A mutação INSERT_OR_UPDATE pode ser aplicada mais de uma vez.

Gravar no Spanner e transformar dados

É possível gravar dados no Spanner com o usando uma transformação SpannerIO.write() para executar uma coleção de mutações de linha de entrada. Grupos de conectores do Dataflow em lotes para aumentar a eficiência.

Veja no exemplo a seguir como aplicar uma transformação de gravação a um PCollection de mutações:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Se uma transformação for interrompida inesperadamente antes da conclusão, as mutações que já foram aplicadas não serão revertidas.

Aplicar grupos de mutações atomicamente

É possível usar a classe MutationGroup (em inglês) para garantir que um grupo de mutações seja aplicado atomicamente em conjunto. As mutações em um MutationGroup serão enviadas na mesma transação, mas a transação pode se repetir.

Os grupos de mutações têm melhor desempenho quando são usados para agrupar mutações que afetam os dados armazenados juntos no espaço da chave. Como o Spanner intercala os dados das tabelas mãe e filha na tabela mãe, os dados estão sempre próximos no espaço da chave. Recomendamos que você estruture seu grupo de mutações para que ele contenha uma mutação aplicada a uma tabela primária e outras mutações aplicadas a tabelas secundárias. Ou, então, faça com que todas suas mutações modifiquem dados próximos no espaço da chave. Para mais informações sobre como o Spanner armazena para dados da tabela filha, consulte Esquema e modelo de dados. Se você não organiza seus grupos de mutação em torno das hierarquias de tabela recomendadas ou se os dados que o acesso não está próximo no espaço da chave, o Spanner pode executar confirmações em duas fases, o que resultará em um desempenho mais lento. Para mais informações, consulte Compensações de localidade.

Para usar MutationGroup, crie uma transformação SpannerIO.write() e chame o método SpannerIO.Write.grouped() (em inglês). Ele retornará uma transformação a ser aplicada a uma PCollection de objetos MutationGroup.

Ao criar um MutationGroup, a primeira mutação listada se torna a primária. Caso seu grupo afete uma tabela mãe e uma filha, a mutação primária precisa estar incluída na tabela mãe. Caso contrário, use qualquer mutação como a primária. O conector do Dataflow usa a mutação primária para determinar os limites de particionamento a fim de agrupar as mutações de maneira eficiente.

Por exemplo, imagine que seu aplicativo monitore o comportamento e sinalize ações problemáticas dos usuários para análise. Para cada comportamento sinalizado, você quer atualizar a tabela Users para bloquear o acesso do usuário ao seu aplicativo e registrar o incidente na tabela PendingReviews. Para garantir que ambas as tabelas sejam atualizadas atomicamente, use um MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Ao criar um grupo de mutações, a primeira mutação fornecida como um argumento se torna a primária. Nesse caso, as duas tabelas não estão relacionadas. Portanto, não há uma mutação primária clara. Selecionamos userMutation como primária colocando-a primeiro. Aplicar as duas mutações separadamente seria mais rápido, mas não garantiria a atomicidade. Assim, criar um grupo de mutações é a melhor escolha nessa situação.

A seguir