Ler um fluxo de alterações com Java

A biblioteca de cliente do Cloud Bigtable para Java fornece métodos de nível inferior para processar registros de alteração de dados. No entanto, na maioria dos casos, recomendamos que você faça streaming de mudanças com o Dataflow em vez de usar os métodos descritos nesta página, porque o Dataflow processa partições e mesclas para você.

Antes de começar

Antes de ler um fluxo de alterações com Java, confira a Visão geral do fluxo de alterações. Em seguida, conclua os pré-requisitos a seguir.

Configurar a autenticação

Para usar os exemplos Java desta página em um ambiente de desenvolvimento local, instale e inicialize o gcloud CLI e e configure o Application Default Credentials com suas credenciais de usuário.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Confira mais informações em Set up authentication for a local development environment.

Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Set up Application Default Credentials for code running on Google Cloud.

Ativar um fluxo de alterações

Você precisa ativar um fluxo de alterações em uma tabela antes de poder lê-lo. Também é possível criar uma nova tabela com um fluxo de alterações ativado.

Funções exigidas

Para receber as permissões necessárias para ler um fluxo de alterações do Bigtable, peça ao administrador para conceder a você o seguinte papel do IAM.

  • Administrador do Bigtable (roles/bigtable.admin) na instância do Bigtable que contém a tabela com as alterações que você pretende mostrar

Adicionar a biblioteca de cliente em Java como uma dependência

Adicione um código semelhante ao código a seguir no seu arquivo pom.xml do Maven. Substitua VERSION pela versão da biblioteca de cliente que você usa. A versão precisa ser 2.21.0 ou posterior.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determinar as partições da tabela

Para começar a fazer solicitações ReadChangeStream, você precisa conhecer as partições da tabela. Isso pode ser determinado com o método GenerateInitialChangeStreamPartitions. O exemplo a seguir mostra como usar esse método para receber um fluxo de ByteStringRanges que representa cada partição na tabela. Cada ByteStringRange contém as chaves de início e fim de uma partição.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Processar alterações de cada partição

Em seguida, processe as alterações de cada partição usando o método ReadChangeStream. Este é um exemplo de como abrir um fluxo para uma partição, a partir da hora atual.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery aceita os seguintes argumentos:

  • Partição de fluxo (obrigatório): a partição que mostra o fluxo de alterações.
  • Uma das seguintes opções:
    • Horário de início: carimbo de data/hora de confirmação que começa a processar as alterações.
    • Tokens de continuação: tokens que representam um ponto de retomada do fluxo.
  • Horário de término (opcional): carimbo de data/hora de confirmação que para de processar as alterações quando atingido. Se você não fornecer um valor, a leitura do fluxo continuará.
  • Duração do sinal de funcionamento (opcional): frequência das mensagens de sinal de funcionamento quando não há novas alterações (o padrão é cinco segundos).

Formato do registro do fluxo de alterações

Um registro do fluxo de alterações retornado é um dos três tipos de resposta:

  • ChangeStreamMutation: uma mensagem que representa um registro de alteração de dados.

  • CloseStream: uma mensagem que indica que o cliente precisa parar a leitura do fluxo.

    • Status: indica o motivo para encerramento do fluxo. Uma destas:
      • OK: o horário de término da partição especificada foi atingido.
      • OUT_OF_RANGE: a partição especificada não existe mais. Isso significa que divisões ou mesclagens ocorreram nessa partição. Uma nova solicitação ReadChangeStream precisará ser criada para cada nova partição.
    • NewPartitions: contém as informações de particionamento atualizadas sobre respostas OUT_OF_RANGE.
    • ChangeStreamContinuationTokens: lista de tokens usados para retomar novas solicitações ReadChangeStream do mesmo ponto. Um por NewPartition.
  • Heartbeat: uma mensagem periódica com informações que podem ser usadas para verificar o estado do fluxo.

    • EstimatedLowWatermark: estimativa da marca-d'água baixa da partição especificada.
    • ContinuationToken: token para retomar o fluxo da partição especificada a partir do ponto atual.

Conteúdo do registro de alteração de dados

Para informações sobre registros de fluxo de alterações, consulte O que há em um registro de alteração de dados.

Processar alterações em partições

Quando as partições de uma tabela são alteradas, as solicitações ReadChangeStream retornam uma mensagem CloseStream com as informações necessárias para retomar o fluxo das partições novas.

Em uma divisão, ela conterá várias partições novas e um ContinuationToken correspondente para cada partição. Para retomar o fluxo das partições novas do mesmo ponto, faça uma nova solicitação ReadChangeStream para cada partição nova com o token correspondente.

Por exemplo, se você estiver mostrando o fluxo da partição [A,C) e ela for dividida em duas partições, [A,B) e [B,C), espere a seguinte sequência de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Para retomar o fluxo de cada partição do mesmo ponto, envie as seguintes solicitações ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Em uma mesclagem, para retomar do mesmo ponto, é necessário enviar uma nova solicitação ReadChangeStream que contém cada token das partições mescladas.

Por exemplo, se você estiver mostrando o fluxo de duas partições, [A,B) e [B,C), e elas forem mescladas na partição [A,C), espere a seguinte sequência de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Para retomar o fluxo da partição [A, C) do mesmo ponto, envie uma ReadChangeStreamQuery como esta:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

A seguir