Änderungsstream mit Java lesen
Die Cloud Bigtable-Clientbibliothek für Java bietet Low-Level-Methoden zur Verarbeitung von Datenänderungseinträgen. In den meisten Fällen empfehlen wir jedoch, Änderungen mit Dataflow zu streamen, anstatt die auf dieser Seite beschriebenen Methoden zu verwenden. In Dataflow werden nämlich Partitionen aufgeteilt und zusammengeführt.
Hinweise
Bevor Sie einen Änderungsstream mit Java lesen, sollten Sie sich mit der Übersicht über Änderungsstreams vertraut machen. Führen Sie dann die folgenden Schritte aus.
Authentifizierung einrichten
Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
Weitere Informationen unter Set up authentication for a local development environment.
Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.
Änderungsstream aktivieren
Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle erstellen, für die ein Änderungsstream aktiviert ist.
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgende IAM-Rolle zuzuweisen, um die Berechtigungen zum Lesen eines Bigtable-Änderungsstreams zu erhalten.
- Bigtable-Administrator (
roles/bigtable.admin
) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten
Java-Clientbibliothek als Abhängigkeit hinzufügen
Fügen Sie Ihrer Maven-pom.xml
-Datei Code ähnlich dem folgenden hinzu. Ersetzen Sie VERSION
durch die Version der Clientbibliothek, die Sie verwenden. Die Version muss 2.21.0 oder höher sein.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Partitionen der Tabelle ermitteln
Wenn Sie ReadChangeStream
-Anfragen senden möchten, müssen Sie die Partitionen Ihrer Tabelle kennen. Dies kann mithilfe der Methode GenerateInitialChangeStreamPartitions
ermittelt werden. Im folgenden Beispiel wird gezeigt, wie Sie mit dieser Methode einen Stream von ByteStringRanges
abrufen, der jede Partition in der Tabelle darstellt. Jede ByteStringRange
enthält den Start- und Endschlüssel für eine Partition.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Änderungen für jede Partition verarbeiten
Anschließend können Sie Änderungen für jede Partition mit der ReadChangeStream
-Methode verarbeiten. Hier sehen Sie ein Beispiel für das Öffnen eines Streams für eine Partition, beginnend bei der aktuellen Zeit.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
akzeptiert die folgenden Argumente:
- Streampartition (erforderlich): die Partition, aus der Änderungen gestreamt werden
- Eine der folgenden:
- Startzeit – Commit-Zeitstempel, ab dem die Verarbeitung von Änderungen beginnt
- Fortsetzungs-Token: Tokens, die eine Position angeben, von der aus der Stream fortgesetzt werden soll
- Endzeit (optional): Commit-Zeitstempel, bei dem die Verarbeitung von Änderungen beendet wird. Wenn Sie keinen Wert angeben, wird der Stream weiter gelesen.
- Herzschlagdauer (optional): Häufigkeit von Herzschlagnachrichten, wenn keine neuen Änderungen vorliegen (standardmäßig fünf Sekunden)
Format des Streameintrags ändern
Ein zurückgegebener Änderungsstream-Eintrag hat einen der drei Antworttypen:
ChangeStreamMutation
: Eine Nachricht, die einen Datenänderungsdatensatz darstellt.CloseStream
– Eine Nachricht, die angibt, dass der Client das Lesen aus dem Stream beenden soll.- Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
OK
– die Endzeit für die angegebene Partition wurde erreichtOUT_OF_RANGE
– die angegebene Partition existiert nicht mehr. Das bedeutet, dass diese Partition aufgeteilt oder zusammengeführt wurde. Für jede neue Partition muss eine neueReadChangeStream
-Anfrage erstellt werden.
NewPartitions
: Enthält die aktualisierten Partitionierungsinformationen zuOUT_OF_RANGE
-Antworten.ChangeStreamContinuationTokens
– Liste der Tokens, mit denen neueReadChangeStream
-Anfragen an derselben Position fortgesetzt werden. Eine proNewPartition
.
- Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
Heartbeat
– Eine regelmäßige Nachricht mit Informationen, die zum Prüfen des Status des Streams verwendet werden können.EstimatedLowWatermark
– Schätzung des niedrigen Wasserzeichens für die angegebene PartitionContinuationToken
– Token, um das Streaming der angegebenen Partition an der aktuellen Position fortzusetzen.
Inhalt von Datenänderungseinträgen
Informationen zu Änderungsstream-Datensätzen finden Sie unter Inhalt eines Datenänderungsdatensatzes.
Änderungen an Partitionen verarbeiten
Wenn sich die Partitionen einer Tabelle ändern, geben ReadChangeStream
-Anfragen eine CloseStream
-Nachricht mit den Informationen zurück, die zum Fortsetzen des Streamings von den neuen Partitionen erforderlich sind.
Bei einem Split enthält er mehrere neue Partitionen und eine entsprechende ContinuationToken
für jede Partition. Wenn du das Streaming der neuen Partitionen an derselben Position fortsetzen möchtest, musst du für jede neue Partition mit dem entsprechenden Token eine neue ReadChangeStream
-Anfrage stellen.
Wenn Sie beispielsweise die Partition [A,C)
streamen und sie in zwei Partitionen aufgeteilt wird, [A,B)
und [B,C)
, können Sie mit der folgenden Abfolge von Ereignissen rechnen:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Wenn du das Streaming jeder Partition an derselben Position fortsetzen möchtest, sende die folgenden ReadChangeStreamQuery
-Anfragen:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Wenn Sie nach einer Zusammenführung von derselben Partition fortfahren möchten, müssen Sie eine neue ReadChangeStream
-Anfrage mit jedem Token aus den zusammengeführten Partitionen senden.
Wenn Sie beispielsweise zwei Partitionen ([A,B)
und [B,C)
) streamen und diese in Partition [A,C)
zusammengeführt werden, ist die folgende Abfolge von Ereignissen zu erwarten:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Wenn Sie das Streaming der Partition [A, C)
an derselben Position fortsetzen möchten, senden Sie eine ReadChangeStreamQuery
wie die folgende:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));