Änderungsstream mit Java lesen

Die Cloud Bigtable-Clientbibliothek für Java bietet Low-Level-Methoden zum Verarbeiten von Datenänderungseinträgen. In den meisten Fällen empfehlen wir jedoch, Änderungen mit Dataflow zu streamen, anstatt die auf dieser Seite beschriebenen Methoden zu verwenden. In Dataflow werden nämlich Partitionen aufgeteilt und zusammengeführt.

Hinweise

Bevor Sie einen Änderungsstream mit Java lesen, sollten Sie sich mit den Übersicht über Änderungsstreams Führen Sie dann folgende Schritte aus: die folgenden Voraussetzungen erfüllen.

Authentifizierung einrichten

Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Weitere Informationen unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Änderungsstream aktivieren

Sie müssen eine Änderung aktivieren Stream auf einem Tisch liegen, bevor Sie sie lesen können. Sie können auch ein neues Tabelle mit aktiviertem Änderungsstream.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgende IAM-Rolle zuzuweisen, um die Berechtigungen zum Lesen eines Bigtable-Änderungsstreams zu erhalten.

  • Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Java-Clientbibliothek als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-Datei pom.xml einen Code wie den folgenden hinzu. Ersetzen VERSION durch die Version der Clientbibliothek, die Sie verwenden. verwenden. Die Version muss 2.21.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Die Partitionen der Tabelle bestimmen

Um ReadChangeStream-Anfragen senden zu können, müssen Sie die Partitionen von Tabelle. Das lässt sich mit der Methode GenerateInitialChangeStreamPartitions ermitteln. Im folgenden Beispiel wird gezeigt, wie Sie mit dieser Methode einen Stream von ByteStringRanges abrufen, der jede Partition in der Tabelle darstellt. Jedes ByteStringRange enthält den Start- und Endschlüssel für eine Partition.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Änderungen für jede Partition verarbeiten

Anschließend können Sie Änderungen für jede Partition mit dem ReadChangeStream verarbeiten. . Hier sehen Sie ein Beispiel für das Öffnen eines Streams für eine Partition, beginnend bei der aktuellen Zeit.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery akzeptiert die folgenden Argumente:

  • Streampartition (erforderlich): die Partition, aus der Änderungen gestreamt werden sollen
  • Eine der folgenden:
    • Startzeit – Commit-Zeitstempel, ab dem die Verarbeitung von Änderungen beginnt
    • Fortsetzungs-Token: Tokens, die eine Position angeben, von der aus der Stream fortgesetzt werden soll
  • Endzeit (optional): Commit-Zeitstempel, bei dem die Verarbeitung von Änderungen beendet wird. Wenn Sie keinen Wert angeben, wird der Stream weitergelesen.
  • Heartbeat-Dauer (optional): Häufigkeit von Heartbeat-Nachrichten bei Es werden keine Änderungen vorgenommen (standardmäßig 5 Sekunden)

Format des Streameintrags ändern

Ein zurückgegebener Änderungsstream-Eintrag ist einer von drei Antworttypen:

  • ChangeStreamMutation: Eine Nachricht, die einen Datenänderungsdatensatz darstellt.

  • CloseStream: eine Nachricht, die angibt, dass der Client den Lesevorgang beenden soll aus dem Stream.

    • Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
      • OK: Das Ende der angegebenen Partition wurde erreicht.
      • OUT_OF_RANGE: Die angegebene Partition ist nicht mehr vorhanden. Das bedeutet, dass Splits oder Merges bei dieser Partition stattgefunden haben. Für jede neue Partition muss eine neue ReadChangeStream-Anfrage erstellt werden.
    • NewPartitions: Enthält die aktualisierten Partitionierungsinformationen zu OUT_OF_RANGE-Antworten.
    • ChangeStreamContinuationTokens – Liste der Tokens, die zum Fortsetzen verwendet werden neue ReadChangeStream-Anfragen von derselben Position Eine pro NewPartition.
  • Heartbeat – Eine regelmäßige Nachricht mit Informationen, die zum Prüfen des Status des Streams verwendet werden können.

    • EstimatedLowWatermark – Schätzung des niedrigen Wasserzeichens für die angegebene Partition
    • ContinuationToken – Token, um das Streaming der angegebenen Partition an der aktuellen Position fortzusetzen.

Inhalt von Datenänderungseinträgen

Informationen zu Änderungsstream-Datensätzen finden Sie unter Inhalt eines Datenänderungsdatensatzes.

Änderungen an Partitionen verarbeiten

Wenn sich die Partitionen einer Tabelle ändern, geben ReadChangeStream-Anfragen eine CloseStream-Nachricht mit den Informationen zurück, die zum Fortsetzen des Streamings von den neuen Partitionen erforderlich sind.

Bei einem Split enthält dieser mehrere neue Partitionen und eine entsprechende ContinuationToken für jede Partition. Wenn du das Streaming der neuen Partitionen an derselben Position fortsetzen möchtest, musst du für jede neue Partition mit dem entsprechenden Token eine neue ReadChangeStream-Anfrage stellen.

Wenn Sie beispielsweise die Partition [A,C) streamen und diese in zwei Teile [A,B) und [B,C) haben, können Sie die folgende Reihenfolge Events:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Um das Streaming jeder Partition von derselben Position aus fortzusetzen, senden Sie den Befehl folgende ReadChangeStreamQuery-Anfragen:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Wenn Sie nach einer Zusammenführung von derselben Partition fortfahren möchten, müssen Sie eine neue ReadChangeStream-Anfrage mit jedem Token aus den zusammengeführten Partitionen senden.

Wenn Sie beispielsweise die zwei Partitionen [A,B) und [B,C) streamen, in Partition [A,C) zusammengeführt werden, können Sie mit der folgenden Ereignissequenz rechnen:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Um das Streaming der Partition [A, C) an derselben Position fortzusetzen, senden Sie eine ReadChangeStreamQuery wie folgt:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Nächste Schritte