Änderungsstream mit Java lesen

Die Cloud Bigtable-Clientbibliothek für Java bietet Methoden auf niedriger Ebene zum Verarbeiten von Datensatzänderungen. In den meisten Fällen empfehlen wir jedoch, Änderungen mit Dataflow zu streamen, anstatt die auf dieser Seite beschriebenen Methoden zu verwenden, da Dataflow Partition-Splits und ‑Merges für Sie übernimmt.

Hinweise

Bevor Sie einen Änderungsstream mit Java lesen, sollten Sie sich mit der Übersicht über Änderungsstreams vertraut machen. Erfüllen Sie dann die folgenden Voraussetzungen.

Authentifizierung einrichten

Wenn Sie die Java -Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten Sie dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

    Installieren Sie die Google Cloud CLI.

    Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Weitere Informationen finden Sie unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Änderungsstream aktivieren

Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle mit aktiviertem Änderungsstream erstellen.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgende IAM-Rolle zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams benötigen.

  • Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Java-Clientbibliothek als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-Datei pom.xml Code hinzu, der dem folgenden ähnelt. Ersetzen Sie VERSION durch die Version der Clientbibliothek, die Sie verwenden. Die Version muss 2.21.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Partitionen der Tabelle ermitteln

Um ReadChangeStream-Anfragen zu stellen, müssen Sie die Partitionen Ihrer Tabelle kennen. Dies kann mit der Methode GenerateInitialChangeStreamPartitions ermittelt werden. Im folgenden Beispiel wird gezeigt, wie Sie mit dieser Methode einen Stream von ByteStringRanges abrufen, der jede Partition in der Tabelle darstellt. Jede ByteStringRange enthält den Start- und Endschlüssel für eine Partition.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Änderungen für jede Partition verarbeiten

Anschließend können Sie Änderungen für jede Partition mit der Methode ReadChangeStream verarbeiten. Hier sehen Sie ein Beispiel dafür, wie Sie einen Stream für eine Partition ab der aktuellen Zeit öffnen.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery akzeptiert die folgenden Argumente:

  • Streampartition (erforderlich): Die Partition, aus der Änderungen gestreamt werden
  • Eine der folgenden:
    • Startzeit – Commit-Zeitstempel, ab dem Änderungen verarbeitet werden sollen
    • Fortsetzungs-Tokens: Tokens, die eine Position darstellen, von der aus das Streaming fortgesetzt werden kann
  • Endzeit (optional): Commit-Zeitstempel, bei dem die Verarbeitung von Änderungen beendet werden soll. Wenn Sie keinen Wert angeben, wird der Stream weiter gelesen.
  • Dauer des Herzschlags (optional): Häufigkeit von Herzschlag-Nachrichten, wenn keine neuen Änderungen vorliegen (Standardwert: 5 Sekunden)

Format von Streamaufzeichnungen ändern

Ein zurückgegebener Änderungsstream-Datensatz ist einer von drei Antworttypen:

  • ChangeStreamMutation: Eine Nachricht, die einen Datenänderungsdatensatz darstellt.

  • CloseStream: Eine Nachricht, die angibt, dass der Client das Lesen aus dem Stream beenden soll.

    • Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
      • OK – Die Endzeit für die angegebene Partition wurde erreicht.
      • OUT_OF_RANGE – Die angegebene Partition ist nicht mehr vorhanden. Das bedeutet, dass diese Partition aufgeteilt oder zusammengeführt wurde. Für jede neue Partition muss eine neue ReadChangeStream-Anfrage erstellt werden.
    • NewPartitions: Gibt die aktualisierten Partitionierungsinformationen für OUT_OF_RANGE-Antworten an.
    • ChangeStreamContinuationTokens: Liste der Tokens, die verwendet werden, um neue ReadChangeStream-Anfragen an derselben Position fortzusetzen. Eine pro NewPartition.
  • Heartbeat: Eine regelmäßige Nachricht mit Informationen, die verwendet werden können, um den Status des Streams zu überprüfen.

    • EstimatedLowWatermark: Schätzung des niedrigsten Werts für die angegebene Partition
    • ContinuationToken: Token zum Fortsetzen des Streamings der angegebenen Partition ab der aktuellen Position.

Inhalt von Datensatzänderungen

Informationen zu Änderungsstream-Datensätzen finden Sie unter Inhalt eines Datenänderungsdatensatzes.

Umgang mit Änderungen an Partitionen

Wenn sich die Partitionen einer Tabelle ändern, geben ReadChangeStream-Anfragen eine CloseStream-Meldung mit den Informationen zurück, die zum Fortsetzen des Streamings aus den neuen Partitionen erforderlich sind.

Bei einem Split enthält er mehrere neue Partitionen und für jede Partition einen entsprechenden ContinuationToken. Wenn Sie das Streaming der neuen Partitionen an derselben Position fortsetzen möchten, stellen Sie für jede neue Partition mit dem entsprechenden Token eine neue ReadChangeStream-Anfrage.

Wenn Sie beispielsweise die Partition [A,C) streamen und sie in zwei Partitionen aufgeteilt wird, [A,B) und [B,C), können Sie die folgende Ereignissequenz erwarten:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Wenn Sie das Streamen jeder Partition an derselben Position fortsetzen möchten, senden Sie die folgenden ReadChangeStreamQuery-Anfragen:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Wenn Sie nach dem Zusammenführen von Partitionen mit derselben Partition fortfahren möchten, müssen Sie eine neue ReadChangeStream-Anfrage mit allen Tokens aus den zusammengeführten Partitionen senden.

Wenn Sie beispielsweise zwei Partitionen, [A,B) und [B,C), streamen und diese in der Partition [A,C) zusammengeführt werden, können Sie die folgende Ereignissequenz erwarten:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Wenn Sie die Streamingpartition [A, C) an derselben Position fortsetzen möchten, senden Sie einen ReadChangeStreamQuery wie den folgenden:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Nächste Schritte