Ler um fluxo de alterações com Java
A biblioteca de cliente do Cloud Bigtable para Java fornece métodos de nível inferior para processar registros de alteração de dados. No entanto, na maioria dos casos, recomendamos que você faça streaming de mudanças com o Dataflow em vez de usar os métodos descritos nesta página, porque o Dataflow processa partições e mesclas para você.
Antes de começar
Antes de ler um fluxo de alterações com Java, confira a Visão geral do fluxo de alterações. Em seguida, conclua os pré-requisitos a seguir.
Configurar a autenticação
Para usar os exemplos Java desta página em um ambiente de desenvolvimento local, instale e inicialize o gcloud CLI e e configure o Application Default Credentials com suas credenciais de usuário.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
Confira mais informações em Set up authentication for a local development environment.
Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Set up Application Default Credentials for code running on Google Cloud.
Ativar um fluxo de alterações
Você precisa ativar um fluxo de alterações em uma tabela antes de poder lê-lo. Também é possível criar uma nova tabela com um fluxo de alterações ativado.
Funções exigidas
Para receber as permissões necessárias para ler um fluxo de alterações do Bigtable, peça ao administrador para conceder a você o seguinte papel do IAM.
- Administrador do Bigtable (
roles/bigtable.admin
) na instância do Bigtable que contém a tabela com as alterações que você pretende mostrar
Adicionar a biblioteca de cliente em Java como uma dependência
Adicione um código semelhante ao código a seguir no seu arquivo pom.xml
do Maven. Substitua VERSION
pela versão da biblioteca de cliente que você usa. A versão precisa ser 2.21.0 ou posterior.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Determinar as partições da tabela
Para começar a fazer solicitações ReadChangeStream
, você precisa conhecer as partições da tabela. Isso pode ser determinado com o método GenerateInitialChangeStreamPartitions
. O exemplo a seguir mostra como
usar esse método para receber um fluxo de
ByteStringRanges
que representa cada partição na tabela. Cada ByteStringRange
contém as chaves de início e fim de uma partição.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Processar alterações de cada partição
Em seguida, processe as alterações de cada partição usando o método ReadChangeStream
. Este é um exemplo de como abrir um fluxo para uma partição, a partir da hora atual.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
aceita os seguintes argumentos:
- Partição de fluxo (obrigatório): a partição que mostra o fluxo de alterações.
- Uma das seguintes opções:
- Horário de início: carimbo de data/hora de confirmação que começa a processar as alterações.
- Tokens de continuação: tokens que representam um ponto de retomada do fluxo.
- Horário de término (opcional): carimbo de data/hora de confirmação que para de processar as alterações quando atingido. Se você não fornecer um valor, a leitura do fluxo continuará.
- Duração do sinal de funcionamento (opcional): frequência das mensagens de sinal de funcionamento quando não há novas alterações (o padrão é cinco segundos).
Formato do registro do fluxo de alterações
Um registro do fluxo de alterações retornado é um dos três tipos de resposta:
ChangeStreamMutation
: uma mensagem que representa um registro de alteração de dados.CloseStream
: uma mensagem que indica que o cliente precisa parar a leitura do fluxo.- Status: indica o motivo para encerramento do fluxo. Uma destas:
OK
: o horário de término da partição especificada foi atingido.OUT_OF_RANGE
: a partição especificada não existe mais. Isso significa que divisões ou mesclagens ocorreram nessa partição. Uma nova solicitaçãoReadChangeStream
precisará ser criada para cada nova partição.
NewPartitions
: contém as informações de particionamento atualizadas sobre respostasOUT_OF_RANGE
.ChangeStreamContinuationTokens
: lista de tokens usados para retomar novas solicitaçõesReadChangeStream
do mesmo ponto. Um porNewPartition
.
- Status: indica o motivo para encerramento do fluxo. Uma destas:
Heartbeat
: uma mensagem periódica com informações que podem ser usadas para verificar o estado do fluxo.EstimatedLowWatermark
: estimativa da marca-d'água baixa da partição especificada.ContinuationToken
: token para retomar o fluxo da partição especificada a partir do ponto atual.
Conteúdo do registro de alteração de dados
Para informações sobre registros de fluxo de alterações, consulte O que há em um registro de alteração de dados.
Processar alterações em partições
Quando as partições de uma tabela são alteradas, as solicitações ReadChangeStream
retornam uma mensagem CloseStream
com as informações necessárias para retomar o fluxo das partições novas.
Em uma divisão, ela conterá várias partições novas e um ContinuationToken
correspondente para cada partição. Para retomar o fluxo das partições novas do mesmo ponto, faça uma nova solicitação ReadChangeStream
para cada partição nova com o token correspondente.
Por exemplo, se você estiver mostrando o fluxo da partição [A,C)
e ela for dividida em duas partições, [A,B)
e [B,C)
, espere a seguinte sequência de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Para retomar o fluxo de cada partição do mesmo ponto, envie as seguintes solicitações ReadChangeStreamQuery
:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Em uma mesclagem, para retomar do mesmo ponto, é necessário enviar uma nova solicitação ReadChangeStream
que contém cada token das partições mescladas.
Por exemplo, se você estiver mostrando o fluxo de duas partições, [A,B)
e [B,C)
, e elas forem mescladas na partição [A,C)
, espere a seguinte sequência de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Para retomar o fluxo da partição [A, C)
do mesmo ponto, envie uma ReadChangeStreamQuery
como esta:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));