Leggere un flusso di modifiche con Java
La libreria client di Cloud Bigtable per Java fornisce metodi di basso livello per l'elaborazione dei record delle modifiche dei dati. Tuttavia, nella maggior parte dei casi, ti consigliamo di trasmettere in streaming le modifiche con Dataflow anziché utilizzare i metodi descritti in questa pagina, perché Dataflow gestisce le suddivisioni e le unioni delle partizioni per te.
Prima di iniziare
Prima di leggere uno stream di modifiche con Java, assicurati di conoscere la Panoramica degli stream di modifiche. Poi completa i seguenti prerequisiti.
Configura l'autenticazione
Per utilizzare gli esempi di Java questa pagina in un ambiente di sviluppo locale, installa e inizializza gcloud CLI, quindi configura le Credenziali predefinite dell'applicazione con le tue credenziali utente.
Installa Google Cloud CLI.
Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Per ulteriori informazioni, vedi Set up authentication for a local development environment.
Per informazioni sulla configurazione dell'autenticazione per un ambiente di produzione, vedi Set up Application Default Credentials for code running on Google Cloud.
Abilitare un flusso di modifiche
Prima di poter leggere una tabella, devi abilitare un flusso di modifiche. Puoi anche creare una nuova tabella con uno stream di modifiche abilitato.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per leggere un flusso di modifiche Bigtable, chiedi all'amministratore di concederti il seguente ruolo IAM.
- Amministratore Bigtable
(
roles/bigtable.admin
) nell'istanza Bigtable che contiene la tabella da cui prevedi di trasmettere in streaming le modifiche
Aggiungere la libreria client Java come dipendenza
Aggiungi al file Maven pom.xml
un codice simile al seguente. Sostituisci
VERSION
con la versione della libreria client che stai
utilizzando. La versione deve essere 2.21.0 o successive.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Determinare le partizioni della tabella
Per iniziare a effettuare richieste ReadChangeStream
, devi conoscere le partizioni della
tua tabella. Questo può essere determinato utilizzando il
metodo GenerateInitialChangeStreamPartitions
. L'esempio seguente mostra come utilizzare questo metodo per ottenere un flusso di ByteStringRanges
che rappresenta ogni partizione della tabella. Ogni ByteStringRange
contiene la
chiave iniziale e finale di una partizione.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Elabora modifiche per ogni partizione
Puoi quindi elaborare le modifiche per ogni partizione utilizzando il metodo ReadChangeStream
. Questo è un esempio di come aprire uno stream per una partizione, a partire
dall'ora corrente.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
accetta i seguenti argomenti:
- Partizione di streaming (obbligatoria): la partizione da cui trasmettere in streaming le modifiche
- Uno dei seguenti:
- Ora di inizio: timestamp di commit da cui iniziare a elaborare le modifiche
- Token di continuazione: token che rappresentano una posizione da cui riprendere lo streaming
- Ora di fine (facoltativo): timestamp di commit per interrompere l'elaborazione delle modifiche quando raggiunto. Se non fornisci un valore, lo stream continua a leggere.
- Durata del battito cardiaco (facoltativo): frequenza dei messaggi del battito cardiaco quando non ci sono nuove modifiche (il valore predefinito è cinque secondi)
Modificare il formato di registrazione dello stream
Un record di flusso di modifiche restituito è uno dei tre tipi di risposta:
ChangeStreamMutation
: un messaggio che rappresenta un record di modifica dei dati.CloseStream
: un messaggio che indica che il client deve interrompere la lettura dal flusso.- Stato: indica il motivo della chiusura dello stream. Uno dei seguenti:
OK
- end time has been reached for the given partitionOUT_OF_RANGE
: la partizione specificata non esiste più, il che significa che sono state eseguite divisioni o unioni in questa partizione. Per ogni nuova partizione dovrà essere creata una nuova richiestaReadChangeStream
.
NewPartitions
: fornisce le informazioni di partizionamento aggiornate sulle risposteOUT_OF_RANGE
.ChangeStreamContinuationTokens
- Elenco dei token utilizzati per riprendere nuove richiesteReadChangeStream
dalla stessa posizione. Una perNewPartition
.
- Stato: indica il motivo della chiusura dello stream. Uno dei seguenti:
Heartbeat
: un messaggio periodico con informazioni che possono essere utilizzate per controllare lo stato dello stream.EstimatedLowWatermark
- Stima del minimo per la partizione specificataContinuationToken
: token per riprendere lo streaming della partizione specificata dalla posizione corrente.
Contenuti del record di modifica dei dati
Per informazioni sui record dello stream di modifiche, vedi Contenuto di un record di modifica dei dati.
Gestire le modifiche alle partizioni
Quando le partizioni di una tabella cambiano, le richieste ReadChangeStream
restituiscono un messaggio CloseStream
con le informazioni necessarie per riprendere lo streaming dalle nuove partizioni.
Per una divisione, questo campo conterrà più nuove partizioni e un ContinuationToken
corrispondente per ogni partizione. Per riprendere lo streaming delle nuove partizioni
dalla stessa posizione, devi effettuare una nuova richiesta ReadChangeStream
per ogni nuova partizione con il token corrispondente.
Ad esempio, se stai trasmettendo in streaming la partizione [A,C)
e questa si divide in due
partizioni, [A,B)
e [B,C)
, puoi aspettarti la seguente sequenza di
eventi:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Per riprendere lo streaming di ogni partizione dalla stessa posizione, invia le seguenti richieste ReadChangeStreamQuery
:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Per un'unione, per riprendere dalla stessa partizione, devi inviare una nuova richiesta
ReadChangeStream
contenente ogni token delle partizioni unite.
Ad esempio, se stai trasmettendo in streaming due partizioni, [A,B)
e [B,C)
, e queste
vengono unite nella partizione [A,C)
, puoi aspettarti la seguente sequenza di eventi:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Per riprendere lo streaming della partizione [A, C)
dalla stessa posizione, invia un
ReadChangeStreamQuery
come il seguente:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));