Leggi una modifica in tempo reale con Java

La libreria client di Cloud Bigtable per Java fornisce metodi di basso livello per l'elaborazione dei record delle modifiche ai dati. Tuttavia, nella maggior parte dei casi, consigliamo di trasmettere le modifiche in modalità flusso con Dataflow anziché utilizzare i metodi descritti in questa pagina, poiché Dataflow gestisce automaticamente le divisioni e le unioni di partizione.

Prima di iniziare

Prima di leggere una modifica in tempo reale con Java, assicurati di conoscere la panoramica delle modifiche in tempo reale. Completa poi i seguenti prerequisiti.

Configura l'autenticazione

Per utilizzare gli Java esempi di questa pagina in un ambiente di sviluppo locale, installa e inizializza gcloud CLI, quindi configura Credenziali predefinite dell'applicazione con le tue credenziali utente.

  1. Installa Google Cloud CLI.
  2. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  3. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login

Per ulteriori informazioni, consulta Set up authentication for a local development environment.

Per informazioni sulla configurazione dell'autenticazione per un ambiente di produzione, consulta Set up Application Default Credentials for code running on Google Cloud.

Attivare un flusso di modifiche

Devi abilitare un flusso di modifiche su una tabella prima di poterla leggere. Puoi anche creare una nuova tabella con un flusso di modifiche abilitato.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per leggere un flusso di modifiche Bigtable, chiedi all'amministratore di concederti il ruolo IAM seguente.

  • Amministratore Bigtable (roles/bigtable.admin) sull'istanza Bigtable che contiene la tabella da cui prevedi di trasmettere le modifiche

Aggiungi la libreria client Java come dipendenza

Aggiungi un codice simile al seguente al file Maven pom.xml. Sostituisci VERSION con la versione della libreria client che stai utilizzando. La versione deve essere la 2.21.0 o successiva.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determinare le partizioni della tabella

Per iniziare a effettuare richieste ReadChangeStream, devi conoscere le partizioni della tabella. Questo può essere determinato utilizzando il metodo GenerateInitialChangeStreamPartitions. L'esempio seguente mostra come utilizzare questo metodo per ottenere un flusso di ByteStringRanges che rappresenti ogni partizione della tabella. Ogni ByteStringRange contiene le chiavi di inizio e di fine di una partizione.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Elabora le modifiche per ogni partizione

Puoi quindi elaborare le modifiche per ogni partizione utilizzando il metodo ReadChangeStream. Questo è un esempio di come aprire un flusso per una partizione, a partire dall'ora attuale.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery accetta i seguenti argomenti:

  • Partizione flusso (obbligatoria): la partizione da cui trasmettere il flusso cambia
  • Il valore sarà uno dei seguenti:
    • Ora di inizio: data di commit da cui iniziare l'elaborazione delle modifiche
    • Token di continuazione: token che rappresentano una posizione da cui riprendere la trasmissione del flusso
  • Ora di fine (facoltativo): esegui il commit del timestamp per interrompere l'elaborazione delle modifiche una volta raggiunta. Se non specifichi alcun valore, lo stream continua a leggere.
  • Durata dell'heartbeat (facoltativo): frequenza dei messaggi heartbeat quando non ci sono nuove modifiche (il valore predefinito è cinque secondi)

Formato di registrazione delle modifiche in tempo reale

Un record di modifiche in tempo reale restituito è uno di questi tre tipi di risposta:

  • ChangeStreamMutation: un messaggio che rappresenta un record di modifica dei dati.

  • CloseStream - Un messaggio che indica che il client deve interrompere la lettura dal flusso.

    • Stato: indica il motivo per cui è stato chiuso lo stream. Uno dei seguenti:
      • OK: è stata raggiunta l'ora di fine per la partizione specificata
      • OUT_OF_RANGE: la partizione specificata non esiste più, il che significa che si sono verificate suddivisioni o unioni in questa partizione. Sarà necessario creare una nuova richiesta ReadChangeStream per ogni nuova partizione.
    • NewPartitions - Fornisce le informazioni di partizionamento aggiornate sulle risposte di OUT_OF_RANGE.
    • ChangeStreamContinuationTokens: elenco dei token utilizzati per riprendere le nuove richieste ReadChangeStream dalla stessa posizione. Uno per NewPartition.
  • Heartbeat - Un messaggio periodico con informazioni che possono essere utilizzate per verificare lo stato del flusso.

    • EstimatedLowWatermark: stima del livello di filigrana basso per la partizione specificata
    • ContinuationToken: token per riprendere il flusso di dati della partizione specificata dalla posizione attuale.

Contenuti dei record di modifica dei dati

Per informazioni sui record delle modifiche in tempo reale, consulta Contenuto di un record delle modifiche dei dati.

Gestire le modifiche nelle partizioni

Quando le partizioni di una tabella cambiano, le richieste ReadChangeStream restituiscono un messaggio CloseStream con le informazioni necessarie per riprendere la trasmissione del flusso dalle nuove partizioni.

Per una suddivisione, questo conterrà più nuove partizioni e un valore ContinuationToken corrispondente per ogni partizione. Per riprendere il flusso di dati per le nuove partizioni dalla stessa posizione, devi creare una nuova richiesta ReadChangeStream per ogni nuova partizione con il token corrispondente.

Ad esempio, se stai trasmettendo in streaming la partizione [A,C) e questa viene suddivisa in due partizioni, [A,B) e [B,C), puoi aspettarti la seguente sequenza di eventi:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Per riprendere il flusso di dati di ogni partizione dalla stessa posizione, invii le seguenti richieste ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Per riprendere un'unione dalla stessa partizione, devi inviare una nuova richiesta ReadChangeStream contenente ogni token delle partizioni unite.

Ad esempio, se esegui il flusso di due partizioni, [A,B) e [B,C), e queste vengono unite nella partizione [A,C), potresti aspettarti la seguente sequenza di eventi:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Per riprendere la partizione di streaming [A, C) dalla stessa posizione, invia un ReadChangeStreamQuery come segue:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Passaggi successivi