Änderungsstream mit Java lesen

Die Cloud Bigtable-Clientbibliothek für Java bietet einfache Methoden zum Verarbeiten von Datenänderungseinträgen. Bevor Sie dieses Dokument lesen, sollten Sie die Übersicht über Änderungsstreams lesen.

Hinweise

Bevor Sie einen Änderungsstream mit Java lesen, müssen Sie die folgenden Voraussetzungen erfüllen.

Authentifizierung einrichten

Wenn Sie die Java-Beispiele auf dieser Seite aus einer lokalen Entwicklungsumgebung heraus verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Installieren Sie die Google Cloud CLI.
  2. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  3. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login

Weitere Informationen: Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Standardanmeldedaten für Anwendungen für Code einrichten, der in Google Cloud ausgeführt wird.

Änderungsstream aktivieren

Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle erstellen, für die ein Änderungsstream aktiviert ist.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgende IAM-Rolle zu gewähren, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams benötigen.

  • Bigtable-Administrator (roles/bigtable.admin) auf der Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Java-Clientbibliothek als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-Datei pom.xml einen Code ähnlich dem folgenden hinzu. Ersetzen Sie VERSION durch die Version der verwendeten Clientbibliothek. Die Version muss 2.21.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Die Partitionen der Tabelle ermitteln

Um ReadChangeStream-Anfragen zu senden, müssen Sie die Partitionen Ihrer Tabelle kennen. Dies kann mit der Methode GenerateInitialChangeStreamPartitions ermittelt werden. Das folgende Beispiel zeigt, wie Sie mit dieser Methode einen ByteStringRanges-Stream abrufen können, der jede Partition in der Tabelle darstellt. Jeder ByteStringRange enthält den Start- und den Endschlüssel für eine Partition.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Änderungen für jede Partition verarbeiten

Mit der Methode ReadChangeStream können Sie dann Änderungen für jede Partition verarbeiten. Dies ist ein Beispiel dafür, wie ein Stream für eine Partition ab der aktuellen Uhrzeit geöffnet wird.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery akzeptiert die folgenden Argumente:

  • Streampartition (erforderlich): die Partition, aus der Änderungen gestreamt werden sollen
  • Eine der folgenden:
    • Beginn: Commit-Zeitstempel für Beginn der Verarbeitung von Änderungen ab
    • Fortsetzungstokens: Tokens, die eine Position darstellen, von der das Streaming fortgesetzt werden kann
  • Ende (optional): Commit-Zeitstempel, um die Verarbeitung von Änderungen nach Erreichen des Ziels zu stoppen. Wenn du keinen Wert angibst, wird der Stream weiter gelesen.
  • Heartbeat-Dauer (optional) – Häufigkeit von Heartbeat-Nachrichten, wenn keine neuen Änderungen vorgenommen wurden (standardmäßig fünf Sekunden)

Format des Streameintrags ändern

Ein Änderungsstream-Eintrag wird als einer von drei Antworttypen zurückgegeben:

  • ChangeStreamMutation: Eine Nachricht, die einen Datensatz für eine Datenänderung darstellt.

  • CloseStream: Eine Meldung, die angibt, dass der Client nicht mehr aus dem Stream lesen soll.

    • Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
      • OK: Das Ende für die angegebene Partition wurde erreicht.
      • OUT_OF_RANGE: Die angegebene Partition ist nicht mehr vorhanden, d. h., sie wurden aufgeteilt oder zusammengeführt. Für jede neue Partition muss eine neue ReadChangeStream-Anfrage erstellt werden.
    • NewPartitions gibt die aktualisierten Partitionierungsinformationen zu OUT_OF_RANGE-Antworten an.
    • ChangeStreamContinuationTokens: Liste der Tokens, die zum Fortsetzen neuer ReadChangeStream-Anfragen von derselben Position verwendet werden. Eine pro NewPartition.
  • Heartbeat: Eine regelmäßige Nachricht mit Informationen, mit denen der Status des Streams geprüft werden kann.

    • EstimatedLowWatermark – Schätzung des niedrigen Wasserzeichens für die angegebene Partition
    • ContinuationToken: Token, mit dem das Streaming der angegebenen Partition ab der aktuellen Position fortgesetzt wird.

Inhalt des Datenänderungseintrags

Jeder Datensatz für die Datenänderung enthält Folgendes:

  • Einträge: Änderungen, die an der Zeile vorgenommen wurden, u. a. eine oder mehrere der folgenden Elemente:
    • Schreiben
      • Spaltenfamilie
      • Spaltenqualifizierer
      • Zeitstempel
      • Wert
    • Zellen löschen
      • Spaltenfamilie
      • Spaltenqualifizierer
      • Zeitstempelbereich
    • Löschen einer Spaltenfamilie
      • Spaltenfamilie
      • Löschen aus einer Zeile: Das Löschen aus einer Zeile wird in eine Liste von Löschvorgängen aus Spaltenfamilien für jede Spaltenfamilie konvertiert, in der die Zeile Daten enthält.
  • Zeilenschlüssel: die Kennung für die geänderte Zeile
  • Art der Änderung – entweder vom Nutzer initiiert oder automatische Speicherbereinigung
  • ID des Clusters, der die Änderung erhalten hat
  • Commit-Zeitstempel – serverseitige Zeit, zu der die Änderung in der Tabelle per Commit gespeichert wurde
  • Tie Breaker: Ein Wert, mit dem die Anwendung, die den Stream liest, die integrierte Konfliktlösungsrichtlinie von Bigtable verwenden kann.
  • Token: Dieses Token wird von der nutzenden Anwendung verwendet, um den Stream bei einer Unterbrechung fortzusetzen.
  • Geschätztes geringes Wasserzeichen: die geschätzte Zeit, seit die Partition des Eintrags die Replikation über alle Cluster hinweg erreicht hat. Weitere Informationen finden Sie unter Partitionen und Wasserzeichen.

Weitere Informationen zu den Feldern in einem Datenänderungseintrag finden Sie in der API-Referenz für ReadChangeStream.

Änderungen an Partitionen verarbeiten

Wenn sich die Partitionen einer Tabelle ändern, geben ReadChangeStream-Anfragen eine CloseStream-Nachricht mit den Informationen zurück, die zum Fortsetzen des Streamings der neuen Partition(en) erforderlich sind.

Bei einem Split enthält dies mehrere neue Partitionen und eine entsprechende ContinuationToken für jede Partition. Wenn Sie das Streaming der neuen Partitionen von derselben Position fortsetzen möchten, stellen Sie für jede neue Partition mit dem entsprechenden Token eine neue ReadChangeStream-Anfrage.

Wenn Sie beispielsweise die Partition [A,C) streamen und diese in die beiden Partitionen [A,B) und [B,C) aufteilt, können Sie mit der folgenden Ereignisabfolge rechnen:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Senden Sie die folgenden ReadChangeStreamQuery-Anfragen, um das Streaming jeder Partition von derselben Position aus fortzusetzen:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Damit eine Zusammenführung von derselben Partition fortgesetzt wird, müssen Sie eine neue ReadChangeStream-Anfrage senden, die jedes Token der zusammengeführten Partitionen enthält.

Wenn Sie beispielsweise zwei Partitionen [A,B) und [B,C) streamen und diese in die Partition [A,C) zusammengeführt werden, können Sie mit der folgenden Ereignisabfolge rechnen:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Wenn Sie das Streaming der Partition [A, C) an derselben Position fortsetzen möchten, senden Sie eine ReadChangeStreamQuery wie die folgende:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Nächste Schritte