Importar, exportar e modificar dados usando o Dataflow

O Dataflow é um serviço gerenciado para transformar e enriquecer dados. Com o conector do Dataflow para o Spanner, é possível ler os dados e gravar dados no Spanner em um pipeline do Dataflow. transformando ou modificando os dados. Também é possível criar pipelines que transferem dados entre o Spanner e outros produtos do Google Cloud.

O conector do Dataflow é o método recomendado para operações mover dados em massa para dentro e para fora do Spanner e realizar grandes transformações em um banco de dados que não são compatíveis com DML particionada; como transferências de tabelas, exclusões em massa que exigem JOIN e assim por diante. Durante o trabalho com bancos de dados individuais, há outros métodos que você pode usar para importar e exportar dados:

  • Use o console do Google Cloud para exportar um banco de dados individual de Spanner para o Cloud Storage no formato Avro.
  • Use o console do Google Cloud para importar um banco de dados de volta para o o Spanner dos arquivos exportados para o Cloud Storage.
  • Use a API REST ou a Google Cloud CLI para executar os comandos export ou import do Spanner para o Cloud Storage e vice-versa (também usando o formato Avro).

O conector do Dataflow para Spanner faz parte do SDK do Apache Beam para Java (em inglês) e fornece uma API para fazer isso ações. Para mais informações sobre alguns dos conceitos discutidos abaixo, como como objetos e transformações PCollection, consulte o guia de programação do Apache Beam.

Adicionar o conector ao seu projeto do Maven

Para adicionar o conector do Google Cloud Dataflow a um projeto do Maven, adicione o artefato beam-sdks-java-io-google-cloud-platform do Maven ao arquivo pom.xml como uma dependência.

Por exemplo, se o arquivo pom.xml definir beam.version como o número de versão apropriado, adicione a seguinte dependência:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Ler dados do Spanner

Para ler dados do Spanner, aplique a transformação SpannerIO.read(). Configure a leitura usando os métodos na classe SpannerIO.Read. Aplicar a transformação retorna um PCollection<Struct>, em que cada elemento do conjunto representa uma linha individual retornada pela operação de leitura (links em inglês). É possível ler dados do Spanner com e sem um SQL específico consulta, dependendo do resultado desejado.

Aplicar a transformação SpannerIO.read() retorna uma visualização consistente dos dados devido à execução de uma leitura forte. A menos que você especifique o contrário, um snapshot do resultado da leitura será criado no momento em que você a iniciar. Consulte as leituras para saber mais informações sobre os diferentes tipos de leituras que o Spanner pode realizar.

Ler dados usando uma consulta

Para ler um conjunto específico de dados do Spanner, configure a transformação usando o método SpannerIO.Read.withQuery() para especificar um objeto consulta. Exemplo:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Ler dados sem especificar uma consulta

Para ler um banco de dados sem usar uma consulta, especifique uma tabela nome usando o método SpannerIO.Read.withTable() e especifique um lista de colunas para leitura usando SpannerIO.Read.withColumns() . Exemplo:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Para limitar as linhas lidas, especifique um conjunto de chaves primárias para leitura usando o SpannerIO.Read.withKeySet().

Você também pode ler uma tabela usando um índice secundário especificado. Assim como acontece com readUsingIndex() chamado API, o índice deve conter todos os dados que aparece nos resultados da consulta.

Para fazer isso, especifique a tabela como mostrado no exemplo anterior e especifique os index que contém os valores de coluna desejados usando o SpannerIO.Read.withIndex(). O índice deve armazenar todos das colunas que a transformação precisa ler. A chave primária da tabela base é armazenadas implicitamente. Por exemplo, para ler a tabela Songs usando o índice SongsBySongName, use o seguinte código:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Controlar a desatualização dos dados de transação

Uma transformação será executada em um snapshot consistente de dados. Para controlar a inatividade dos dados, use o método SpannerIO.Read.withTimestampBound() (em inglês). Consulte Transações para mais informações.

Ler de várias tabelas na mesma transação

Se quiser ler dados de várias tabelas ao mesmo tempo para garantir a consistência dos dados, execute todas as leituras em uma única transação. Para isso, aplique uma transformação createTransaction() (em inglês) e crie um objeto PCollectionView<Transaction> que depois gere uma transação. Para transmitir a exibição resultante a uma operação de leitura, use SpannerIO.Read.withTransaction() (em inglês).

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Ler dados de todas as tabelas disponíveis

É possível ler dados de todas as tabelas disponíveis em um banco de dados do Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Resolver problemas de consultas incompatíveis

