Como usar o conector do Cloud Dataflow

O Cloud Dataflow é um serviço gerenciado para transformar e enriquecer dados. O conector do Cloud Dataflow para Cloud Spanner permite que você leia e grave dados no Cloud Spanner e a partir dele em um canal do Cloud Dataflow. Além disso, é possível transformar ou modificar esses dados. Você também pode criar canais que transferem dados entre o Cloud Spanner e outros produtos do Google Cloud Platform.

O conector do Cloud Dataflow é o método recomendado para migrar dados em massa de modo eficiente para dentro e fora do Cloud Spanner. Ao trabalhar com bancos de dados individuais, há outros métodos que podem ser usados para importar e exportar dados:

  • Use o Console do GCP para exportar um banco de dados individual do Cloud Spanner para o Cloud Storage no formato Avro (em inglês).
  • Use o Console do GCP para importar um banco de dados de volta para o Cloud Spanner a partir dos arquivos exportados para o Cloud Storage.
  • Use a API REST ou a ferramenta de linha de comando gcloud para executar jobs de exportação e importação do Cloud Spanner para o Cloud Storage e de volta, também usando o formato Avro.

O conector do Cloud Dataflow para Cloud Spanner faz parte do SDK do Apache Beam para Java e oferece uma API para realizar as ações mencionadas acima. Consulte o guia de programação do Apache Beam (em inglês) para mais informações sobre alguns dos conceitos discutidos abaixo, como objetos e transformações PCollection.

Como adicionar o conector ao projeto do Maven

Para adicionar o conector do Cloud Dataflow do GCP a um projeto Maven, adicione o artefato Maven beam-sdks-java-io-google-cloud-platform ao arquivo pom.xml como uma dependência.

Por exemplo, se o arquivo pom.xml definir beam.version como o número de versão apropriado, a seguinte dependência será incluída:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Como ler dados do Cloud Spanner

Para ler dados do Cloud Spanner, aplique a transformação SpannerIO.read(). Configure a leitura usando os métodos na classe SpannerIO.Read. Aplicar a transformação retorna um PCollection<Struct>, em que cada elemento do conjunto representa uma linha individual retornada pela operação de leitura. Leia a partir do Cloud Spanner com e sem uma consulta SQL específica, dependendo da saída desejada.

Aplicar a transformação SpannerIO.read() retorna uma visualização consistente dos dados devido à execução de uma leitura forte. A menos que você especifique o contrário, um instantâneo do resultado da leitura é criado no momento em que você a inicia. Consulte Leituras para mais informações sobre os diferentes tipos de leituras que o Cloud Spanner pode executar.

Como ler dados usando uma consulta

Para ler um conjunto específico de dados do Cloud Spanner, configure a transformação usando o método SpannerIO.Read.withQuery() para especificar uma consulta SQL. Por exemplo:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Como ler dados sem especificar uma consulta

Para ler um banco de dados sem usar uma consulta, especifique um nome de tabela e uma lista de colunas ou leia com um índice. Para ler as colunas selecionadas, especifique um nome de tabela e uma lista de colunas ao criar a transformação. Para isso, use SpannerIO.read(). Por exemplo:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

Também é possível ler a partir da tabela usando um conjunto específico de chaves como valores de índice. Para isso, crie a leitura com um índice que contenha os valores de chave escolhidos usando o método SpannerIO.Read.withIndex().

Como controlar a inatividade dos dados de transação

Uma transformação será executada em um instantâneo consistente de dados. Para controlar a inatividade deles, use o método SpannerIO.Read.withTimestampBound(). Clique neste link sobre transações para mais informações.

Como ler dados de várias tabelas na mesma transação

Se quiser ler dados de várias tabelas ao mesmo tempo para garantir a consistência dos dados, execute todas as leituras em uma única transação. Para isso, aplique uma transformação createTransaction() e crie um objeto PCollectionView<Transaction> que depois gera uma transação. Para transmitir a exibição resultante a uma operação de leitura, use SpannerIO.Read.withTransaction().

SpannerConfig spannerConfig = SpannerConfig.create()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId);
PCollectionView<Transaction> tx = p.apply(
    SpannerIO.createTransaction()
        .withSpannerConfig(spannerConfig)
        .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
    .withTransaction(tx));
PCollection<Struct> albums = p.apply(SpannerIO.read().withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
    .withTransaction(tx));

Como ler dados de todas as tabelas disponíveis

Leia dados de todas as tabelas disponíveis em um banco de dados do Cloud Spanner:

