Änderungsstream mit Java lesen
Die Cloud Bigtable-Clientbibliothek für Java bietet einfache Methoden zum Verarbeiten von Datenänderungseinträgen. Bevor Sie dieses Dokument lesen, sollten Sie die Übersicht über Änderungsstreams lesen.
Hinweise
Bevor Sie einen Änderungsstream mit Java lesen, müssen Sie die folgenden Voraussetzungen erfüllen.
Authentifizierung einrichten
Wenn Sie die Java-Beispiele auf dieser Seite aus einer lokalen Entwicklungsumgebung heraus verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.
- Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:
gcloud auth application-default login
Weitere Informationen: Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Standardanmeldedaten für Anwendungen für Code einrichten, der in Google Cloud ausgeführt wird.
Änderungsstream aktivieren
Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle erstellen, für die ein Änderungsstream aktiviert ist.
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgende IAM-Rolle zu gewähren, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams benötigen.
- Bigtable-Administrator (
roles/bigtable.admin
) auf der Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten
Java-Clientbibliothek als Abhängigkeit hinzufügen
Fügen Sie Ihrer Maven-Datei pom.xml
einen Code ähnlich dem folgenden hinzu. Ersetzen Sie VERSION
durch die Version der verwendeten Clientbibliothek. Die Version muss 2.21.0 oder höher sein.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Die Partitionen der Tabelle ermitteln
Um ReadChangeStream
-Anfragen zu senden, müssen Sie die Partitionen Ihrer Tabelle kennen. Dies kann mit der Methode GenerateInitialChangeStreamPartitions
ermittelt werden. Das folgende Beispiel zeigt, wie Sie mit dieser Methode einen ByteStringRanges
-Stream abrufen können, der jede Partition in der Tabelle darstellt. Jeder ByteStringRange
enthält den Start- und den Endschlüssel für eine Partition.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Änderungen für jede Partition verarbeiten
Mit der Methode ReadChangeStream
können Sie dann Änderungen für jede Partition verarbeiten. Dies ist ein Beispiel dafür, wie ein Stream für eine Partition ab der aktuellen Uhrzeit geöffnet wird.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
akzeptiert die folgenden Argumente:
- Streampartition (erforderlich): die Partition, aus der Änderungen gestreamt werden sollen
- Eine der folgenden:
- Beginn: Commit-Zeitstempel für Beginn der Verarbeitung von Änderungen ab
- Fortsetzungstokens: Tokens, die eine Position darstellen, von der das Streaming fortgesetzt werden kann
- Ende (optional): Commit-Zeitstempel, um die Verarbeitung von Änderungen nach Erreichen des Ziels zu stoppen. Wenn du keinen Wert angibst, wird der Stream weiter gelesen.
- Heartbeat-Dauer (optional) – Häufigkeit von Heartbeat-Nachrichten, wenn keine neuen Änderungen vorgenommen wurden (standardmäßig fünf Sekunden)
Format des Streameintrags ändern
Ein Änderungsstream-Eintrag wird als einer von drei Antworttypen zurückgegeben:
ChangeStreamMutation
: Eine Nachricht, die einen Datensatz für eine Datenänderung darstellt.CloseStream
: Eine Meldung, die angibt, dass der Client nicht mehr aus dem Stream lesen soll.- Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
OK
: Das Ende für die angegebene Partition wurde erreicht.OUT_OF_RANGE
: Die angegebene Partition ist nicht mehr vorhanden, d. h., sie wurden aufgeteilt oder zusammengeführt. Für jede neue Partition muss eine neueReadChangeStream
-Anfrage erstellt werden.
NewPartitions
gibt die aktualisierten Partitionierungsinformationen zuOUT_OF_RANGE
-Antworten an.ChangeStreamContinuationTokens
: Liste der Tokens, die zum Fortsetzen neuerReadChangeStream
-Anfragen von derselben Position verwendet werden. Eine proNewPartition
.
- Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
Heartbeat
: Eine regelmäßige Nachricht mit Informationen, mit denen der Status des Streams geprüft werden kann.EstimatedLowWatermark
– Schätzung des niedrigen Wasserzeichens für die angegebene PartitionContinuationToken
: Token, mit dem das Streaming der angegebenen Partition ab der aktuellen Position fortgesetzt wird.
Inhalt des Datenänderungseintrags
Jeder Datensatz für die Datenänderung enthält Folgendes:
- Einträge: Änderungen, die an der Zeile vorgenommen wurden, u. a. eine oder mehrere der folgenden Elemente:
- Schreiben
- Spaltenfamilie
- Spaltenqualifizierer
- Zeitstempel
- Wert
- Zellen löschen
- Spaltenfamilie
- Spaltenqualifizierer
- Zeitstempelbereich
- Löschen einer Spaltenfamilie
- Spaltenfamilie
- Löschen aus einer Zeile: Das Löschen aus einer Zeile wird in eine Liste von Löschvorgängen aus Spaltenfamilien für jede Spaltenfamilie konvertiert, in der die Zeile Daten enthält.
- Schreiben
- Zeilenschlüssel: die Kennung für die geänderte Zeile
- Art der Änderung – entweder vom Nutzer initiiert oder automatische Speicherbereinigung
- ID des Clusters, der die Änderung erhalten hat
- Commit-Zeitstempel – serverseitige Zeit, zu der die Änderung in der Tabelle per Commit gespeichert wurde
- Tie Breaker: Ein Wert, mit dem die Anwendung, die den Stream liest, die integrierte Konfliktlösungsrichtlinie von Bigtable verwenden kann.
- Token: Dieses Token wird von der nutzenden Anwendung verwendet, um den Stream bei einer Unterbrechung fortzusetzen.
- Geschätztes geringes Wasserzeichen: die geschätzte Zeit, seit die Partition des Eintrags die Replikation über alle Cluster hinweg erreicht hat. Weitere Informationen finden Sie unter Partitionen und Wasserzeichen.
Weitere Informationen zu den Feldern in einem Datenänderungseintrag finden Sie in der API-Referenz für ReadChangeStream
.
Änderungen an Partitionen verarbeiten
Wenn sich die Partitionen einer Tabelle ändern, geben ReadChangeStream
-Anfragen eine CloseStream
-Nachricht mit den Informationen zurück, die zum Fortsetzen des Streamings der neuen Partition(en) erforderlich sind.
Bei einem Split enthält dies mehrere neue Partitionen und eine entsprechende ContinuationToken
für jede Partition. Wenn Sie das Streaming der neuen Partitionen von derselben Position fortsetzen möchten, stellen Sie für jede neue Partition mit dem entsprechenden Token eine neue ReadChangeStream
-Anfrage.
Wenn Sie beispielsweise die Partition [A,C)
streamen und diese in die beiden Partitionen [A,B)
und [B,C)
aufteilt, können Sie mit der folgenden Ereignisabfolge rechnen:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Senden Sie die folgenden ReadChangeStreamQuery
-Anfragen, um das Streaming jeder Partition von derselben Position aus fortzusetzen:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Damit eine Zusammenführung von derselben Partition fortgesetzt wird, müssen Sie eine neue ReadChangeStream
-Anfrage senden, die jedes Token der zusammengeführten Partitionen enthält.
Wenn Sie beispielsweise zwei Partitionen [A,B)
und [B,C)
streamen und diese in die Partition [A,C)
zusammengeführt werden, können Sie mit der folgenden Ereignisabfolge rechnen:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Wenn Sie das Streaming der Partition [A, C)
an derselben Position fortsetzen möchten, senden Sie eine ReadChangeStreamQuery
wie die folgende:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));