O Dataflow é um serviço gerenciado para transformar e enriquecer dados. Com o conector do Dataflow para o Spanner, é possível ler os dados e gravar dados no Spanner em um pipeline do Dataflow. transformando ou modificando os dados. Também é possível criar pipelines que transferem dados entre o Spanner e outros produtos do Google Cloud.
O conector do Dataflow é o método recomendado para operações mover dados em massa para dentro e para fora do Spanner e realizar grandes transformações em um banco de dados que não são compatíveis com DML particionada; como transferências de tabelas, exclusões em massa que exigem JOIN e assim por diante. Durante o trabalho com bancos de dados individuais, há outros métodos que você pode usar para importar e exportar dados:
- Use o console do Google Cloud para exportar um banco de dados individual de Spanner para o Cloud Storage no formato Avro.
- Use o console do Google Cloud para importar um banco de dados de volta para o o Spanner dos arquivos exportados para o Cloud Storage.
- Use a API REST ou a Google Cloud CLI para executar os comandos export ou import do Spanner para o Cloud Storage e vice-versa (também usando o formato Avro).
O conector do Dataflow para Spanner faz parte do SDK do Apache Beam para Java (em inglês) e fornece uma API para fazer isso ações. Para mais informações sobre alguns dos conceitos discutidos abaixo, como como objetos e transformações PCollection, consulte o guia de programação do Apache Beam.
Adicionar o conector ao seu projeto do Maven
Para adicionar o conector do Google Cloud Dataflow a um projeto do
Maven, adicione o artefato beam-sdks-java-io-google-cloud-platform
do Maven ao
arquivo pom.xml
como uma dependência.
Por exemplo, se o arquivo pom.xml
definir beam.version
como o número de versão apropriado, adicione a seguinte dependência:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Ler dados do Spanner
Para ler dados do Spanner, aplique a transformação SpannerIO.read().
Configure a leitura usando os métodos na classe SpannerIO.Read
.
Aplicar a transformação retorna um PCollection<Struct>
, em que
cada elemento do conjunto representa uma linha individual retornada pela operação de leitura (links em inglês). É possível ler dados do Spanner com e sem um SQL específico
consulta, dependendo do resultado desejado.
Aplicar a transformação SpannerIO.read()
retorna uma visualização consistente dos dados
devido à execução de uma leitura forte. A menos que você especifique o contrário, um snapshot do resultado da leitura
será criado no momento em que você a iniciar. Consulte as leituras para saber mais
informações sobre os diferentes tipos de leituras que o Spanner pode realizar.
Ler dados usando uma consulta
Para ler um conjunto específico de dados do Spanner, configure a transformação
usando o método SpannerIO.Read.withQuery()
para especificar um objeto
consulta. Exemplo:
Ler dados sem especificar uma consulta
Para ler um banco de dados sem usar uma consulta, especifique uma tabela nome usando o método SpannerIO.Read.withTable() e especifique um lista de colunas para leitura usando SpannerIO.Read.withColumns() . Exemplo:
GoogleSQL
PostgreSQL
Para limitar as linhas lidas, especifique um conjunto de chaves primárias para leitura usando o SpannerIO.Read.withKeySet().
Você também pode ler uma tabela usando um índice secundário especificado. Assim como acontece com chamada da API readUsingIndex(), o índice deve conter todos os dados que aparece nos resultados da consulta.
Para fazer isso, especifique a tabela como mostrado no exemplo anterior e especifique os
index que contém os valores de coluna desejados usando o
SpannerIO.Read.withIndex()
. O índice deve armazenar todos
das colunas que a transformação precisa ler. A chave primária da tabela base é
armazenadas implicitamente. Por exemplo, para ler a tabela Songs
usando o índice
SongsBySongName
, use o
seguinte código:
GoogleSQL
PostgreSQL
Controlar a desatualização dos dados de transação
Uma transformação será executada em um snapshot consistente de dados. Para
controlar a inatividade dos dados, use o
método SpannerIO.Read.withTimestampBound()
(em inglês). Consulte
Transações para mais informações.
Ler de várias tabelas na mesma transação
Se quiser ler dados de várias tabelas ao mesmo tempo para garantir a
consistência dos dados, execute todas as leituras em uma única transação. Para
isso, aplique uma transformação createTransaction()
(em inglês) e crie
um objeto PCollectionView<Transaction>
que depois gere uma transação. Para transmitir a exibição resultante a uma operação de leitura, use
SpannerIO.Read.withTransaction()
(em inglês).
GoogleSQL
PostgreSQL
Ler dados de todas as tabelas disponíveis
É possível ler dados de todas as tabelas disponíveis em um banco de dados do Spanner.
GoogleSQL
PostgreSQL
Resolver problemas de consultas incompatíveis
O conector do Dataflow só oferece suporte a consultas SQL do Spanner
em que o primeiro operador no plano de execução da consulta é um objeto Distributed
União. Se você tentar ler dados do Spanner usando uma consulta
Você receber uma exceção informando que a consulta does not have a DistributedUnion at
the root
, siga as etapas em Entender como o Spanner é executado.
consultas para recuperar um plano de execução para sua consulta usando o
console do Google Cloud.
Se sua consulta SQL não for compatível, simplifique-a para uma consulta que tenha um "distributed union" como o primeiro operador no plano de execução. Remova funções de
agregação, assim como os operadores DISTINCT
, GROUP BY
e ORDER
, porque eles
têm maior probabilidade de impedir a execução da consulta.
Criar mutações para uma gravação
Use o método Mutation
newInsertOrUpdateBuilder()
em vez do método
Método newInsertBuilder()
a menos que seja absolutamente necessário para pipelines Java. Para pipelines Python, use
SpannerInsertOrUpdate()
em vez de
SpannerInsert()
O Dataflow oferece
garante pelo menos uma vez, o que significa que a mutação pode ser escrita
várias vezes. Como resultado, somente INSERT
mutações podem gerar
com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
erros que causam
que o pipeline falhe. Para evitar esse erro, use o INSERT_OR_UPDATE
mutação, que adiciona uma nova linha ou atualiza os valores da coluna se a linha
já existe. A mutação INSERT_OR_UPDATE
pode ser aplicada mais de uma vez.
Gravar no Spanner e transformar dados
É possível gravar dados no Spanner com o
usando uma transformação SpannerIO.write()
para executar uma
coleção de mutações de linha de entrada. Grupos de conectores do Dataflow
em lotes para aumentar a eficiência.
Veja no exemplo a seguir como aplicar uma transformação de gravação a um PCollection
de
mutações:
GoogleSQL
PostgreSQL
Se uma transformação for interrompida inesperadamente antes da conclusão, as mutações que já foram aplicadas não serão revertidas.
Aplicar grupos de mutações atomicamente
É possível usar a classe MutationGroup
(em inglês) para garantir que um
grupo de mutações seja aplicado atomicamente em conjunto. As mutações em um
MutationGroup
serão enviadas na mesma transação, mas a
transação pode se repetir.
Os grupos de mutações têm melhor desempenho quando são usados para agrupar mutações que afetam os dados armazenados juntos no espaço da chave. Como o Spanner intercala os dados das tabelas mãe e filha na tabela mãe, os dados estão sempre próximos no espaço da chave. Recomendamos que você estruture seu grupo de mutações para que ele contenha uma mutação aplicada a uma tabela primária e outras mutações aplicadas a tabelas secundárias. Ou, então, faça com que todas suas mutações modifiquem dados próximos no espaço da chave. Para mais informações sobre como o Spanner armazena para dados da tabela filha, consulte Esquema e modelo de dados. Se você não organiza seus grupos de mutação em torno das hierarquias de tabela recomendadas ou se os dados que o acesso não está próximo no espaço da chave, o Spanner pode executar confirmações em duas fases, o que resultará em um desempenho mais lento. Para mais informações, consulte Compensações de localidade.
Para usar MutationGroup
, crie uma transformação SpannerIO.write()
e chame o
método SpannerIO.Write.grouped()
(em inglês). Ele retornará uma
transformação a ser aplicada a uma PCollection
de objetos MutationGroup
.
Ao criar um MutationGroup
, a primeira mutação listada se torna a
primária. Caso seu grupo afete uma tabela mãe e uma filha, a mutação primária precisa estar incluída na tabela mãe. Caso contrário, use qualquer mutação como a primária. O conector do Dataflow usa a mutação primária para determinar os limites de particionamento a fim de agrupar as mutações de maneira eficiente.
Por exemplo, imagine que seu aplicativo monitore o comportamento e sinalize
ações problemáticas dos usuários para análise. Para cada comportamento sinalizado, você quer
atualizar a tabela Users
para bloquear o acesso do usuário ao seu aplicativo e
registrar o incidente na tabela PendingReviews
. Para garantir
que ambas as tabelas sejam atualizadas atomicamente, use um MutationGroup
:
GoogleSQL
PostgreSQL
Ao criar um grupo de mutações, a primeira mutação fornecida como um argumento se torna a primária. Nesse caso, as duas tabelas não estão relacionadas. Portanto,
não há uma mutação primária clara. Selecionamos userMutation
como primária
colocando-a primeiro. Aplicar as duas mutações separadamente seria mais rápido, mas
não garantiria a atomicidade. Assim, criar um grupo de mutações é a melhor escolha nessa
situação.
A seguir
- Saiba mais sobre como criar um canal de dados do Apache Beam.
- Exportar e importar bancos de dados do Spanner no console do Google Cloud usando o Dataflow.