Leggere un flusso di modifiche con Java
La libreria client di Cloud Bigtable Java offre metodi di basso livello per l'elaborazione dei record delle modifiche dei dati. Tuttavia, nella maggior parte dei casi casi, ti consigliamo di eseguire il flusso di modifiche con Dataflow anziché utilizzare i metodi descritti in questa pagina, perché Dataflow gestisce suddivisioni della partizione e unioni per te.
Prima di iniziare
Prima di leggere un flusso di modifiche con Java, assicurati di acquisire familiarità con le Panoramica degli stream di modifiche. Quindi completa i seguenti prerequisiti.
Configura l'autenticazione
Per utilizzare gli Java esempi in questa pagina in una località dell'ambiente di sviluppo, installare e inizializzare gcloud CLI quindi configura Credenziali predefinite dell'applicazione con le tue credenziali utente.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
Per ulteriori informazioni, consulta Set up authentication for a local development environment.
Per informazioni sulla configurazione dell'autenticazione per un ambiente vedi l'ambiente Set up Application Default Credentials for code running on Google Cloud.
Attivare un flusso di modifiche
Devi attivare una modifica streaming su una tabella prima di poterla leggere. Puoi anche creare una nuova tavola con un flusso di modifiche abilitato.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per leggere una modifica di Bigtable chiedi all'amministratore di concederti il seguente ruolo IAM.
- Amministratore Bigtable
(
roles/bigtable.admin
) nell'istanza Bigtable che contiene la tabella che prevedi modifiche al flusso da
Aggiungi la libreria client Java come dipendenza
Aggiungi al file Maven pom.xml
un codice simile al seguente. Sostituisci
VERSION
con la versione della libreria client che utilizzi
che utilizzano. La versione deve essere la 2.21.0 o successiva.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
definisci le partizioni della tabella
Per iniziare a effettuare richieste ReadChangeStream
, devi conoscere le partizioni di
della tabella. Questo può essere determinato utilizzando
GenerateInitialChangeStreamPartitions
. L'esempio seguente mostra come
utilizzare questo metodo per ottenere un flusso di
ByteStringRanges
che rappresentano ciascuna partizione della tabella. Ciascun ByteStringRange
contiene
le chiavi di inizio e di fine di una partizione.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Elabora le modifiche per ogni partizione
Puoi quindi elaborare le modifiche per ogni partizione utilizzando ReadChangeStream
. Questo è un esempio di come aprire un flusso per una partizione,
dall'ora attuale.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
accetta i seguenti argomenti:
- Partizione stream (obbligatoria): la partizione da cui trasferire le modifiche del flusso
- Uno dei seguenti valori:
- Ora di inizio: esegui il commit del timestamp per avviare l'elaborazione delle modifiche da
- Token di continuazione: token che rappresentano una posizione da riprendere trasmettendo in streaming da
- (Facoltativo) Ora di fine: esegui il commit del timestamp per interrompere l'elaborazione delle modifiche quando raggiunto. Se non fornisci un valore, lo stream continua a leggere.
- (Facoltativo) Durata heartbeat - La frequenza dei messaggi heartbeat quando non ci sono nuove modifiche (il valore predefinito è cinque secondi)
Formato di record in modalità di modifica
Un record di modifiche in tempo reale restituito è uno dei tre tipi di risposta:
ChangeStreamMutation
: un messaggio che rappresenta un record delle modifiche dei dati.CloseStream
- Un messaggio che indica che il client deve interrompere la lettura dallo stream.- Stato. Indica il motivo per la chiusura dello stream. Uno dei seguenti:
OK
: è stata raggiunta l'ora di fine per la partizione specificataOUT_OF_RANGE
: la partizione specificata non esiste più. il che significa che su questa partizione sono state eseguite suddivisioni o unioni. Un nuovo Sarà necessario creareReadChangeStream
richiesta per ogni nuova partizione.
NewPartitions
- Fornisce le informazioni di partizionamento aggiornate suOUT_OF_RANGE
risposte.ChangeStreamContinuationTokens
- Elenco dei token utilizzati per riprendere nuove richiesteReadChangeStream
dalla stessa posizione. Uno ogniNewPartition
.
- Stato. Indica il motivo per la chiusura dello stream. Uno dei seguenti:
Heartbeat
: un messaggio periodico con informazioni che possono essere utilizzate per un checkpoint dello stato del flusso.EstimatedLowWatermark
- Stima della filigrana bassa per partizione specificaContinuationToken
- Token per riprendere il flusso di dati specificati dalla posizione corrente.
Contenuti dei record delle modifiche ai dati
Per informazioni sui record di modifiche in tempo reale, consulta Che cosa c'è in una modifica ai dati registrazione.
Gestire le modifiche nelle partizioni
Quando le partizioni di una tabella cambiano, ReadChangeStream
richiede
restituisce un messaggio CloseStream
con le informazioni necessarie per riprendere il flusso di dati
dalle nuove partizioni.
Per una split, conterrà più nuove partizioni e un
ContinuationToken
per ogni partizione. Per riprendere il flusso di dati delle nuove partizioni
dalla stessa posizione, effettui una nuova richiesta ReadChangeStream
per ogni nuova partizione con il token corrispondente.
Ad esempio, se stai trasmettendo in streaming la partizione [A,C)
, che si divide in due
partizioni, [A,B)
e [B,C)
, puoi aspettarti la seguente sequenza
eventi:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Per riprendere il flusso di dati di ogni partizione dalla stessa posizione, invii
ReadChangeStreamQuery
richieste seguenti:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Per un'unione, per riprendere dalla stessa partizione, devi inviare un nuovo
Richiesta ReadChangeStream
contenente ogni token delle partizioni unite.
Ad esempio, se stai trasmettendo in streaming due partizioni, [A,B)
e [B,C)
, che
vengono uniti nella partizione [A,C)
, la sequenza di eventi sarà la seguente:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Per riprendere la partizione del flusso di dati [A, C)
dalla stessa posizione, devi inviare un
ReadChangeStreamQuery
come il seguente:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));