Ler um fluxo de alterações com Java

A biblioteca de cliente do Cloud Bigtable para Java oferece métodos de baixo nível para o processamento de registros de alteração de dados. No entanto, na maioria dos casos, recomendamos que você faça streaming das alterações com o Dataflow em vez de usar os métodos descritos nesta página, porque o Dataflow processa divisões e mesclagens da partição para você.

Antes de começar

Antes de ler um fluxo de alterações com Java, familiarize-se com a Visão geral dos fluxos de alterações. Em seguida, conclua os seguintes pré-requisitos.

Configurar a autenticação

Para usar as amostras de Java nesta página de um ambiente de desenvolvimento local, instale e inicialize a CLI gcloud e, em seguida, configure o Application Default Credentials com as credenciais de usuário.

  1. Instale a CLI do Google Cloud.
  2. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  3. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login

Veja mais informações em: Configurar a autenticação para um ambiente de desenvolvimento local.

Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Configure o Application Default Credentials para o código em execução no Google Cloud.

Ativar um fluxo de alterações

Para ler uma tabela, é preciso ativar um fluxo de alterações. Também é possível criar uma nova tabela com um fluxo de alterações ativado.

Funções exigidas

Para receber as permissões necessárias para ler um fluxo de alterações do Bigtable, peça ao administrador para conceder a você o seguinte papel do IAM.

  • Administrador do Bigtable (roles/bigtable.admin) na instância do Bigtable que contém a tabela com as alterações que você pretende mostrar

Adicionar a biblioteca de cliente em Java como uma dependência

Adicione um código semelhante ao código a seguir no seu arquivo pom.xml do Maven. Substitua VERSION pela versão da biblioteca de cliente que você usa. A versão precisa ser 2.21.0 ou posterior.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determinar as partições da tabela

Para começar a fazer solicitações ReadChangeStream, você precisa conhecer as partições da tabela. Isso pode ser determinado com o método GenerateInitialChangeStreamPartitions. O exemplo a seguir mostra como usar esse método para ter um stream de ByteStringRanges que representa cada partição na tabela. Cada ByteStringRange contém as chaves de início e fim de uma partição.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Processar alterações de cada partição

Em seguida, processe as alterações de cada partição usando o método ReadChangeStream. Este é um exemplo de como abrir um fluxo para uma partição, a partir da hora atual.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery aceita os seguintes argumentos:

  • Partição de fluxo (obrigatório): a partição que mostra o fluxo de alterações.
  • Uma das seguintes opções:
    • Horário de início: carimbo de data/hora de confirmação que começa a processar as alterações.
    • Tokens de continuação: tokens que representam um ponto de retomada do fluxo.
  • Horário de término (opcional): carimbo de data/hora de confirmação que para de processar as alterações quando atingido. Se você não fornecer um valor, a leitura do fluxo continuará.
  • Duração do sinal de funcionamento (opcional): frequência das mensagens de sinal de funcionamento quando não há novas alterações (o padrão é cinco segundos).

Formato do registro do fluxo de alterações

Um registro do fluxo de alterações retornado é um dos três tipos de resposta:

  • ChangeStreamMutation: uma mensagem que representa um registro de alteração de dados.

  • CloseStream: uma mensagem que indica que o cliente precisa parar a leitura do fluxo.

    • Status: indica o motivo para encerramento do fluxo. Uma destas:
      • OK: o horário de término da partição especificada foi atingido.
      • OUT_OF_RANGE: a partição especificada não existe mais. Isso significa que divisões ou mesclagens ocorreram nessa partição. Uma nova solicitação ReadChangeStream precisará ser criada para cada nova partição.
    • NewPartitions: contém as informações de particionamento atualizadas sobre respostas OUT_OF_RANGE.
    • ChangeStreamContinuationTokens: lista de tokens usados para retomar novas solicitações ReadChangeStream do mesmo ponto. Um por NewPartition.
  • Heartbeat: uma mensagem periódica com informações que podem ser usadas para verificar o estado do fluxo.

    • EstimatedLowWatermark: estimativa da marca-d'água baixa da partição especificada.
    • ContinuationToken: token para retomar o fluxo da partição especificada a partir do ponto atual.

Conteúdo do registro de alteração de dados

Para informações sobre registros de fluxo de alterações, consulte O que é um registro de alteração de dados.

Processar alterações em partições

Quando as partições de uma tabela são alteradas, as solicitações ReadChangeStream retornam uma mensagem CloseStream com as informações necessárias para retomar o fluxo das partições novas.

Em uma divisão, ela conterá várias partições novas e um ContinuationToken correspondente para cada partição. Para retomar o fluxo das partições novas do mesmo ponto, faça uma nova solicitação ReadChangeStream para cada partição nova com o token correspondente.

Por exemplo, se você estiver mostrando o fluxo da partição [A,C) e ela for dividida em duas partições, [A,B) e [B,C), espere a seguinte sequência de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Para retomar o fluxo de cada partição do mesmo ponto, envie as seguintes solicitações ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Em uma mesclagem, para retomar do mesmo ponto, é necessário enviar uma nova solicitação ReadChangeStream que contém cada token das partições mescladas.

Por exemplo, se você estiver mostrando o fluxo de duas partições, [A,B) e [B,C), e elas forem mescladas na partição [A,C), espere a seguinte sequência de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Para retomar o fluxo da partição [A, C) do mesmo ponto, envie uma ReadChangeStreamQuery como esta:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

A seguir