Änderungsstream mit Java lesen

Die Cloud Bigtable-Clientbibliothek für Java bietet Low-Level-Methoden zum Verarbeiten von Datenänderungseinträgen. In den meisten Fällen sollten Sie Änderungen mit Dataflow anstelle von die auf dieser Seite beschriebenen Methoden, da Dataflow partition-Splits automatisch zusammengeführt.

Hinweise

Bevor Sie einen Änderungsstream mit Java lesen, sollten Sie sich mit den Übersicht über Änderungsstreams Führen Sie dann folgende Schritte aus: die folgenden Voraussetzungen erfüllen.

Authentifizierung einrichten

Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. Create local authentication credentials for your user account:

    gcloud auth application-default login

Weitere Informationen unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Änderungsstream aktivieren

Sie müssen eine Änderung aktivieren Stream auf einem Tisch liegen, bevor Sie sie lesen können. Sie können auch ein neues Tabelle mit aktiviertem Änderungsstream.

Erforderliche Rollen

Um die Berechtigungen zu erhalten, die Sie zum Lesen einer Bigtable-Änderung benötigen Streams, bitten Sie Ihren Administrator, Ihnen die folgende IAM-Rolle zu gewähren.

  • Bigtable-Administrator (roles/bigtable.admin) auf der Bigtable-Instanz, die die Tabelle enthält, Streamänderungen von

Java-Clientbibliothek als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-Datei pom.xml einen Code wie den folgenden hinzu. Ersetzen VERSION durch die Version der Clientbibliothek, die Sie verwenden. verwenden. Die Version muss 2.21.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Die Partitionen der Tabelle bestimmen

Um ReadChangeStream-Anfragen senden zu können, müssen Sie die Partitionen von Tabelle. Dies wird mithilfe der Funktion GenerateInitialChangeStreamPartitions-Methode. Das folgende Beispiel zeigt, wie Sie mit dieser Methode einen Stream ByteStringRanges die die einzelnen Partitionen in der Tabelle darstellen. Jedes ByteStringRange enthält den Start- und Endschlüssel für eine Partition.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Änderungen für jede Partition verarbeiten

Anschließend können Sie Änderungen für jede Partition mit dem ReadChangeStream verarbeiten. . Dies ist ein Beispiel dafür, wie Sie einen Stream für eine Partition öffnen, aus der aktuellen Zeit.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery akzeptiert die folgenden Argumente:

  • Streampartition (erforderlich): die Partition, aus der Änderungen gestreamt werden sollen
  • Eine der folgenden:
    • Beginn: Commit-Zeitstempel, um mit der Verarbeitung von Änderungen zu beginnen
    • Fortsetzungs-Tokens – Tokens, die eine fortzusetzende Position darstellen Streaming von
  • Endzeit (optional) - Commit-Zeitstempel, um die Verarbeitung von Änderungen zu stoppen, wenn erreicht haben. Wenn Sie keinen Wert angeben, wird der Stream weitergelesen.
  • Heartbeat-Dauer (optional): Häufigkeit von Heartbeat-Nachrichten bei Es werden keine Änderungen vorgenommen (standardmäßig 5 Sekunden)

Format des Streameintrags ändern

Ein zurückgegebener Änderungsstreameintrag ist einer von drei Antworttypen:

  • ChangeStreamMutation: eine Nachricht, die einen Datensatz für Datenänderung darstellt.

  • CloseStream: eine Nachricht, die angibt, dass der Client den Lesevorgang beenden soll aus dem Stream.

    • Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
      • OK: Das Ende der angegebenen Partition wurde erreicht.
      • OUT_OF_RANGE: Die angegebene Partition ist nicht mehr vorhanden. Das bedeutet, dass Splits oder Merges bei dieser Partition stattgefunden haben. Eine neue Für jede neue Partition muss eine ReadChangeStream-Anfrage erstellt werden.
    • NewPartitions – liefert die aktualisierten Partitionierungsinformationen für OUT_OF_RANGE Antworten.
    • ChangeStreamContinuationTokens – Liste der Tokens, die zum Fortsetzen verwendet werden neue ReadChangeStream-Anfragen von derselben Position Eine pro NewPartition.
  • Heartbeat – eine periodische Nachricht mit Informationen, die für folgende Aktionen verwendet werden können: den Status des Streams überprüfen.

    • EstimatedLowWatermark: Schätzung des niedrigen Wasserzeichens für die gegebene Partition
    • ContinuationToken – Token zum Fortsetzen des Streamings der angegebenen von der aktuellen Position aus.

Inhalt des Datenänderungseintrags

Informationen zu Änderungsstreameinträgen finden Sie unter Was beinhaltet eine Datenänderung? Datensatz

Änderungen in Partitionen verarbeiten

Wenn sich die Partitionen einer Tabelle ändern, fordert ReadChangeStream Folgendes an: Eine CloseStream-Nachricht mit den Informationen zurückgeben, die zum Fortsetzen des Streamings erforderlich sind aus der/den neuen Partition(en).

Bei einem Split enthält dieser mehrere neue Partitionen und eine entsprechende ContinuationToken für jede Partition. Um das Streaming der neuen Partitionen fortzusetzen von derselben Position aus stellen, stellen Sie eine neue ReadChangeStream-Anfrage für jede neue Partition mit dem zugehörigen Token.

Wenn Sie beispielsweise die Partition [A,C) streamen und diese in zwei Teile [A,B) und [B,C) haben, können Sie die folgende Reihenfolge Events:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Um das Streaming jeder Partition von derselben Position aus fortzusetzen, senden Sie den Befehl folgende ReadChangeStreamQuery-Anfragen:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Um bei einer Zusammenführung von derselben Partition aus fortzufahren, müssen Sie eine neue ReadChangeStream-Anfrage, die jedes Token aus den zusammengeführten Partitionen enthält.

Wenn Sie beispielsweise die beiden Partitionen [A,B) und [B,C) streamen, in Partition [A,C) zusammengeführt werden, können Sie mit der folgenden Ereignissequenz rechnen:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Um das Streaming der Partition [A, C) an derselben Position fortzusetzen, senden Sie eine ReadChangeStreamQuery wie folgt:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Nächste Schritte