Leggere un flusso di modifiche con Java
La libreria client di Cloud Bigtable per Java offre metodi di basso livello per l'elaborazione dei record delle modifiche dei dati. Tuttavia, nella maggior parte dei casi, ti consigliamo di eseguire il flusso di modifiche con Dataflow anziché utilizzare i metodi descritti in questa pagina, poiché Dataflow gestisce automaticamente le suddivisioni e le unioni di partizione.
Prima di iniziare
Prima di leggere un flusso di modifiche con Java, assicurati di acquisire familiarità con la panoramica sulle modifiche in tempo reale. Quindi completa i seguenti prerequisiti.
Configura l'autenticazione
Per utilizzare gli Java esempi in questa pagina in un ambiente di sviluppo locale, installa e inizializza gcloud CLI, quindi configura le Credenziali predefinite dell'applicazione con le tue credenziali utente.
- Installa Google Cloud CLI.
-
Per initialize gcloud CLI, esegui questo comando:
gcloud init
-
Crea credenziali di autenticazione locali per il tuo Account Google:
gcloud auth application-default login
Per maggiori informazioni, consulta Set up authentication for a local development environment.
Per informazioni sulla configurazione dell'autenticazione per un ambiente di produzione, consulta Set up Application Default Credentials for code running on Google Cloud.
Attivare un flusso di modifiche
Devi abilitare un flusso di modifiche in una tabella per poterla leggere. Puoi anche creare una nuova tabella con un flusso di modifiche abilitato.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per leggere un flusso di modifiche Bigtable, chiedi all'amministratore di concederti il ruolo IAM seguente.
- Amministratore Bigtable (
roles/bigtable.admin
) nell'istanza Bigtable che contiene la tabella da cui prevedi di trasferire le modifiche in modalità flusso
Aggiungi la libreria client Java come dipendenza
Aggiungi al file Maven pom.xml
un codice simile al seguente. Sostituisci VERSION
con la versione della libreria client che stai utilizzando. La versione deve essere la 2.21.0 o successiva.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
definisci le partizioni della tabella
Per iniziare a effettuare richieste ReadChangeStream
, devi conoscere le partizioni
della tabella. Questo può essere determinato utilizzando il
metodo GenerateInitialChangeStreamPartitions
. L'esempio seguente mostra come utilizzare questo metodo per ottenere un flusso di ByteStringRanges
che rappresenta ogni partizione della tabella. Ogni ByteStringRange
contiene la chiave di inizio e di fine di una partizione.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Elabora le modifiche per ogni partizione
Puoi quindi elaborare le modifiche per ogni partizione utilizzando il metodo ReadChangeStream
. Questo è un esempio di come aprire un flusso per una partizione, a partire dall'ora attuale.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
accetta i seguenti argomenti:
- Partizione stream (obbligatoria): la partizione da cui trasferire le modifiche del flusso
- Uno dei seguenti valori:
- Ora di inizio: esegui il commit del timestamp per avviare l'elaborazione delle modifiche da
- Token di continuazione: token che rappresentano una posizione da cui riprendere il flusso di dati
- (Facoltativo) Ora di fine - Quando viene raggiunta, esegui il commit del timestamp per interrompere l'elaborazione delle modifiche. Se non fornisci un valore, lo stream continua a leggere.
- Durata heartbeat (facoltativo) - Frequenza dei messaggi heartbeat in assenza di nuove modifiche (il valore predefinito è cinque secondi)
Formato record di modifiche in tempo reale
Un record di modifiche in tempo reale restituito è uno dei tre tipi di risposta:
ChangeStreamMutation
: un messaggio che rappresenta un record delle modifiche dei dati.CloseStream
: un messaggio che indica che il client deve interrompere la lettura dallo stream.- Stato. Indica il motivo per la chiusura dello stream. Uno dei seguenti:
OK
: è stata raggiunta l'ora di fine per la partizione specificataOUT_OF_RANGE
- la partizione specificata non esiste più, il che significa che si sono verificate suddivisioni o unioni in questa partizione. Sarà necessario creare una nuova richiestaReadChangeStream
per ogni nuova partizione.
NewPartitions
- Fornisce le informazioni di partizionamento aggiornate per le risposte diOUT_OF_RANGE
.ChangeStreamContinuationTokens
- Elenco di token utilizzati per riprendere nuove richiesteReadChangeStream
dalla stessa posizione. Uno ogniNewPartition
.
- Stato. Indica il motivo per la chiusura dello stream. Uno dei seguenti:
Heartbeat
: un messaggio periodico con informazioni che possono essere utilizzate per controllare lo stato del flusso.EstimatedLowWatermark
- Stima della filigrana bassa per la partizione dataContinuationToken
- Token per riprendere il flusso di dati della partizione specificata dalla posizione attuale.
Contenuti dei record delle modifiche ai dati
Per informazioni sui record di modifiche in tempo reale, consulta Contenuto di un record delle modifiche ai dati.
Gestire le modifiche nelle partizioni
Quando le partizioni di una tabella cambiano, ReadChangeStream
richiede la restituzione di un messaggio CloseStream
con le informazioni necessarie per riprendere il flusso di dati dalle nuove partizioni.
Per una partizione, conterrà più nuove partizioni e un
ContinuationToken
corrispondente per ogni partizione. Per riprendere il flusso di dati delle nuove partizioni
dalla stessa posizione, devi creare una nuova richiesta ReadChangeStream
per ogni nuova partizione con il token corrispondente.
Ad esempio, se stai trasmettendo in streaming la partizione [A,C)
che viene suddivisa in due partizioni, [A,B)
e [B,C)
, puoi aspettarti la seguente sequenza di eventi:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Per riprendere il flusso di dati di ogni partizione dalla stessa posizione, invierai le seguenti richieste ReadChangeStreamQuery
:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Per un'unione, per riprendere dalla stessa partizione, devi inviare una nuova richiesta ReadChangeStream
contenente ogni token delle partizioni unite.
Ad esempio, se stai trasmettendo in modalità flusso due partizioni, [A,B)
e [B,C)
, che
vengono uniti nella partizione [A,C)
, puoi aspettarti la seguente sequenza di eventi:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Per riprendere la partizione del flusso di dati [A, C)
dalla stessa posizione, devi inviare un ReadChangeStreamQuery
come segue:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));