Änderungsstream mit Java lesen
Die Cloud Bigtable-Clientbibliothek für Java bietet Low-Level-Methoden zum Verarbeiten von Datenänderungseinträgen. In den meisten Fällen empfehlen wir jedoch, Änderungen mit Dataflow zu streamen, anstatt die auf dieser Seite beschriebenen Methoden zu verwenden. In Dataflow werden nämlich Partitionen aufgeteilt und zusammengeführt.
Hinweise
Bevor Sie einen Änderungsstream mit Java lesen, sollten Sie sich mit den Übersicht über Änderungsstreams Führen Sie dann folgende Schritte aus: die folgenden Voraussetzungen erfüllen.
Authentifizierung einrichten
Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
Weitere Informationen unter Set up authentication for a local development environment.
Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.
Änderungsstream aktivieren
Sie müssen eine Änderung aktivieren Stream auf einem Tisch liegen, bevor Sie sie lesen können. Sie können auch ein neues Tabelle mit aktiviertem Änderungsstream.
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgende IAM-Rolle zuzuweisen, um die Berechtigungen zum Lesen eines Bigtable-Änderungsstreams zu erhalten.
- Bigtable-Administrator (
roles/bigtable.admin
) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten
Java-Clientbibliothek als Abhängigkeit hinzufügen
Fügen Sie Ihrer Maven-Datei pom.xml
einen Code wie den folgenden hinzu. Ersetzen
VERSION
durch die Version der Clientbibliothek, die Sie verwenden.
verwenden. Die Version muss 2.21.0 oder höher sein.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Die Partitionen der Tabelle bestimmen
Um ReadChangeStream
-Anfragen senden zu können, müssen Sie die Partitionen von
Tabelle. Das lässt sich mit der Methode GenerateInitialChangeStreamPartitions
ermitteln. Im folgenden Beispiel wird gezeigt, wie Sie mit dieser Methode einen Stream von ByteStringRanges
abrufen, der jede Partition in der Tabelle darstellt. Jedes ByteStringRange
enthält den
Start- und Endschlüssel für eine Partition.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Änderungen für jede Partition verarbeiten
Anschließend können Sie Änderungen für jede Partition mit dem ReadChangeStream
verarbeiten.
. Hier sehen Sie ein Beispiel für das Öffnen eines Streams für eine Partition, beginnend bei der aktuellen Zeit.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
akzeptiert die folgenden Argumente:
- Streampartition (erforderlich): die Partition, aus der Änderungen gestreamt werden sollen
- Eine der folgenden:
- Startzeit – Commit-Zeitstempel, ab dem die Verarbeitung von Änderungen beginnt
- Fortsetzungs-Token: Tokens, die eine Position angeben, von der aus der Stream fortgesetzt werden soll
- Endzeit (optional): Commit-Zeitstempel, bei dem die Verarbeitung von Änderungen beendet wird. Wenn Sie keinen Wert angeben, wird der Stream weitergelesen.
- Heartbeat-Dauer (optional): Häufigkeit von Heartbeat-Nachrichten bei Es werden keine Änderungen vorgenommen (standardmäßig 5 Sekunden)
Format des Streameintrags ändern
Ein zurückgegebener Änderungsstream-Eintrag ist einer von drei Antworttypen:
ChangeStreamMutation
: Eine Nachricht, die einen Datenänderungsdatensatz darstellt.CloseStream
: eine Nachricht, die angibt, dass der Client den Lesevorgang beenden soll aus dem Stream.- Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
OK
: Das Ende der angegebenen Partition wurde erreicht.OUT_OF_RANGE
: Die angegebene Partition ist nicht mehr vorhanden. Das bedeutet, dass Splits oder Merges bei dieser Partition stattgefunden haben. Für jede neue Partition muss eine neueReadChangeStream
-Anfrage erstellt werden.
NewPartitions
: Enthält die aktualisierten Partitionierungsinformationen zuOUT_OF_RANGE
-Antworten.ChangeStreamContinuationTokens
– Liste der Tokens, die zum Fortsetzen verwendet werden neueReadChangeStream
-Anfragen von derselben Position Eine proNewPartition
.
- Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
Heartbeat
– Eine regelmäßige Nachricht mit Informationen, die zum Prüfen des Status des Streams verwendet werden können.EstimatedLowWatermark
– Schätzung des niedrigen Wasserzeichens für die angegebene PartitionContinuationToken
– Token, um das Streaming der angegebenen Partition an der aktuellen Position fortzusetzen.
Inhalt von Datenänderungseinträgen
Informationen zu Änderungsstream-Datensätzen finden Sie unter Inhalt eines Datenänderungsdatensatzes.
Änderungen an Partitionen verarbeiten
Wenn sich die Partitionen einer Tabelle ändern, geben ReadChangeStream
-Anfragen eine CloseStream
-Nachricht mit den Informationen zurück, die zum Fortsetzen des Streamings von den neuen Partitionen erforderlich sind.
Bei einem Split enthält dieser mehrere neue Partitionen und eine entsprechende
ContinuationToken
für jede Partition. Wenn du das Streaming der neuen Partitionen an derselben Position fortsetzen möchtest, musst du für jede neue Partition mit dem entsprechenden Token eine neue ReadChangeStream
-Anfrage stellen.
Wenn Sie beispielsweise die Partition [A,C)
streamen und diese in zwei Teile
[A,B)
und [B,C)
haben, können Sie die folgende Reihenfolge
Events:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Um das Streaming jeder Partition von derselben Position aus fortzusetzen, senden Sie den Befehl
folgende ReadChangeStreamQuery
-Anfragen:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Wenn Sie nach einer Zusammenführung von derselben Partition fortfahren möchten, müssen Sie eine neue ReadChangeStream
-Anfrage mit jedem Token aus den zusammengeführten Partitionen senden.
Wenn Sie beispielsweise die zwei Partitionen [A,B)
und [B,C)
streamen,
in Partition [A,C)
zusammengeführt werden, können Sie mit der folgenden Ereignissequenz rechnen:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Um das Streaming der Partition [A, C)
an derselben Position fortzusetzen, senden Sie eine
ReadChangeStreamQuery
wie folgt:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));