Esta página demonstra como criar pipelines do Dataflow que consomem e encaminham dados de alteração do Spanner usando fluxos de alterações. Você pode usar o exemplo de código nesta página para criar pipelines personalizados.
Principais conceitos
Confira a seguir alguns conceitos principais de pipelines do Dataflow para fluxos de mudanças.
Dataflow
O Dataflow é um serviço sem servidor, rápido e econômico que oferece suporte a processamento de streaming e em lote. Ele oferece portabilidade com jobs de processamento escritos usando as bibliotecas de código aberto Apache Beam e automatiza o provisionamento de infraestrutura e o gerenciamento de clusters. O Dataflow oferece streaming quase em tempo real ao ler de fluxos de alterações.
É possível usar o Dataflow para consumir fluxos de alterações do Spanner com o conector SpannerIO, que oferece uma abstração sobre a API Spanner para consultar fluxos de alterações. Com esse conector, você não precisa gerenciar o ciclo de vida da partição de fluxos de mudança, o que é necessário quando você usa a API Spanner diretamente. O conector fornece um fluxo de registros de alteração de dados para que você possa se concentrar mais na lógica do aplicativo e menos em detalhes específicos da API e na partição dinâmica do fluxo de mudanças. Recomendamos o uso do conector SpannerIO em vez da API Spanner na maioria das circunstâncias em que você precisa ler dados de fluxo de alterações.
Os modelos do Dataflow são pipelines pré-criados do Dataflow que implementam casos de uso comuns. Consulte Modelos do Dataflow para ter uma visão geral.
Pipeline do Dataflow
Um pipeline do Dataflow de fluxos de alterações do Spanner é composto por quatro partes principais:
- Um banco de dados do Spanner com um fluxo de alterações
- O conector SpannerIO
- Transformações e coletores definidos pelo usuário
- Um gravador de E/S de destino do Apache Beam
Fluxo de alterações do Spanner
Para saber como criar um fluxo de alterações, consulte Criar um fluxo de alterações.
Conector SpannerIO do Apache Beam
Esse é o conector SpannerIO descrito na seção anterior do Dataflow.
É um conector de E/S de origem que emite um PCollection
de registros de alteração de dados
para estágios posteriores do pipeline. O
horário do evento
para cada registro de alteração de dados emitido
será o carimbo de data/hora de confirmação. Os registros emitidos são
não ordenados, e o conector SpannerIO garante que não haverá
registros atrasados.
Ao trabalhar com fluxos de alterações, o Dataflow usa o checkpointing. Como resultado, cada worker pode aguardar até o intervalo de verificação configurado para armazenar as alterações em buffer antes de enviá-las para processamento adicional.
Transformações definidas pelo usuário
Uma transformação definida pelo usuário permite que um usuário agregue, transforme ou modifique dados de processamento em um pipeline do Dataflow. Os casos de uso comuns para isso são a remoção de informações de identificação pessoal, atendendo aos requisitos de formato de dados downstream e a classificação. Consulte a documentação oficial do Apache Beam para conferir o guia de programação sobre transformações.
Gravador de E/S do sink do Apache Beam
O Apache Beam contém conectores de E/S integrados que podem ser usados para gravar de um pipeline do Dataflow em um destino de dados, como o BigQuery. A maioria dos destinos de dados comuns tem suporte nativo.
Modelos do Dataflow
Os modelos do Dataflow fornecem um método para criar jobs do Dataflow com base em imagens do Docker pré-criadas para casos de uso comuns usando o console do Google Cloud, a Google Cloud CLI ou as chamadas da API REST.
Para fluxos de alterações do Spanner, oferecemos três modelos flexíveis do Dataflow:
Definir permissões do IAM para modelos do Dataflow
Antes de criar um job do Dataflow com os três modelos flex listados, verifique se você tem as permissões necessárias do IAM para as seguintes contas de serviço:
Se você não tiver as permissões necessárias do IAM, será necessário especificar uma conta de serviço de worker gerenciada pelo usuário para criar o job do Dataflow. Para mais informações, consulte Segurança e permissões do Dataflow.
Quando você tenta executar um job usando um modelo flexível do Dataflow sem todas as permissões necessárias, o job pode falhar com um erro de leitura do arquivo de resultado ou um erro de permissão negada no recurso. Para mais informações, consulte Resolver problemas de modelos Flex.
criar um pipeline do Dataflow
Esta seção aborda a configuração inicial do conector e fornece exemplos de integrações comuns com o recurso de fluxos de alterações do Spanner.
Para seguir estas etapas, você precisa de um ambiente de desenvolvimento Java para o Dataflow. Para mais informações, consulte Criar um pipeline do Dataflow usando Java.
Criar um stream de alterações
Para saber como criar um fluxo de alterações, consulte Criar um fluxo de alterações. Para continuar com as próximas etapas, você precisa ter um banco de dados do Spanner com um fluxo de alterações configurado.
Conceder privilégios de controle de acesso granular
Se você espera que usuários de controle de acesso detalhado executem o job do Dataflow,
verifique se os usuários têm acesso a uma função de banco de dados
com o privilégio SELECT
no fluxo de alterações e o privilégio EXECUTE
na função com valor de tabela do fluxo de alterações. Além disso, verifique se o
principal especifica o papel do banco de dados na configuração do SpannerIO ou no
modelo flexível do Dataflow.
Para mais informações, consulte Sobre o controle de acesso detalhado.
Adicionar o conector SpannerIO como uma dependência
O conector SpannerIO do Apache Beam encapsula a complexidade do consumo dos fluxos de alterações diretamente usando a API Cloud Spanner, emitindo uma PCollection de registros de dados de fluxo de alterações para estágios posteriores do pipeline.
Esses objetos podem ser consumidos em outras etapas do pipeline do Dataflow do usuário. A integração do fluxo de alterações faz parte do conector SpannerIO. Para usar o conector SpannerIO, a dependência
precisa ser adicionada ao arquivo pom.xml
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>
Criar um banco de dados de metadados
O conector precisa acompanhar cada partição ao executar o pipeline do Apache Beam. Ele mantém esses metadados em uma tabela Spanner criada pelo conector durante a inicialização. Especifique o banco de dados em que essa tabela será criada ao configurar o conector.
Conforme descrito nas práticas recomendadas para fluxos de alteração, recomendamos criar um novo banco de dados para essa finalidade, em vez de permitir que o conector use o banco de dados do aplicativo para armazenar a tabela de metadados.
O proprietário de um job do Dataflow que usa o conector SpannerIO precisa ter as seguintes permissões do IAM definidas com este banco de dados de metadados:
spanner.databases.updateDdl
spanner.databases.beginReadOnlyTransaction
spanner.databases.beginOrRollbackReadWriteTransaction
spanner.databases.read
spanner.databases.select
spanner.databases.write
spanner.sessions.create
spanner.sessions.get
Configurar o conector
O conector de fluxos de alterações do Spanner pode ser configurado da seguinte maneira:
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
startTime.getSeconds() + (10 * 60),
startTime.getNanos()
);
SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-meta-instance-id")
.withMetadataDatabase("my-meta-database-id")
.withMetadataTable("my-meta-table-name")
.withRpcPriority(RpcPriority.MEDIUM)
.withInclusiveStartAt(startTime)
.withInclusiveEndAt(endTime);
Confira a seguir as descrições das opções readChangeStream()
:
Configuração do Spanner (obrigatória)
Usado para configurar o projeto, a instância e o banco de dados em que o fluxo de alterações foi criado e precisa ser consultado. Também especifica, opcionalmente, o papel do banco de dados a ser usado quando o principal do IAM que está executando o job do Dataflow é um usuário de controle de acesso granular. O job assume essa função de banco de dados para acessar o fluxo de alterações. Para mais informações, consulte Sobre o controle de acesso detalhado.
Nome do fluxo de alterações (obrigatório)
Esse nome identifica exclusivamente o fluxo de mudanças. O nome aqui precisa ser o mesmo usado na criação.
ID da instância de metadados (opcional)
Essa é a instância para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.
ID do banco de dados de metadados (obrigatório)
É o banco de dados para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.
Nome da tabela de metadados (opcional)
Use isso apenas ao atualizar um pipeline existente.
Esse é o nome da tabela de metadados preexistente a ser usado pelo conector. O conector usa isso para armazenar os metadados e controlar o consumo dos dados da API do fluxo de alterações. Se essa opção for omitida, o Spanner vai criar uma nova tabela com um nome gerado na inicialização do conector.
Prioridade da RPC (opcional)
A prioridade
da solicitação a ser
usada para as consultas de fluxo de mudanças. Se esse parâmetro for omitido, high
priority
será usado.
InclusiveStartAt (obrigatório)
As mudanças do carimbo de data/hora fornecido são retornadas ao autor da chamada.
InclusiveEndAt (opcional)
As mudanças até o carimbo de data/hora fornecido são retornadas ao autor da chamada. Se esse parâmetro for omitido, as mudanças serão emitidas indefinidamente.
Adicionar transformações e coletores para processar dados de mudança
Com as etapas anteriores concluídas, o conector SpannerIO configurado está pronto
para emitir uma PCollection de objetos DataChangeRecord
.
Consulte Exemplos de transformações e coletores para conferir várias configurações de canal de exemplo que processam esses dados transmitidos de várias maneiras.
Os registros de fluxo de alterações emitidos pelo conector SpannerIO não são ordenados. Isso ocorre porque as PCollections não oferecem garantias de ordenação. Se você precisar de um fluxo ordenado, agrupe e classifique os registros como transformações nos pipelines. Consulte Exemplo: ordenar por chave. Você pode estender esse exemplo para classificar os registros com base em qualquer campo, como por IDs de transação.
Exemplos de transformações e coletores
Você pode definir suas próprias transformações e especificar coletores para gravar os dados. A documentação do Apache Beam oferece uma infinidade de transformações que podem ser aplicadas, além de conectores de E/S prontos para uso para gravar os dados em sistemas externos.
Exemplo: ordenar por chave
Este exemplo de código emite registros de alteração de dados ordenados pelo carimbo de data/hora de confirmação e agrupados por chaves primárias usando o conector do Dataflow.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new BreakRecordByModFn()))
.apply(ParDo.of(new KeyByIdFn()))
.apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
// Subsequent processing goes here
Este exemplo de código usa estados e timers para armazenar registros de cada chave e define o tempo de expiração do timer para um tempo T
configurado pelo usuário no futuro (definido na função BufferKeyUntilOutputTimestamp). Quando a marca d'água do Dataflow passa do tempo T
, esse código limpa todos os registros no buffer com carimbo de data/hora menor que T
, ordena esses registros por carimbo de data/hora de confirmação e gera um par de chave-valor em que:
- A chave é a chave de entrada, que é a chave primária hashizada para uma matriz de bucket de tamanho 1.000.
- O valor são os registros de alteração de dados ordenados que foram armazenados em buffer para a chave.
Para cada chave, temos as seguintes garantias:
- Os timers são disparados na ordem do carimbo de data/hora de expiração.
- Os estágios downstream têm garantia de receber os elementos na mesma ordem em que foram produzidos.
Por exemplo, com uma chave de valor 100, o timer é acionado em T1
e T10
, respectivamente, produzindo um pacote de registros de mudança de dados em cada carimbo de data/hora. Como os registros de alteração de dados gerados em T1
foram produzidos antes dos registros de alteração de dados gerados em T10
, os registros de alteração de dados gerados em T1
também serão recebidos na próxima etapa antes dos registros de alteração de dados gerados em T10
. Esse mecanismo nos ajuda a garantir a ordenação rígida do carimbo de data/hora de confirmação por chave primária para o processamento downstream.
Esse processo será repetido até que o pipeline termine e todos os registros de mudança de dados sejam processados. Se não houver um horário de término especificado, o processo será repetido indefinidamente.
Observe que este exemplo de código usa estados e timers, em vez de janelas, para realizar a ordenação por chave. O motivo é que não há garantia de que as janelas serão processadas na ordem. Isso significa que janelas mais antigas podem ser processadas mais tarde do que janelas mais recentes, o que pode resultar em processamento fora de ordem.
BreakRecordByModFn
Cada registro de alteração de dados pode conter vários mods. Cada mod representa uma inserção, atualização ou exclusão em uma única chave primária. Essa função divide cada registro de mudança de dados em registros separados, um por mod.
private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
DataChangeRecord> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
outputReceiver) {
record.getMods().stream()
.map(
mod ->
new DataChangeRecord(
record.getPartitionToken(),
record.getCommitTimestamp(),
record.getServerTransactionId(),
record.isLastRecordInTransactionInPartition(),
record.getRecordSequence(),
record.getTableName(),
record.getRowType(),
Collections.singletonList(mod),
record.getModType(),
record.getValueCaptureType(),
record.getNumberOfRecordsInTransaction(),
record.getNumberOfPartitionsInTransaction(),
record.getTransactionTag(),
record.isSystemTransaction(),
record.getMetadata()))
.forEach(outputReceiver::output);
}
}
KeyByIdFn
Essa função recebe um DataChangeRecord
e gera um DataChangeRecord
com a chave da chave primária do Spanner, que é hashizada para um valor inteiro.
private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
// NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
// Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
// of states and timers for performance purposes.
// Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
// On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
private static final int NUMBER_OF_BUCKETS = 1000;
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
// Hash the received keys into a bucket in order to have a
// deterministic number of buffers and timers.
String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);
outputReceiver.output(KV.of(bucketIndex, record));
}
}
BufferKeyUntilOutputTimestamp
Os timers e buffers são por chave. Essa função armazena em buffer cada registro de alteração de dados até que a marca-d'água transmita o carimbo de data/hora em que queremos gerar os registros de alteração de dados em buffer.
Este código usa um timer em loop para determinar quando limpar o buffer:
- Quando um registro de alteração de dados de uma chave é encontrado pela primeira vez, o timer é acionado no carimbo de data/hora de confirmação do registro de alteração de dados +
incrementIntervalSeconds
(uma opção configurável pelo usuário). - Quando o timer é acionado, ele adiciona todos os registros de alteração de dados no buffer com um carimbo de data/hora menor que o tempo de expiração do timer para
recordsToOutput
. Se o buffer tiver registros de mudança de dados com carimbo de data/hora maior ou igual ao tempo de expiração do timer, ele vai adicionar esses registros de mudança de dados de volta ao buffer em vez de gerar saída. Em seguida, ele define o próximo timer para o tempo de expiração do timer atual maisincrementIntervalInSeconds
. - Se
recordsToOutput
não estiver vazio, a função vai ordenar os registros de mudança de dados emrecordsToOutput
por carimbo de data/hora de confirmação e ID da transação e, em seguida, vai gerar a saída.
private static class BufferKeyUntilOutputTimestamp extends
DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>> {
private static final Logger LOG =
LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);
private final long incrementIntervalInSeconds = 2;
private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
this.incrementIntervalInSeconds = incrementIntervalInSeconds;
}
@SuppressWarnings("unused")
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("keyString")
private final StateSpec<ValueState<String>> keyString =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void process(
@Element KV<String, DataChangeRecord> element,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
buffer.add(element.getValue());
// Only set the timer if this is the first time we are receiving a data change
// record with this key.
String elementKey = keyString.read();
if (elementKey == null) {
Instant commitTimestamp =
new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
Instant outputTimestamp =
commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
timer.set(outputTimestamp);
keyString.write(element.getKey());
}
}
@OnTimer("timer")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
if (!buffer.isEmpty().read()) {
String elementKey = keyString.read();
final List<DataChangeRecord> records =
StreamSupport.stream(buffer.read().spliterator(), false)
.collect(Collectors.toList());
buffer.clear();
List<DataChangeRecord> recordsToOutput = new ArrayList<>();
for (DataChangeRecord record : records) {
Instant recordCommitTimestamp =
new Instant(record.getCommitTimestamp().toSqlTimestamp());
final String recordString =
record.getMods().get(0).getNewValuesJson().isEmpty()
? "Deleted record"
: record.getMods().get(0).getNewValuesJson();
// When the watermark passes time T, this means that all records with
// event time < T have been processed and successfully committed. Since the
// timer fires when the watermark passes the expiration time, we should
// only output records with event time < expiration time.
if (recordCommitTimestamp.isBefore(context.timestamp())) {
LOG.info(
"Outputting record with key {} and value {} at expiration " +
"timestamp {}",
elementKey,
recordString,
context.timestamp().toString());
recordsToOutput.add(record);
} else {
LOG.info(
"Expired at {} but adding record with key {} and value {} back to " +
"buffer due to commit timestamp {}",
context.timestamp().toString(),
elementKey,
recordString,
recordCommitTimestamp.toString());
buffer.add(record);
}
}
// Output records, if there are any to output.
if (!recordsToOutput.isEmpty()) {
// Order the records in place, and output them. The user would need
// to implement DataChangeRecordComparator class that sorts the
// data change records by commit timestamp and transaction ID.
Collections.sort(recordsToOutput, new DataChangeRecordComparator());
context.outputWithTimestamp(
KV.of(elementKey, recordsToOutput), context.timestamp());
LOG.info(
"Expired at {}, outputting records for key {}",
context.timestamp().toString(),
elementKey);
} else {
LOG.info("Expired at {} with no records", context.timestamp().toString());
}
}
Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
LOG.info("Setting next timer to {}", nextTimer.toString());
timer.set(nextTimer);
} else {
LOG.info(
"Timer not being set since the buffer is empty: ");
keyString.clear();
}
}
}
Como ordenar transações
Esse pipeline pode ser alterado para ordenar por ID da transação e carimbo de data/hora de confirmação. Para fazer isso, registre os registros de buffer para cada par de ID de transação / carimbo de data/hora de confirmação, em vez de cada chave do Spanner. Isso requer a modificação do código em KeyByIdFn.
Exemplo: montar transações
Este exemplo de código lê registros de alteração de dados, reúne todos os registros de alteração de dados pertencentes à mesma transação em um único elemento e gera esse elemento. As transações geradas por este código de exemplo não são ordenadas por carimbo de data/hora de confirmação.
Este exemplo de código usa buffers para montar transações com base em registros de mudança de dados. Ao receber um registro de alteração de dados pertencente a uma transação pela primeira vez, ele lê o campo numberOfRecordsInTransaction
no registro de alteração de dados, que descreve o número esperado de registros de alteração de dados pertencentes a essa transação. Ele armazena em buffer os registros de mudança de dados pertencentes a essa transação até que o número de registros em buffer corresponda a numberOfRecordsInTransaction
, quando ele vai gerar os registros de mudança de dados agrupados.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new KeyByTransactionIdFn()))
.apply(ParDo.of(new TransactionBoundaryFn()))
// Subsequent processing goes here
KeyByTransactionIdFn
Essa função recebe um DataChangeRecord
e gera um DataChangeRecord
com chave pelo ID da transação.
private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
outputReceiver.output(KV.of(record.getServerTransactionId(), record));
}
}
TransactionBoundaryFn
TransactionBoundaryFn
armazena em buffer os pares de chave-valor de
{TransactionId, DataChangeRecord}
recebidos de KeyByTransactionIdFn
e
os armazena em grupos com base em TransactionId
. Quando o número de
registros em buffer for igual ao número de registros contidos na
transação inteira, essa função classifica os objetos DataChangeRecord
no grupo por sequência de registro e gera um par de chave-valor de
{CommitTimestamp, TransactionId}
, Iterable<DataChangeRecord>
.
Aqui, estamos presumindo que SortKey
é uma classe definida pelo usuário que representa
um par {CommitTimestamp, TransactionId}
. Para mais informações sobre o
SortKey
, consulte a implementação de exemplo.
private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>> {
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@StateId("count") ValueState<Integer> countState) {
final KV<String, DataChangeRecord> element = context.element();
final DataChangeRecord record = element.getValue();
buffer.add(record);
int count = (countState.read() != null ? countState.read() : 0);
count = count + 1;
countState.write(count);
if (count == record.getNumberOfRecordsInTransaction()) {
final List<DataChangeRecord> sortedRecords =
StreamSupport.stream(buffer.read().spliterator(), false)
.sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
.collect(Collectors.toList());
final Instant commitInstant =
new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
.getTime());
context.outputWithTimestamp(
KV.of(
new SortKey(sortedRecords.get(0).getCommitTimestamp(),
sortedRecords.get(0).getServerTransactionId()),
sortedRecords),
commitInstant);
buffer.clear();
countState.clear();
}
}
}
Exemplo: filtrar por tag de transação
Quando uma transação que modifica os dados do usuário é marcada, a tag correspondente e o tipo dela são armazenados como parte de DataChangeRecord
. Estes exemplos demonstram como filtrar registros de fluxo de mudanças com base em tags de transação definidas pelo usuário e tags do sistema:
Filtragem de tags definidas pelo usuário para my-tx-tag
:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
!record.isSystemTransaction()
&& record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
// Subsequent processing goes here
Filtragem de tags do sistema/auditoria de TTL:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
record.isSystemTransaction()
&& record.getTransactionTag().equals("RowDeletionPolicy")))
// Subsequent processing goes here
Exemplo: buscar linha completa
Este exemplo funciona com uma tabela Spanner chamada Singer
que tem a seguinte definição:
CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024)
) PRIMARY KEY (SingerId);
No modo de captura de valor OLD_AND_NEW_VALUES
padrão dos fluxos de mudança,
quando há uma atualização em uma linha do Spanner, o registro de mudança
de dados recebido vai conter apenas as colunas que foram alteradas. Colunas rastreadas, mas
inalteradas, não são incluídas no registro. A chave primária do
mod pode ser usada para fazer uma leitura de snapshot do Spanner no carimbo de data/hora de confirmação
do registro de alteração de dados para buscar as colunas inalteradas ou até
recuperar a linha completa.
Talvez seja necessário mudar a política de retenção do banco de dados para um valor maior ou igual à política de retenção do fluxo de alterações para que a leitura do snapshot seja bem-sucedida.
Além disso, o uso do tipo de captura de valor NEW_ROW
é a maneira recomendada e
mais eficiente de fazer isso, já que ele retorna todas as colunas rastreadas da linha
por padrão e não requer uma leitura de instantâneo extra no Spanner.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
// Assume we have a change stream "my-change-stream" that watches Singers table.
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
// Subsequent processing goes here
ToFullRowJsonFn
Essa transformação vai realizar uma leitura desatualizada no carimbo de data/hora de confirmação de cada registro recebido e mapear a linha completa para JSON.
public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
// Since each instance of this DoFn will create its own session pool and will
// perform calls to Spanner sequentially, we keep the number of sessions in
// the pool small. This way, we avoid wasting resources.
private static final int MIN_SESSIONS = 1;
private static final int MAX_SESSIONS = 5;
private final String projectId;
private final String instanceId;
private final String databaseId;
private transient DatabaseClient client;
private transient Spanner spanner;
public ToFullRowJsonFn(SpannerConfig spannerConfig) {
this.projectId = spannerConfig.getProjectId().get();
this.instanceId = spannerConfig.getInstanceId().get();
this.databaseId = spannerConfig.getDatabaseId().get();
}
@Setup
public void setup() {
SessionPoolOptions sessionPoolOptions = SessionPoolOptions
.newBuilder()
.setMinSessions(MIN_SESSIONS)
.setMaxSessions(MAX_SESSIONS)
.build();
SpannerOptions options = SpannerOptions
.newBuilder()
.setProjectId(projectId)
.setSessionPoolOption(sessionPoolOptions)
.build();
DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
spanner = options.getService();
client = spanner.getDatabaseClient(id);
}
@Teardown
public void teardown() {
spanner.close();
}
@ProcessElement
public void process(
@Element DataChangeRecord element,
OutputReceiver<String> output) {
com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
element.getMods().forEach(mod -> {
JSONObject keysJson = new JSONObject(mod.getKeysJson());
JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
ModType modType = element.getModType();
JSONObject jsonRow = new JSONObject();
long singerId = keysJson.getLong("SingerId");
jsonRow.put("SingerId", singerId);
if (modType == ModType.INSERT) {
// For INSERT mod, get non-primary key columns from mod.
jsonRow.put("FirstName", newValuesJson.get("FirstName"));
jsonRow.put("LastName", newValuesJson.get("LastName"));
} else if (modType == ModType.UPDATE) {
// For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
try (ResultSet resultSet = client
.singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
.read(
"Singers",
KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
Arrays.asList("FirstName", "LastName"))) {
if (resultSet.next()) {
jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
JSONObject.NULL : resultSet.getString("FirstName"));
jsonRow.put("LastName", resultSet.isNull("LastName") ?
JSONObject.NULL : resultSet.getString("LastName"));
}
}
} else {
// For DELETE mod, there is nothing to do, as we already set SingerId.
}
output.output(jsonRow.toString());
});
}
}
Esse código cria um cliente de banco de dados do Spanner para realizar a busca
de linhas completa e configura o pool de sessões para ter apenas algumas sessões,
realizando leituras em uma instância do ToFullReowJsonFn
sequencialmente.
O Dataflow gera várias instâncias dessa função,
cada uma com o próprio pool de clientes.
Exemplo: do Spanner para o Pub/Sub
Nesse cenário, o autor da chamada transmite os registros para o Pub/Sub o mais rápido possível, sem agrupamento nem agregação. Essa é uma boa opção para acionar o processamento downstream, como o streaming de todas as novas linhas inseridas em uma tabela do Spanner para o Pub/Sub para processamento adicional.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(PubsubIO.writeStrings().to("my-topic"));
O coletor do Pub/Sub pode ser configurado para garantir a semântica exatamente uma vez.
Exemplo: Spanner para Cloud Storage
Nesse cenário, o autor da chamada agrupa todos os registros em uma determinada janela e salva o grupo em arquivos separados do Cloud Storage. Isso é adequado para análises e arquivamento pontual, que é independente do período de retenção do Spanner.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO
.write()
.to("gs://my-bucket/change-stream-results-")
.withSuffix(".txt")
.withWindowedWrites()
.withNumShards(1));
O destino do Cloud Storage oferece a semântica "pelo menos uma vez" por padrão. Com processamento extra, ele pode ser modificado para ter semântica exatamente uma vez.
Também oferecemos um modelo do Dataflow para esse caso de uso: consulte Conectar fluxos de alterações ao Cloud Storage.
Exemplo: Spanner para BigQuery (tabela de registro)
Aqui, o autor da chamada transmite registros de mudança para o BigQuery. Cada registro de alteração de dados é refletido como uma linha no BigQuery. Isso é adequado para análises. Esse código usa as funções definidas anteriormente, na seção Buscar linha completa, para recuperar a linha completa do registro e gravá-la no BigQuery.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
.apply(BigQueryIO
.<String>write()
.to("my-bigquery-table")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withSchema(new TableSchema().setFields(Arrays.asList(
new TableFieldSchema()
.setName("SingerId")
.setType("INT64")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("FirstName")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("LastName")
.setType("STRING")
.setMode("REQUIRED")
)))
.withAutoSharding()
.optimizedWrites()
.withFormatFunction((String element) -> {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree(element);
} catch (IOException e) {
e.printStackTrace();
}
return new TableRow()
.set("SingerId", jsonNode.get("SingerId").asInt())
.set("FirstName", jsonNode.get("FirstName").asText())
.set("LastName", jsonNode.get("LastName").asText());
}
)
);
O sink do BigQuery oferece semântica do tipo "pelo menos uma vez" por padrão. Com processamento extra, ele pode ser modificado para ter semântica exatamente uma vez.
Também oferecemos um modelo do Dataflow para esse caso de uso. Consulte Conectar fluxos de alterações ao BigQuery.
Monitorar um pipeline
Há duas classes de métricas disponíveis para monitorar um pipeline do Dataflow de fluxo de alterações.
Métricas padrão do Dataflow
O Dataflow fornece várias métricas para garantir que o job esteja saudável, como atualização de dados, atraso do sistema, capacidade de processamento do job, utilização da CPU do worker e muito mais. Confira mais informações em Como usar o Monitoring em pipelines do Dataflow.
Para pipelines de fluxos de alterações, há duas métricas principais que devem ser consideradas: a latência do sistema e a atuação dos dados.
A latência do sistema informa a duração máxima atual (em segundos) em que um item de dados é processado ou está aguardando processamento.
A atualização de dados mostra o tempo entre o momento atual (tempo real) e a marca d'água de saída. A marca-d'água de saída do tempo T
indica que todos os elementos com um tempo de evento (estritamente) antes de T
foram processados para computação. Em outras palavras, a métrica de atualização de dados mede o nível de atualização do pipeline em relação ao processamento dos eventos recebidos.
Se o pipeline estiver com poucos recursos, esse efeito vai aparecer nessas duas métricas. A latência do sistema vai aumentar, porque os itens precisam esperar mais para serem processados. A atualização dos dados também vai aumentar, porque o pipeline não vai conseguir acompanhar a quantidade de dados recebidos.
Métricas personalizadas de transmissão de mudanças
Essas métricas são expostas no Cloud Monitoring e incluem:
- Latência bucketizada (histograma) entre um registro sendo confirmado no Spanner e emitido em uma PCollection pelo conector. Essa métrica pode ser usada para conferir problemas de desempenho (latência) com o pipeline.
- Número total de registros de dados lidos. É uma indicação geral do número de registros emitidos pelo conector. Esse número precisa aumentar cada vez mais, refletindo a tendência de gravações no banco de dados do Spanner.
- Número de partições que estão sendo lidas. Sempre haverá partições sendo lidas. Se esse número for zero, isso indica que ocorreu um erro no pipeline.
- Número total de consultas emitidas durante a execução do conector. Essa é uma indicação geral das consultas de fluxo de mudanças feitas na instância do Spanner durante a execução do pipeline. Isso pode ser usado para estimar a carga do conector para o banco de dados do Spanner.
Atualizar um pipeline
É possível atualizar um pipeline em execução que usa o conector SpannerIO
para processar fluxos de alterações se as verificações de compatibilidade
de jobs forem bem-sucedidas. Para fazer isso, é necessário definir explicitamente o parâmetro de nome da tabela de metadados do novo job ao atualizá-lo. Use o valor da opção de pipeline metadataTable
do job que você está atualizando.
Se você estiver usando um modelo do Dataflow fornecido pelo Google, defina o nome da tabela usando o parâmetro spannerMetadataTableName
. Também é possível modificar
o job atual para usar explicitamente a tabela de metadados com o método
withMetadataTable(your-metadata-table-name)
na
configuração do conector. Depois disso, siga as instruções em Como iniciar o job de substituição na documentação do Dataflow para atualizar um job em execução.
Práticas recomendadas para fluxos de alterações e Dataflow
Confira a seguir algumas práticas recomendadas para criar conexões de fluxos de alterações usando o Dataflow.
Usar um banco de dados de metadados separado
Recomendamos criar um banco de dados separado para o conector SpannerIO usar para armazenamento de metadados, em vez de configurá-lo para usar o banco de dados do aplicativo.
Para mais informações, consulte Considerar um banco de dados de metadados separado.
Dimensionar o cluster
Uma regra geral para um número inicial de workers em um job de fluxos de mudança do Spanner é um worker por 1.000 gravações por segundo. Essa estimativa pode variar dependendo de vários fatores, como o tamanho de cada transação, quantos registros de fluxo de mudanças são produzidos a partir de uma única transação e outras transformações, agregações ou coletores que estão sendo usados no pipeline.
Após a realocação inicial, é importante acompanhar as métricas mencionadas em Monitorar um pipeline, para garantir que o pipeline esteja funcionando corretamente. Recomendamos experimentar um tamanho inicial de pool de workers e monitorar como o pipeline lida com a carga, aumentando o número de nós, se necessário. A utilização da CPU é uma métrica importante para verificar se a carga é adequada e se mais nós são necessários.
Limitações conhecidas
Há algumas limitações conhecidas ao usar fluxos de alterações do Spanner com o Dataflow:
Escalonamento automático
O suporte ao escalonamento automático para todos os pipelines que incluem SpannerIO.readChangeStream
requer o Apache Beam 2.39.0
ou mais recente.
Se você usar uma versão do Apache Beam anterior à 2.39.0
, os pipelines que incluem
SpannerIO.readChangeStream
precisam especificar explicitamente o algoritmo de escalonamento automático
como NONE
, conforme descrito em Escalonamento automático horizontal.
Para escalonar manualmente um pipeline do Dataflow em vez de usar o escalonamento automático, consulte Como escalonar manualmente um pipeline de streaming.
Runner V2
O conector de fluxos de alterações do Spanner requer o Dataflow Runner V2.
Isso precisa ser especificado manualmente durante a execução ou um erro será
gerado. É possível especificar Runner V2
configurando o job com
--experiments=use_unified_worker,use_runner_v2
.
Snapshot
O conector de fluxos de alterações do Spanner não é compatível com snapshots do Dataflow.
Reduzindo
O conector de fluxos de mudanças do Spanner não oferece suporte a esvaziamento de um job. Só é possível cancelar um job atual.
Também é possível atualizar um pipeline existente sem precisar interromper.
OpenCensus
Para usar o OpenCensus para monitorar seu pipeline, especifique a versão 0.28.3 ou mais recente.
NullPointerException
no início do pipeline
Um bug na versão 2.38.0
do Apache Beam pode causar um NullPointerException
ao iniciar o pipeline sob determinadas condições. Isso impediria o início do job e mostraria esta mensagem de erro:
java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null
Para resolver esse problema, use a versão 2.39.0
ou mais recente do Apache Beam ou
especifique manualmente a versão de beam-sdks-java-core
como 2.37.0
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.37.0</version>
</dependency>