O conector do Dataflow só oferece suporte a consultas SQL do Spanner em que o primeiro operador no plano de execução da consulta é um objeto Distributed União. Se você tentar ler dados do Spanner usando uma consulta Você receber uma exceção informando que a consulta does not have a DistributedUnion at the root, siga as etapas em Entender como o Spanner é executado. consultas para recuperar um plano de execução para sua consulta usando o console do Google Cloud.

Se sua consulta SQL não for compatível, simplifique-a para uma consulta que tenha um "distributed union" como o primeiro operador no plano de execução. Remova funções de agregação, assim como os operadores DISTINCT, GROUP BY e ORDER, porque eles têm maior probabilidade de impedir a execução da consulta.

Criar mutações para uma gravação

Use o método Mutation newInsertOrUpdateBuilder() em vez do método Método newInsertBuilder() a menos que seja absolutamente necessário para pipelines Java. Para pipelines Python, use SpannerInsertOrUpdate() em vez de SpannerInsert() O Dataflow oferece garante pelo menos uma vez, o que significa que a mutação pode ser escrita várias vezes. Como resultado, somente INSERT mutações podem gerar com.google.cloud.spanner.SpannerException: ALREADY_EXISTS erros que causam a falha do pipeline. Para evitar esse erro, use o INSERT_OR_UPDATE mutação, que adiciona uma nova linha ou atualiza os valores da coluna se a linha já existe. A mutação INSERT_OR_UPDATE pode ser aplicada mais de uma vez.

Gravar no Spanner e transformar dados

É possível gravar dados no Spanner com o usando uma transformação SpannerIO.write() para executar uma coleção de mutações de linha de entrada. Grupos de conectores do Dataflow em lotes para aumentar a eficiência.

Veja no exemplo a seguir como aplicar uma transformação de gravação a um PCollection de mutações:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Se uma transformação for interrompida inesperadamente antes da conclusão, as mutações que já foram aplicadas não serão revertidas.

Aplicar grupos de mutações atomicamente

É possível usar a classe MutationGroup (em inglês) para garantir que um grupo de mutações seja aplicado atomicamente em conjunto. As mutações em um MutationGroup serão enviadas na mesma transação, mas a transação pode se repetir.

Os grupos de mutações têm melhor desempenho quando são usados para agrupar mutações que afetam os dados armazenados juntos no espaço da chave. Como o Spanner intercala os dados das tabelas mãe e filha na tabela mãe, os dados estão sempre próximos no espaço da chave. Recomendamos que você estruture seu grupo de mutações para que ele contenha uma mutação aplicada a uma tabela primária e outras mutações aplicadas a tabelas secundárias. Ou, então, faça com que todas suas mutações modifiquem dados próximos no espaço da chave. Para mais informações sobre como o Spanner armazena para dados da tabela filha, consulte Esquema e modelo de dados. Se você não organiza seus grupos de mutação em torno das hierarquias de tabela recomendadas ou se os dados que são acessados não estão próximos no espaço da chave, o Spanner pode executar confirmações em duas fases, o que resultará em um desempenho mais lento. Para mais informações, consulte Compensações de localidade.

Para usar MutationGroup, crie uma transformação SpannerIO.write() e chame o método SpannerIO.Write.grouped() (em inglês). Ele retornará uma transformação a ser aplicada a uma PCollection de objetos MutationGroup.

Ao criar um MutationGroup, a primeira mutação listada se torna a primária. Caso seu grupo afete uma tabela mãe e uma filha, a mutação primária precisa estar incluída na tabela mãe. Caso contrário, use qualquer mutação como a primária. O conector do Dataflow usa a mutação primária para determinar os limites de particionamento a fim de agrupar as mutações de maneira eficiente.

Por exemplo, imagine que seu aplicativo monitore o comportamento e sinalize ações problemáticas dos usuários para análise. Para cada comportamento sinalizado, você quer atualizar a tabela Users para bloquear o acesso do usuário ao seu aplicativo e registrar o incidente na tabela PendingReviews. Para garantir que ambas as tabelas sejam atualizadas atomicamente, use um MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Ao criar um grupo de mutações, a primeira mutação fornecida como um argumento se torna a primária. Nesse caso, as duas tabelas não estão relacionadas. Portanto, não há uma mutação primária clara. Selecionamos userMutation como primária colocando-a primeiro. Aplicar as duas mutações separadamente seria mais rápido, mas não garantiria a atomicidade. Assim, criar um grupo de mutações é a melhor escolha nessa situação.

A seguir