Änderungsstream mit Java lesen

Die Cloud Bigtable-Clientbibliothek für Java bietet Low-Level-Methoden zur Verarbeitung von Datenänderungseinträgen. In den meisten Fällen empfehlen wir jedoch, Änderungen mit Dataflow zu streamen, anstatt die auf dieser Seite beschriebenen Methoden zu verwenden. In Dataflow werden nämlich Partitionen aufgeteilt und zusammengeführt.

Hinweise

Bevor Sie einen Änderungsstream mit Java lesen, sollten Sie sich mit der Übersicht über Änderungsstreams vertraut machen. Führen Sie dann die folgenden Schritte aus.

Authentifizierung einrichten

Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Weitere Informationen unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Änderungsstream aktivieren

Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle erstellen, für die ein Änderungsstream aktiviert ist.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgende IAM-Rolle zuzuweisen, um die Berechtigungen zum Lesen eines Bigtable-Änderungsstreams zu erhalten.

  • Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Java-Clientbibliothek als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-pom.xml-Datei Code ähnlich dem folgenden hinzu. Ersetzen Sie VERSION durch die Version der Clientbibliothek, die Sie verwenden. Die Version muss 2.21.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Partitionen der Tabelle ermitteln

Damit Sie ReadChangeStream-Anfragen stellen können, müssen Sie die Partitionen Ihrer Tabelle kennen. Dies kann mithilfe der Methode GenerateInitialChangeStreamPartitions ermittelt werden. Im folgenden Beispiel wird gezeigt, wie Sie mit dieser Methode einen Stream von ByteStringRanges abrufen, der jede Partition in der Tabelle darstellt. Jede ByteStringRange enthält den Start- und Endschlüssel für eine Partition.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Änderungen für jede Partition verarbeiten

Anschließend können Sie Änderungen für jede Partition mit der ReadChangeStream-Methode verarbeiten. Hier sehen Sie ein Beispiel für das Öffnen eines Streams für eine Partition, beginnend mit der aktuellen Zeit.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery akzeptiert die folgenden Argumente:

  • Streampartition (erforderlich): die Partition, aus der Änderungen gestreamt werden
  • Eine der folgenden:
    • Startzeit – Commit-Zeitstempel, ab dem die Verarbeitung von Änderungen beginnt
    • Fortsetzungs-Token: Tokens, die eine Position angeben, von der aus der Stream fortgesetzt werden soll
  • Endzeit (optional): Commit-Zeitstempel, bei dem die Verarbeitung von Änderungen beendet wird. Wenn Sie keinen Wert angeben, wird der Stream weiter gelesen.
  • Herzschlagdauer (optional): Häufigkeit von Herzschlagnachrichten, wenn keine neuen Änderungen vorliegen (standardmäßig fünf Sekunden)

Format des Streameintrags ändern

Ein zurückgegebener Änderungsstream-Eintrag ist einer von drei Antworttypen:

  • ChangeStreamMutation: Eine Nachricht, die einen Datenänderungsdatensatz darstellt.

  • CloseStream – Eine Nachricht, die angibt, dass der Client das Lesen aus dem Stream beenden soll.

    • Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
      • OK – die Endzeit für die angegebene Partition wurde erreicht
      • OUT_OF_RANGE – die angegebene Partition existiert nicht mehr. Das bedeutet, dass diese Partition aufgeteilt oder zusammengeführt wurde. Für jede neue Partition muss eine neue ReadChangeStream-Anfrage erstellt werden.
    • NewPartitions: Enthält die aktualisierten Partitionierungsinformationen zu OUT_OF_RANGE-Antworten.
    • ChangeStreamContinuationTokens – Liste der Tokens, mit denen neue ReadChangeStream-Anfragen an derselben Position fortgesetzt werden. Eine pro NewPartition.
  • Heartbeat – Eine periodische Nachricht mit Informationen, die zum Prüfen des Status des Streams verwendet werden können.

    • EstimatedLowWatermark – Schätzung des niedrigen Wasserzeichens für die angegebene Partition
    • ContinuationToken – Token, um das Streaming der angegebenen Partition an der aktuellen Position fortzusetzen.

Inhalt von Datenänderungseinträgen

Informationen zu Änderungsstream-Datensätzen finden Sie unter Inhalt eines Datenänderungsdatensatzes.

Änderungen an Partitionen verarbeiten

Wenn sich die Partitionen einer Tabelle ändern, geben ReadChangeStream-Anfragen eine CloseStream-Nachricht mit den Informationen zurück, die zum Fortsetzen des Streamings von den neuen Partitionen erforderlich sind.

Bei einem Split enthält er mehrere neue Partitionen und eine entsprechende ContinuationToken für jede Partition. Wenn du das Streaming der neuen Partitionen an derselben Position fortsetzen möchtest, musst du für jede neue Partition mit dem entsprechenden Token eine neue ReadChangeStream-Anfrage stellen.

Wenn Sie beispielsweise die Partition [A,C) streamen und sie in zwei Partitionen aufgeteilt wird, [A,B) und [B,C), können Sie mit der folgenden Abfolge von Ereignissen rechnen:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Wenn du das Streaming jeder Partition an derselben Position fortsetzen möchtest, sende die folgenden ReadChangeStreamQuery-Anfragen:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Wenn Sie nach einer Zusammenführung von derselben Partition fortfahren möchten, müssen Sie eine neue ReadChangeStream-Anfrage mit jedem Token aus den zusammengeführten Partitionen senden.

Wenn Sie beispielsweise zwei Partitionen ([A,B) und [B,C)) streamen und diese in Partition [A,C) zusammengeführt werden, ist die folgende Abfolge von Ereignissen zu erwarten:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Wenn Sie das Streaming der Partition [A, C) an derselben Position fortsetzen möchten, senden Sie eine ReadChangeStreamQuery wie die folgende:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Nächste Schritte