PCollection<Struct> allRecords = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withBatching(false)
    .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t"
        + ".table_catalog = '' AND t.table_schema = ''")).apply(
    MapElements.into(TypeDescriptor.of(ReadOperation.class))
        .via((SerializableFunction<Struct, ReadOperation>) input -> {
          String tableName = input.getString(0);
          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
        })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Solução de problemas de consultas incompatíveis

O conector do Cloud Dataflow só é compatível com consultas SQL do Cloud Spanner em que o primeiro operador no plano de execução da consulta é um Distributed Union. Se tentar ler os dados do Cloud Spanner com uma consulta e receber uma exceção informando que a consulta does not have a DistributedUnion at the root, siga as etapas em Noções básicas sobre como o Cloud Spanner executa consultas para recuperar um plano de execução da consulta usando o Console do GCP.

Se sua consulta SQL não for compatível, simplifique-a para uma consulta que tenha um "distributed union" como o primeiro operador no plano de execução. Remova as funções agregadas, assim como os operadores DISTINCT, GROUP BY e ORDER, porque eles têm maior probabilidade de impedir a execução da consulta.

Como criar mutações para uma gravação

Use o método Mutation da classe newInsertOrUpdateBuilder() em vez do método newInsertBuilder(), a menos que seja absolutamente necessário. O Cloud Dataflow garante pelo menos uma gravação, o que significa que a mutação provavelmente será gravada várias vezes. Como resultado, é provável que as mutações inseridas gerem erros que causam falha no canal. Para evitar esses erros, crie mutações de inserção ou atualização, que podem ser aplicadas mais de uma vez.

Como gravar no Cloud Spanner e transformar dados

É possível gravar dados no Cloud Spanner com o conector do Cloud Dataflow usando uma transformação SpannerIO.write() para executar um conjunto de mutações de linha de entrada. O conector do Cloud Dataflow agrupa as mutações em lotes para maior eficiência.

Veja no exemplo a seguir como aplicar uma transformação de gravação a um PCollection de mutações:

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

Se uma transformação for interrompida inesperadamente antes da conclusão, as mutações que já foram aplicadas não serão revertidas.

Como aplicar grupos de mutações atomicamente

É possível usar a classe MutationGroup para garantir que um grupo de mutações seja aplicado atomicamente em conjunto. As mutações em um MutationGroup serão enviadas na mesma transação, mas a transação pode se repetir.

Os grupos de mutações têm melhor desempenho quando são usados para agrupar mutações que afetam os dados armazenados juntos no espaço da chave. Como o Cloud Spanner intercala dados das tabelas primária e secundária na tabela primária, esses dados ficam sempre juntos no espaço da chave. Recomendamos que você estruture seu grupo de mutações para que ele contenha uma mutação aplicada a uma tabela primária e outras mutações aplicadas a tabelas secundárias. Ou, então, faça com que todas suas mutações modifiquem dados próximos no espaço da chave. Para mais informações sobre como o Cloud Spanner armazena dados das tabelas primária e secundária, consulte Esquema e modelo de dados. Se você não organizar seus grupos de mutação de acordo com as hierarquias de tabela recomendadas ou se os dados acessados não estiverem próximos no espaço da chave, talvez o Cloud Spanner precise executar efetivações de duas fases, o que resultará em um desempenho mais lento. Para mais informações, consulte Compensações de localidade.

Para usar o MutationGroup, crie uma transformação SpannerIO.write() e chame o método SpannerIO.Write.grouped(). Ele retornará uma transformação a ser aplicada a uma PCollection de objetos MutationGroup.

Ao criar um MutationGroup, a primeira mutação listada se torna a primária. Caso seu grupo afete uma tabela primária e uma secundária, a mutação primária precisa estar incluída na tabela primária. Caso contrário, use qualquer mutação como a primária. O conector do Cloud Dataflow usa a mutação primária para determinar os limites de particionamento a fim de agrupar as mutações de maneira eficiente.

Por exemplo, imagine que seu aplicativo monitore o comportamento e sinalize ações problemáticas dos usuários para análise. Para cada comportamento sinalizado, você quer atualizar a tabela Users a fim de bloquear o acesso do usuário ao seu aplicativo e registrar o incidente em PendingReviews. Para garantir que ambas as tabelas sejam atualizadas atomicamente, use um MutationGroup:

PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

Ao criar um grupo de mutações, a primeira mutação fornecida como um argumento se torna a primária. Nesse caso, as duas tabelas não estão relacionadas. Portanto, não há uma mutação primária clara. Nós selecionamos userMutation como primária, colocando-a primeiro. Aplicar as duas mutações separadamente seria mais rápido, mas não garantiria a atomicidade. Assim, criar um grupo de mutações é a melhor escolha nessa situação.

A seguir

Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…

Documentação do Cloud Spanner