Leggi un flusso di modifiche con Java

La libreria client di Cloud Bigtable per Java fornisce metodi di basso livello per elaborare i record di modifica dei dati. Prima di leggere questo documento, consulta la panoramica dei flussi di modifiche.

Prima di iniziare

Abilita un flusso di modifiche

Devi attivare un flusso di modifiche in una tabella prima di poterlo leggere. Puoi anche creare una nuova tabella con un flusso di modifiche abilitato.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per leggere un flusso di modifiche Bigtable, chiedi all'amministratore di concederti il seguente ruolo IAM.

  • Amministratore Bigtable (roles/bigtable.admin) sull'istanza Bigtable che contiene la tabella da cui vuoi trasmettere modifiche in streaming

Aggiungi la libreria client Java come dipendenza

Aggiungi codice simile al seguente al tuo file Maven pom.xml. Sostituisci VERSION con la versione della libreria client che stai utilizzando. La versione deve essere 2.21.0 o successiva.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determina le partizioni della tabella

Per iniziare a effettuare richieste ReadChangeStream, devi conoscere le partizioni della tabella. Questo può essere determinato utilizzando il metodo GenerateInitialChangeStreamPartitions. L'esempio seguente mostra come utilizzare questo metodo per ottenere un flusso di ByteStringRanges che rappresenta ciascuna partizione della tabella. Ogni ByteStringRange contiene la chiave di inizio e fine per una partizione.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Elabora modifiche per ogni partizione

Puoi quindi elaborare le modifiche per ogni partizione utilizzando il metodo ReadChangeStream. Questo è un esempio di come aprire un flusso per una partizione, a partire dall'ora attuale.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery accetta i seguenti argomenti:

  • Partizione stream (obbligatoria): la partizione da cui trasmettere le modifiche in streaming
  • Uno dei seguenti valori:
    • Ora di inizio: esegui il commit del timestamp da cui iniziare l'elaborazione delle modifiche
    • Token di continuazione: i token che rappresentano una posizione da cui riprendere lo streaming
  • (Facoltativo) Ora di fine: esegui il commit del timestamp per interrompere l'elaborazione delle modifiche una volta raggiunto. Se non fornisci alcun valore, lo stream continuerà a leggere.
  • Durata del battito cardiaco (facoltativo): frequenza dei messaggi del battito cardiaco quando non vengono apportate nuove modifiche (il valore predefinito è di cinque secondi)

Modifica il formato del record dello stream

Un record del flusso di modifiche restituito restituito è uno dei tre tipi di risposta:

  • ChangeStreamMutation: un messaggio che rappresenta un record di modifica dei dati.

  • CloseStream: un messaggio che indica che il client deve interrompere la lettura dallo stream.

    • Stato: indica il motivo per la chiusura dello stream. Una delle seguenti opzioni:
      • OK: è stata raggiunta l'ora di fine per la partizione specificata
      • OUT_OF_RANGE: la partizione specificata non esiste più, il che significa che si sono verificate suddivisioni o unioni su questa partizione. È necessario creare una nuova richiesta ReadChangeStream per ogni nuova partizione.
    • NewPartitions: fornisce informazioni aggiornate sul partizionamento delle risposte OUT_OF_RANGE.
    • ChangeStreamContinuationTokens: elenco dei token utilizzati per riprendere nuove richieste ReadChangeStream dalla stessa posizione. Uno per NewPartition.
  • Heartbeat: un messaggio periodico con informazioni che può essere utilizzato per controllare lo stato del flusso.

    • EstimatedLowWatermark: stima della filigrana bassa per la partizione data
    • ContinuationToken: token per riprendere il flusso della partizione specificata dalla posizione corrente.

Contenuti del record di modifica dei dati

Ogni record di modifica dei dati contiene quanto segue:

  • Voci: le modifiche apportate alla riga, tra cui una o più delle seguenti condizioni:
    • Scrivi
      • Famiglia di colonne
      • Qualificatore colonna
      • Timestamp
      • Valore
    • Eliminazione delle celle
      • Famiglia di colonne
      • Qualificatore colonna
      • Intervallo timestamp
    • Eliminazione di una famiglia di colonne
      • Famiglia di colonne
      • Eliminazione da una riga: l'eliminazione da una riga viene convertita in un elenco di eliminazioni da famiglie di colonne per ogni famiglia di colonne in cui la riga contiene dati.
  • Chiave di riga: l'identificatore della riga modificata.
  • Tipo di modifica: avviato dall'utente o garbage collection
  • ID del cluster che ha ricevuto la modifica
  • Timestamp di commit: tempo lato server quando la modifica è stata eseguita per la tabella
  • Tie-break: un valore che consente all'applicazione che sta leggendo lo stream di utilizzare i criteri integrati per la risoluzione dei conflitti di Bigtable
  • Token: utilizzato dall'applicazione che lo utilizza per riprendere lo streaming in caso di interruzione
  • Filigrana bassa stimata, il tempo stimato da quando la partizione del record è stata ripristinata con la replica in tutti i cluster. Per maggiori dettagli, consulta Partizioni e Filigrane.

Per ulteriori dettagli sui campi in un record di modifica dei dati, consulta il riferimento API per ReadChangeStream.

Gestire le modifiche alle partizioni

Quando le partizioni di una tabella cambiano, le richieste ReadChangeStream restituiscono un messaggio CloseStream con le informazioni necessarie per riprendere lo streaming dalle nuove partizioni.

Per uno split, questo conterrà più nuove partizioni e un ContinuationToken corrispondente per ogni partizione. Per riprendere lo streaming delle nuove partizioni dalla stessa posizione, devi effettuare una nuova richiesta ReadChangeStream per ogni nuova partizione con il token corrispondente.

Ad esempio, se esegui il flusso della partizione [A,C) e questa si divide in due partizioni, [A,B) e [B,C), puoi aspettarti la seguente sequenza di eventi:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Per riprendere il flusso di ciascuna partizione dalla stessa posizione, invii le seguenti richieste ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Per un'unione, per riprendere dalla stessa partizione, devi inviare una nuova richiesta ReadChangeStream contenente ogni token dalle partizioni unite.

Ad esempio, se stai trasmettendo in streaming due partizioni, [A,B) e [B,C), e queste si uniscono nella partizione [A,C), puoi aspettarti la seguente sequenza di eventi:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Per riprendere la partizione di flussi di [A, C) dalla stessa posizione, invia un ReadChangeStreamQuery come questo:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Passaggi successivi