Esta página demonstra como criar pipelines do Dataflow que consomem e encaminham dados de alterações do Spanner através de streams de alterações. Pode usar o código de exemplo nesta página para criar pipelines personalizados.
Conceitos principais
Seguem-se alguns conceitos essenciais para pipelines do Dataflow para streams de alterações.
Dataflow
O Dataflow é um serviço sem servidor, rápido e rentável que suporta o processamento por stream e em lote. Oferece portabilidade com tarefas de processamento escritas através das bibliotecas de código aberto Apache Beam e automatiza o aprovisionamento de infraestruturas e a gestão de clusters. O Dataflow oferece streaming quase em tempo real quando lê a partir de streams de alterações.
Pode usar o Dataflow para consumir streams de alterações do Spanner com o conector SpannerIO, que oferece uma abstração sobre a API Spanner para consultar streams de alterações. Com este conector, não tem de gerir o ciclo de vida das partições das streams de alterações, o que é necessário quando usa a API Spanner diretamente. O conector fornece-lhe um fluxo de registos de alterações de dados para que possa concentrar-se mais na lógica da aplicação e menos nos detalhes específicos da API e na partição dinâmica do fluxo de alterações. Recomendamos que use o conector SpannerIO em vez da API Spanner na maioria das circunstâncias em que precisa de ler dados da stream de alterações.
Os modelos do Dataflow são pipelines do Dataflow pré-criados que implementam exemplos de utilização comuns. Consulte os Modelos de fluxo de dados para ver uma descrição geral.
Pipeline do Dataflow
Um pipeline do Dataflow de streams de alterações do Spanner é composto por quatro partes principais:
- Uma base de dados do Spanner com uma stream de alterações
- O conetor SpannerIO
- Transformações e destinos definidos pelo utilizador
- Um escritor de E/S de destino do Apache Beam
Stream de alterações do Spanner
Para ver detalhes sobre como criar uma stream de alterações, consulte o artigo Crie uma stream de alterações.
Conetor Apache Beam SpannerIO
Este é o conetor SpannerIO descrito na secção anterior do Dataflow.
É um conetor de E/S de origem que emite um PCollection
de registos de alterações de dados
para fases posteriores do pipeline. A
hora do evento
para cada registo de alteração de dados emitido
será a indicação de tempo de confirmação. Tenha em atenção que os registos emitidos estão
desordenados e que o conector SpannerIO garante que não existem
registos atrasados.
Quando trabalha com streams de alterações, o Dataflow usa a criação de pontos de verificação. Como resultado, cada trabalhador pode aguardar até ao intervalo de pontos de verificação configurado para colocar as alterações em buffer antes de as enviar para processamento adicional.
Transformações definidas pelo utilizador
Uma transformação definida pelo utilizador permite que um utilizador agregue, transforme ou modifique os dados de processamento num pipeline do Dataflow. Os exemplos de utilização comuns incluem a remoção de informações de identificação pessoal, a satisfação dos requisitos de formato de dados a jusante e a ordenação. Consulte a documentação oficial do Apache Beam para ver o guia de programação sobre transformações.
Gravador de E/S do coletor do Apache Beam
O Apache Beam contém conetores de I/O incorporados que podem ser usados para escrever a partir de um pipeline do Dataflow num ponto de saída de dados, como o BigQuery. Os destinos de dados mais comuns são suportados nativamente.
Modelos do Dataflow
Os modelos do Dataflow oferecem um método para criar tarefas do Dataflow com base em imagens Docker pré-criadas para casos de utilização comuns através da Google Cloud consola, da Google Cloud CLI ou de chamadas da API REST.
Para streams de alterações do Spanner, disponibilizamos três modelos flexíveis do Dataflow:
Aplicam-se as seguintes restrições quando usa o modelo Spanner change streams para Pub/Sub:
O Pub/Sub tem uma limitação de tamanho de mensagem de 10 MB. Para mais informações, consulte o artigo Quotas e limites do Pub/Sub.
O modelo Spanner change streams to Pub/Sub não suporta o processamento de mensagens grandes devido a limitações do Pub/Sub.
Defina autorizações da IAM para modelos do Dataflow
Antes de criar uma tarefa do Dataflow com os três modelos flexíveis indicados, certifique-se de que tem as autorizações de IAM necessárias para as seguintes contas de serviço:
Se não tiver as autorizações da IAM necessárias, tem de especificar uma conta de serviço de worker gerida pelo utilizador para criar a tarefa do Dataflow. Para mais informações, consulte o artigo Segurança e autorizações do fluxo de dados.
Quando tenta executar uma tarefa a partir de um modelo flexível do Dataflow sem todas as autorizações necessárias, a tarefa pode falhar com um erro de leitura do ficheiro de resultados ou um erro de autorização negada no recurso. Para mais informações, consulte o artigo Resolva problemas com modelos flexíveis.
Crie um pipeline do Dataflow
Esta secção aborda a configuração inicial do conector e fornece exemplos de integrações comuns com a funcionalidade de streams de alterações do Spanner.
Para seguir estes passos, precisa de um ambiente de desenvolvimento Java para o Dataflow. Para mais informações, consulte o artigo Crie um pipeline do Dataflow com Java.
Crie uma stream de alterações
Para ver detalhes sobre como criar uma stream de alterações, consulte o artigo Crie uma stream de alterações. Para continuar com os passos seguintes, tem de ter uma base de dados do Spanner com uma stream de alterações configurada.
Conceda privilégios de controlo de acesso detalhados
Se espera que os utilizadores do controlo de acesso detalhado executem a tarefa do Dataflow,
certifique-se de que os utilizadores têm acesso a uma função de base de dados
que tenha o privilégio SELECT
no fluxo de alterações e o privilégio EXECUTE
na função de valor de tabela do fluxo de alterações. Certifique-se também de que o principal especifica a função da base de dados na configuração do SpannerIO ou no modelo flexível do Dataflow.
Para mais informações, consulte o artigo Acerca do controlo de acesso detalhado.
Adicione o conetor SpannerIO como uma dependência
O conetor Apache Beam SpannerIO encapsula a complexidade do consumo dos fluxos de alterações diretamente através da API Cloud Spanner, emitindo uma PCollection de registos de dados de fluxos de alterações para fases posteriores do pipeline.
Estes objetos podem ser consumidos noutras fases do pipeline de Dataflow do utilizador. A integração de fluxo de alterações faz parte do conector SpannerIO. Para poder usar o conector SpannerIO, tem de adicionar a dependência ao ficheiro pom.xml
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>
Crie uma base de dados de metadados
O conetor tem de monitorizar cada partição quando executa o pipeline do Apache Beam. Mantém estes metadados numa tabela do Spanner criada pelo conetor durante a inicialização. Especifica a base de dados na qual esta tabela vai ser criada quando configurar o conetor.
Conforme descrito nas práticas recomendadas para streams de alterações, recomendamos que crie uma nova base de dados para este fim, em vez de permitir que o conector use a base de dados da sua aplicação para armazenar a respetiva tabela de metadados.
O proprietário de uma tarefa do Dataflow que usa o conetor SpannerIO tem de ter as seguintes autorizações da IAM definidas com esta base de dados de metadados:
spanner.databases.updateDdl
spanner.databases.beginReadOnlyTransaction
spanner.databases.beginOrRollbackReadWriteTransaction
spanner.databases.read
spanner.databases.select
spanner.databases.write
spanner.sessions.create
spanner.sessions.get
Configure o conetor
O conetor de streams de alterações do Spanner pode ser configurado da seguinte forma:
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
startTime.getSeconds() + (10 * 60),
startTime.getNanos()
);
SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-meta-instance-id")
.withMetadataDatabase("my-meta-database-id")
.withMetadataTable("my-meta-table-name")
.withRpcPriority(RpcPriority.MEDIUM)
.withInclusiveStartAt(startTime)
.withInclusiveEndAt(endTime);
Seguem-se as descrições das opções readChangeStream()
:
Configuração do Spanner (obrigatória)
Usado para configurar o projeto, a instância e a base de dados a partir dos quais a stream de alterações foi criada e deve ser consultada. Também especifica opcionalmente a função da base de dados a usar quando o principal do IAM que está a executar a tarefa do Dataflow é um utilizador do controlo de acesso detalhado. A tarefa assume esta função da base de dados para aceder à stream de alterações. Para mais informações, consulte o artigo Acerca do controlo de acesso detalhado.
Alterar nome da stream (obrigatório)
Este nome identifica exclusivamente a stream de alterações. O nome aqui tem de ser o mesmo que o usado quando o criou.
ID da instância de metadados (opcional)
Esta é a instância para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API de fluxo de alterações.
ID da base de dados de metadados (obrigatório)
Esta é a base de dados para armazenar os metadados usados pelo conetor para controlar o consumo dos dados da API Change Stream.
Nome da tabela de metadados (opcional)
Isto só deve ser usado quando atualiza um pipeline existente.
Este é o nome da tabela de metadados pré-existente a ser usado pelo conector. Isto é usado pelo conetor para armazenar os metadados para controlar o consumo dos dados da API Change Stream. Se esta opção for omitida, o Spanner cria uma nova tabela com um nome gerado na inicialização do conetor.
Prioridade da RPC (opcional)
A prioridade
do pedido a usar para as consultas de streams de alterações. Se este parâmetro for omitido, é usado o valor high
priority
.
InclusiveStartAt (obrigatório)
As alterações a partir da data/hora especificada são devolvidas ao autor da chamada.
InclusiveEndAt (opcional)
As alterações até à data/hora especificada são devolvidas ao autor da chamada. Se este parâmetro for omitido, as alterações são emitidas indefinidamente.
Adicione transformações e destinos para processar dados de alterações
Com os passos anteriores concluídos, o conetor SpannerIO configurado está pronto para emitir uma PCollection de objetos DataChangeRecord
.
Consulte Exemplos de transformações e destinos para ver várias configurações de pipelines de exemplo que processam estes dados de streaming de várias formas.
Tenha em atenção que os registos de fluxo de alterações emitidos pelo conetor SpannerIO não estão ordenados. Isto acontece porque as PCollections não oferecem garantias de ordenação. Se precisar de uma transmissão ordenada, tem de agrupar e ordenar os registos como transformações nos seus pipelines: consulte o Exemplo: ordenar por chave. Pode expandir este exemplo para ordenar os registos com base em quaisquer campos dos registos, como os IDs das transações.
Exemplos de transformações e destinos
Pode definir as suas próprias transformações e especificar destinos para escrever os dados. A documentação do Apache Beam oferece inúmeras transformações que podem ser aplicadas, bem como conetores de E/S prontos a usar para escrever os dados em sistemas externos.
Exemplo: ordenar por chave
Este exemplo de código emite registos de alterações de dados ordenados pela data/hora de confirmação e agrupados por chaves primárias através do conetor do Dataflow.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new BreakRecordByModFn()))
.apply(ParDo.of(new KeyByIdFn()))
.apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
// Subsequent processing goes here
Este exemplo de código usa estados e temporizadores para armazenar em buffer registos para cada chave e define a hora de expiração do temporizador para alguma hora T
configurada pelo utilizador no futuro (definida na função BufferKeyUntilOutputTimestamp). Quando a marca de água do fluxo de dados passa o tempo T
, este código limpa todos os registos na memória intermédia com uma data/hora inferior a T
, ordena estes registos por data/hora de confirmação e gera um par de chave-valor em que:
- A chave é a chave de entrada, ou seja, a chave principal com hash para uma matriz de contentores de tamanho 1000.
- O valor são os registos de alterações de dados ordenados que foram colocados em buffer para a chave.
Para cada chave, temos as seguintes garantias:
- Os temporizadores são acionados pela ordem da data/hora de expiração.
- É garantido que as fases posteriores recebem os elementos na mesma ordem em que foram produzidos.
Por exemplo, com uma chave do valor 100, o temporizador é acionado às T1
e T10
, respetivamente, produzindo um pacote de registos de alterações de dados em cada carimbo de data/hora. Uma vez que os registos de alterações de dados gerados às T1
foram produzidos antes dos registos de alterações de dados gerados às T10
, também é garantido que os registos de alterações de dados gerados às T1
são recebidos pela fase seguinte antes dos registos de alterações de dados gerados às T10
. Este mecanismo ajuda-nos a garantir uma ordenação rigorosa da data/hora de confirmação por chave primária para o processamento a jusante.
Este processo repete-se até que o pipeline termine e todos os registos de alterações de dados tenham sido processados (ou repete-se indefinidamente se não for especificado um horário de fim).
Tenha em atenção que este exemplo de código usa estados e temporizadores, em vez de janelas, para executar a ordenação por chave. A justificação é que não é garantido que as janelas sejam processadas por ordem. Isto significa que as janelas mais antigas podem ser processadas mais tarde do que as janelas mais recentes, o que pode resultar num processamento desordenado.
BreakRecordByModFn
Cada registo de alteração de dados pode conter várias modificações. Cada modificação representa uma inserção, uma atualização ou uma eliminação de um único valor de chave principal. Esta função divide cada registo de alteração de dados em registos de alteração de dados separados, um por modificação.
private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
DataChangeRecord> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
outputReceiver) {
record.getMods().stream()
.map(
mod ->
new DataChangeRecord(
record.getPartitionToken(),
record.getCommitTimestamp(),
record.getServerTransactionId(),
record.isLastRecordInTransactionInPartition(),
record.getRecordSequence(),
record.getTableName(),
record.getRowType(),
Collections.singletonList(mod),
record.getModType(),
record.getValueCaptureType(),
record.getNumberOfRecordsInTransaction(),
record.getNumberOfPartitionsInTransaction(),
record.getTransactionTag(),
record.isSystemTransaction(),
record.getMetadata()))
.forEach(outputReceiver::output);
}
}
KeyByIdFn
Esta função recebe um DataChangeRecord
e produz um DataChangeRecord
com chave pela chave primária do Spanner com hash para um valor inteiro.
private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
// NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
// Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
// of states and timers for performance purposes.
// Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
// On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
private static final int NUMBER_OF_BUCKETS = 1000;
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
// Hash the received keys into a bucket in order to have a
// deterministic number of buffers and timers.
String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);
outputReceiver.output(KV.of(bucketIndex, record));
}
}
BufferKeyUntilOutputTimestamp
Os temporizadores e os buffers são por chave. Esta função armazena em buffer cada registo de alteração de dados até a marca de água passar a data/hora em que queremos gerar os registos de alteração de dados armazenados em buffer.
Este código usa um temporizador de ciclo para determinar quando limpar o buffer:
- Quando vê um registo de alteração de dados para uma chave pela primeira vez, define o temporizador para ser acionado na data/hora de confirmação do registo de alteração de dados +
incrementIntervalSeconds
(uma opção configurável pelo utilizador). - Quando o temporizador é acionado, adiciona todos os registos de alterações de dados no buffer com data/hora inferior à data/hora de expiração do temporizador a
recordsToOutput
. Se o buffer tiver registos de alterações de dados cuja data/hora seja igual ou superior à data/hora de expiração do temporizador, adiciona esses registos de alterações de dados novamente ao buffer em vez de os gerar. Em seguida, define o temporizador seguinte para a hora de expiração do temporizador atual maisincrementIntervalInSeconds
. - Se
recordsToOutput
não estiver vazio, a função ordena os registos de alteração de dados emrecordsToOutput
pela data/hora de confirmação e pelo ID da transação e, em seguida, produz os registos.
private static class BufferKeyUntilOutputTimestamp extends
DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>> {
private static final Logger LOG =
LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);
private final long incrementIntervalInSeconds = 2;
private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
this.incrementIntervalInSeconds = incrementIntervalInSeconds;
}
@SuppressWarnings("unused")
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("keyString")
private final StateSpec<ValueState<String>> keyString =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void process(
@Element KV<String, DataChangeRecord> element,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
buffer.add(element.getValue());
// Only set the timer if this is the first time we are receiving a data change
// record with this key.
String elementKey = keyString.read();
if (elementKey == null) {
Instant commitTimestamp =
new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
Instant outputTimestamp =
commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
timer.set(outputTimestamp);
keyString.write(element.getKey());
}
}
@OnTimer("timer")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
if (!buffer.isEmpty().read()) {
String elementKey = keyString.read();
final List<DataChangeRecord> records =
StreamSupport.stream(buffer.read().spliterator(), false)
.collect(Collectors.toList());
buffer.clear();
List<DataChangeRecord> recordsToOutput = new ArrayList<>();
for (DataChangeRecord record : records) {
Instant recordCommitTimestamp =
new Instant(record.getCommitTimestamp().toSqlTimestamp());
final String recordString =
record.getMods().get(0).getNewValuesJson().isEmpty()
? "Deleted record"
: record.getMods().get(0).getNewValuesJson();
// When the watermark passes time T, this means that all records with
// event time < T have been processed and successfully committed. Since the
// timer fires when the watermark passes the expiration time, we should
// only output records with event time < expiration time.
if (recordCommitTimestamp.isBefore(context.timestamp())) {
LOG.info(
"Outputting record with key {} and value {} at expiration " +
"timestamp {}",
elementKey,
recordString,
context.timestamp().toString());
recordsToOutput.add(record);
} else {
LOG.info(
"Expired at {} but adding record with key {} and value {} back to " +
"buffer due to commit timestamp {}",
context.timestamp().toString(),
elementKey,
recordString,
recordCommitTimestamp.toString());
buffer.add(record);
}
}
// Output records, if there are any to output.
if (!recordsToOutput.isEmpty()) {
// Order the records in place, and output them. The user would need
// to implement DataChangeRecordComparator class that sorts the
// data change records by commit timestamp and transaction ID.
Collections.sort(recordsToOutput, new DataChangeRecordComparator());
context.outputWithTimestamp(
KV.of(elementKey, recordsToOutput), context.timestamp());
LOG.info(
"Expired at {}, outputting records for key {}",
context.timestamp().toString(),
elementKey);
} else {
LOG.info("Expired at {} with no records", context.timestamp().toString());
}
}
Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
LOG.info("Setting next timer to {}", nextTimer.toString());
timer.set(nextTimer);
} else {
LOG.info(
"Timer not being set since the buffer is empty: ");
keyString.clear();
}
}
}
Ordenar transações
É possível alterar esta conduta para ordenar por ID da transação e data/hora de confirmação. Para tal, armazene em buffer os registos de cada par de ID da transação / indicação de tempo de confirmação, em vez de cada chave do Spanner. Isto requer a modificação do código em KeyByIdFn.
Exemplo: reunir transações
Este exemplo de código lê registos de alterações de dados, reúne todos os registos de alterações de dados pertencentes à mesma transação num único elemento e produz esse elemento. Tenha em atenção que as transações geradas por este código de exemplo não estão ordenadas pela data/hora de confirmação.
Este exemplo de código usa buffers para reunir transações a partir de registos de alterações de dados. Ao receber pela primeira vez um registo de alteração de dados pertencente a uma transação, lê o campo numberOfRecordsInTransaction
no registo de alteração de dados, que descreve o número esperado de registos de alteração de dados pertencentes a essa transação. Armazena em buffer os registos de alterações de dados pertencentes a essa transação até que o número de registos armazenados em buffer corresponda a numberOfRecordsInTransaction
, após o que produz os registos de alterações de dados agrupados.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new KeyByTransactionIdFn()))
.apply(ParDo.of(new TransactionBoundaryFn()))
// Subsequent processing goes here
KeyByTransactionIdFn
Esta função recebe um DataChangeRecord
e produz um DataChangeRecord
com chave pelo ID da transação.
private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
outputReceiver.output(KV.of(record.getServerTransactionId(), record));
}
}
TransactionBoundaryFn
TransactionBoundaryFn
armazena em buffer pares de chave-valor de
{TransactionId, DataChangeRecord}
de KeyByTransactionIdFn
e
armazena-os em buffer em grupos com base em TransactionId
. Quando o número de registos em buffer é igual ao número de registos contidos na transação completa, esta função ordena os objetos DataChangeRecord
no grupo por sequência de registos e gera um par chave-valor de {CommitTimestamp, TransactionId}
, Iterable<DataChangeRecord>
.
Aqui, estamos a assumir que SortKey
é uma classe definida pelo utilizador que representa um par {CommitTimestamp, TransactionId}
. Para mais informações sobre o
SortKey
, consulte a implementação de exemplo.
private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>> {
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@StateId("count") ValueState<Integer> countState) {
final KV<String, DataChangeRecord> element = context.element();
final DataChangeRecord record = element.getValue();
buffer.add(record);
int count = (countState.read() != null ? countState.read() : 0);
count = count + 1;
countState.write(count);
if (count == record.getNumberOfRecordsInTransaction()) {
final List<DataChangeRecord> sortedRecords =
StreamSupport.stream(buffer.read().spliterator(), false)
.sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
.collect(Collectors.toList());
final Instant commitInstant =
new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
.getTime());
context.outputWithTimestamp(
KV.of(
new SortKey(sortedRecords.get(0).getCommitTimestamp(),
sortedRecords.get(0).getServerTransactionId()),
sortedRecords),
commitInstant);
buffer.clear();
countState.clear();
}
}
}
Exemplo: filtre por etiqueta de transação
Quando uma transação que modifica os dados do utilizador é etiquetada, a etiqueta correspondente e o respetivo tipo são armazenados como parte de DataChangeRecord
. Estes exemplos demonstram como filtrar registos de streams de alterações com base em etiquetas de transações definidas pelo utilizador, bem como etiquetas do sistema:
Filtragem de etiquetas definida pelo utilizador para my-tx-tag
:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
!record.isSystemTransaction()
&& record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
// Subsequent processing goes here
Filtragem de etiquetas do sistema/auditoria de TTL:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
record.isSystemTransaction()
&& record.getTransactionTag().equals("RowDeletionPolicy")))
// Subsequent processing goes here
Exemplo: obter linha completa
Este exemplo funciona com uma tabela do Spanner denominada Singer
que tem a seguinte definição:
CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024)
) PRIMARY KEY (SingerId);
No modo de captura de valores OLD_AND_NEW_VALUES
predefinido das streams de alterações, quando existe uma atualização a uma linha do Spanner, o registo de alteração de dados recebido contém apenas as colunas que foram alteradas. As colunas acompanhadas, mas
inalteradas, não são incluídas no registo. A chave principal da modificação pode ser usada para fazer uma leitura de instantâneo do Spanner na data/hora de confirmação do registo de alteração de dados para obter as colunas inalteradas ou até mesmo recuperar a linha completa.
Tenha em atenção que a política de retenção da base de dados pode ter de ser alterada para um valor superior ou igual à política de retenção do fluxo de alterações para que a leitura da captura de ecrã seja bem-sucedida.
Tenha também em atenção que a utilização do tipo de captura de valor NEW_ROW
é a forma recomendada e mais eficiente de o fazer, uma vez que devolve todas as colunas acompanhadas da linha por predefinição e não requer uma leitura de instantâneo adicional no Spanner.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
// Assume we have a change stream "my-change-stream" that watches Singers table.
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
// Subsequent processing goes here
ToFullRowJsonFn
Esta transformação executa uma leitura desatualizada na indicação de tempo de confirmação de cada registo recebido e mapeia a linha completa para JSON.
public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
// Since each instance of this DoFn will create its own session pool and will
// perform calls to Spanner sequentially, we keep the number of sessions in
// the pool small. This way, we avoid wasting resources.
private static final int MIN_SESSIONS = 1;
private static final int MAX_SESSIONS = 5;
private final String projectId;
private final String instanceId;
private final String databaseId;
private transient DatabaseClient client;
private transient Spanner spanner;
public ToFullRowJsonFn(SpannerConfig spannerConfig) {
this.projectId = spannerConfig.getProjectId().get();
this.instanceId = spannerConfig.getInstanceId().get();
this.databaseId = spannerConfig.getDatabaseId().get();
}
@Setup
public void setup() {
SessionPoolOptions sessionPoolOptions = SessionPoolOptions
.newBuilder()
.setMinSessions(MIN_SESSIONS)
.setMaxSessions(MAX_SESSIONS)
.build();
SpannerOptions options = SpannerOptions
.newBuilder()
.setProjectId(projectId)
.setSessionPoolOption(sessionPoolOptions)
.build();
DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
spanner = options.getService();
client = spanner.getDatabaseClient(id);
}
@Teardown
public void teardown() {
spanner.close();
}
@ProcessElement
public void process(
@Element DataChangeRecord element,
OutputReceiver<String> output) {
com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
element.getMods().forEach(mod -> {
JSONObject keysJson = new JSONObject(mod.getKeysJson());
JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
ModType modType = element.getModType();
JSONObject jsonRow = new JSONObject();
long singerId = keysJson.getLong("SingerId");
jsonRow.put("SingerId", singerId);
if (modType == ModType.INSERT) {
// For INSERT mod, get non-primary key columns from mod.
jsonRow.put("FirstName", newValuesJson.get("FirstName"));
jsonRow.put("LastName", newValuesJson.get("LastName"));
} else if (modType == ModType.UPDATE) {
// For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
try (ResultSet resultSet = client
.singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
.read(
"Singers",
KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
Arrays.asList("FirstName", "LastName"))) {
if (resultSet.next()) {
jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
JSONObject.NULL : resultSet.getString("FirstName"));
jsonRow.put("LastName", resultSet.isNull("LastName") ?
JSONObject.NULL : resultSet.getString("LastName"));
}
}
} else {
// For DELETE mod, there is nothing to do, as we already set SingerId.
}
output.output(jsonRow.toString());
});
}
}
Este código cria um cliente de base de dados do Spanner para realizar a obtenção completa de linhas e configura o conjunto de sessões para ter apenas algumas sessões, realizando leituras numa instância do ToFullReowJsonFn
sequencialmente.
O Dataflow garante que gera muitas instâncias desta função, cada uma com o seu próprio conjunto de clientes.
Exemplo: do Spanner para o Pub/Sub
Neste cenário, o autor da chamada transmite registos para o Pub/Sub o mais rapidamente possível, sem qualquer agrupamento ou agregação. Esta opção é adequada para acionar o processamento a jusante, como o streaming de todas as novas linhas inseridas numa tabela do Spanner para o Pub/Sub para processamento adicional.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(PubsubIO.writeStrings().to("my-topic"));
Tenha em atenção que o destino do Pub/Sub pode ser configurado para garantir a semântica exatamente uma vez.
Exemplo: do Spanner para o Cloud Storage
Neste cenário, o autor da chamada agrupa todos os registos numa determinada janela e guarda o grupo em ficheiros do Cloud Storage separados. Isto é adequado para estatísticas e arquivo de um momento específico, que é independente do período de retenção do Spanner.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO
.write()
.to("gs://my-bucket/change-stream-results-")
.withSuffix(".txt")
.withWindowedWrites()
.withNumShards(1));
Tenha em atenção que o destino do Cloud Storage fornece semântica, pelo menos, uma vez por predefinição. Com processamento adicional, pode ser modificado para ter uma semântica exatamente uma vez.
Também disponibilizamos um modelo do Dataflow para este exemplo de utilização: consulte o artigo Ligue streams de alterações ao Cloud Storage.
Exemplo: do Spanner para o BigQuery (tabela de registo)
Aqui, o autor da chamada transmite registos de alterações para o BigQuery. Cada registo de alteração de dados é refletido como uma linha no BigQuery. Esta opção é adequada para as estatísticas. Este código usa as funções definidas anteriormente, na secção Obter linha completa, para obter a linha completa do registo e escrevê-la no BigQuery.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
.apply(BigQueryIO
.<String>write()
.to("my-bigquery-table")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withSchema(new TableSchema().setFields(Arrays.asList(
new TableFieldSchema()
.setName("SingerId")
.setType("INT64")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("FirstName")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("LastName")
.setType("STRING")
.setMode("REQUIRED")
)))
.withAutoSharding()
.optimizedWrites()
.withFormatFunction((String element) -> {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree(element);
} catch (IOException e) {
e.printStackTrace();
}
return new TableRow()
.set("SingerId", jsonNode.get("SingerId").asInt())
.set("FirstName", jsonNode.get("FirstName").asText())
.set("LastName", jsonNode.get("LastName").asText());
}
)
);
Tenha em atenção que o destino do BigQuery fornece semântica, pelo menos, uma vez por predefinição. Com processamento adicional, pode ser modificado para ter uma semântica exatamente uma vez.
Também fornecemos um modelo do Dataflow para este exemplo de utilização. Consulte o artigo Ligue streams de alterações ao BigQuery.
Monitorize uma pipeline
Existem duas classes de métricas disponíveis para monitorizar um pipeline do Dataflow de fluxo de alterações.
Métricas padrão do Dataflow
O Dataflow fornece várias métricas para garantir que a sua tarefa está em bom estado, como a atualidade dos dados, o atraso do sistema, o débito da tarefa, a utilização da CPU do trabalhador e muito mais. Pode encontrar mais informações no artigo Usar a monitorização para pipelines do Dataflow.
Para pipelines de streams de alterações, existem duas métricas principais que devem ser tidas em conta: a latência do sistema e a atualidade dos dados.
A latência do sistema indica a duração máxima atual (em segundos) durante a qual um item de dados é processado ou está a aguardar processamento.
A atualização dos dados mostra a quantidade de tempo entre o momento atual (tempo real) e a marca de água de saída. A marca d'água de tempo de saída T
indica que todos os elementos com uma hora do evento (estritamente) anterior a T
foram processados para cálculo. Por outras palavras, a atualidade dos dados mede o quão atualizado está o pipeline no que diz respeito ao processamento dos eventos que recebeu.
Se o pipeline tiver poucos recursos, pode ver esse efeito nestas duas métricas. A latência do sistema aumenta, porque os itens têm de esperar mais tempo antes de serem processados. A atualidade dos dados também aumenta, porque o pipeline não consegue acompanhar a quantidade de dados recebidos.
Métricas de stream de alterações personalizadas
Estas métricas são expostas no Cloud Monitoring e incluem:
- Latência (histograma) dividida em grupos entre a confirmação de um registo no Spanner e a sua emissão numa PCollection pelo conetor. Esta métrica pode ser usada para ver problemas de desempenho (latência) com o pipeline.
- Número total de registos de dados lidos. Esta é uma indicação geral do número de registos emitidos pelo conector. Este número deve aumentar sempre, refletindo a tendência de gravações na base de dados do Spanner subjacente.
- Número de partições que estão a ser lidas. Deve haver sempre partições a serem lidas. Se este número for zero, indica que ocorreu um erro no pipeline.
- O número total de consultas emitidas durante a execução do conector. Esta é uma indicação geral das consultas de fluxo de alterações feitas à instância do Spanner durante a execução do pipeline. Pode usar esta opção para obter uma estimativa do carregamento do conetor para a base de dados do Spanner.
Atualize um pipeline existente
É possível atualizar um pipeline em execução que usa o conetor SpannerIO para processar streams de alterações se as verificações de compatibilidade de tarefas forem aprovadas. Para o fazer, tem de definir explicitamente o parâmetro do nome da tabela de metadados da nova tarefa quando a atualiza. Use o valor da opção de pipeline do trabalho que está a atualizar.metadataTable
Se estiver a usar um modelo do Dataflow fornecido pela Google, defina o nome da tabela através do parâmetro spannerMetadataTableName
. Também pode modificar
a tarefa existente para usar explicitamente a tabela de metadados com o método
withMetadataTable(your-metadata-table-name)
na
configuração do conetor. Depois de concluir este processo, pode seguir as instruções em Iniciar a tarefa de substituição na documentação do Dataflow para atualizar uma tarefa em execução.
Práticas recomendadas para streams de alterações e Dataflow
Seguem-se algumas práticas recomendadas para criar ligações de streams de alterações com o Dataflow.
Use uma base de dados de metadados separada
Recomendamos que crie uma base de dados separada para o conector SpannerIO usar para o armazenamento de metadados, em vez de o configurar para usar a base de dados da sua aplicação.
Para mais informações, consulte o artigo Considere usar uma base de dados de metadados separada.
Defina o tamanho do cluster
Uma regra geral para um número inicial de trabalhadores num trabalho de streams de alterações do Spanner é um trabalhador por 1000 gravações por segundo. Tenha em atenção que esta estimativa pode variar consoante vários fatores, como a dimensão de cada transação, quantos registos do fluxo de alterações são produzidos a partir de uma única transação e outras transformações, agregações ou destinos que estão a ser usados no pipeline.
Após a atribuição inicial de recursos, é importante acompanhar as métricas mencionadas no artigo Monitorize um pipeline para garantir que o pipeline está em bom estado. Recomendamos que experimente um tamanho inicial do conjunto de trabalhadores e monitorize a forma como o pipeline lida com a carga, aumentando o número de nós, se necessário. A utilização da CPU é uma métrica fundamental para verificar se o carregamento é adequado e se são necessários mais nós.
Limitações conhecidas
Existem algumas limitações conhecidas quando usa streams de alterações do Spanner com o Dataflow:
Escala automática
O suporte de dimensionamento automático para quaisquer pipelines que incluam SpannerIO.readChangeStream
requer o Apache Beam 2.39.0
ou superior.
Se usar uma versão do Apache Beam anterior à 2.39.0
, os pipelines que incluem SpannerIO.readChangeStream
têm de especificar explicitamente o algoritmo de escala automática como NONE
, conforme descrito em Escala automática horizontal.
Para dimensionar manualmente um pipeline do Dataflow em vez de usar a escala automática, consulte o artigo Dimensionar manualmente um pipeline de streaming.
Runner V2
O conetor de streams de alterações do Spanner requer o Dataflow Runner V2.
Tem de ser especificado manualmente durante a execução ou é gerado um erro. Pode especificar Runner V2
configurando a tarefa com
--experiments=use_unified_worker,use_runner_v2
.
Instantâneo
O conetor de streams de alterações do Spanner não suporta instantâneos do Dataflow.
A drenar
O conetor de streams de alterações do Spanner não suporta a interrupção de uma tarefa. Só é possível cancelar um trabalho existente.
Também pode atualizar um pipeline existente sem ter de o parar.
OpenCensus
Para usar o OpenCensus para monitorizar o seu pipeline, especifique a versão 0.28.3 ou posterior.
NullPointerException
no início do pipeline
Um erro na versão 2.38.0
do Apache Beam pode causar um NullPointerException
ao iniciar o pipeline em determinadas condições. Isto impede o início da tarefa e apresenta esta mensagem de erro:
java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null
Para resolver este problema, use a versão 2.39.0
ou posterior do Apache Beam ou especifique manualmente a versão do beam-sdks-java-core
como 2.37.0
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.37.0</version>
</dependency>