Änderungsstream mit Java lesen

Die Cloud Bigtable-Clientbibliothek für Java bietet Low-Level-Methoden zum Verarbeiten von Datenänderungseinträgen. In den meisten Fällen empfehlen wir jedoch, Änderungen mit Dataflow zu streamen, anstatt die auf dieser Seite beschriebenen Methoden zu verwenden, da Dataflow Partitionsaufteilungen und Zusammenführungen für Sie übernimmt.

Hinweise

Bevor Sie einen Änderungsstream mit Java lesen, sollten Sie sich mit der Übersicht zu Änderungsstreams vertraut machen. Erfülle anschließend die folgenden Voraussetzungen.

Authentifizierung einrichten

Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Installieren Sie die Google Cloud CLI.
  2. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  3. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login

Weitere Informationen unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Änderungsstream aktivieren

Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle mit einem aktivierten Änderungsstream erstellen.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgende IAM-Rolle zu gewähren, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams benötigen.

  • Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Java-Clientbibliothek als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-Datei pom.xml einen Code wie den folgenden hinzu. Ersetzen Sie VERSION durch die Version der Clientbibliothek, die Sie verwenden. Die Version muss 2.21.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Die Partitionen der Tabelle bestimmen

Um ReadChangeStream-Anfragen senden zu können, müssen Sie die Partitionen Ihrer Tabelle kennen. Dies kann mit der Methode GenerateInitialChangeStreamPartitions ermittelt werden. Das folgende Beispiel zeigt, wie Sie mit dieser Methode einen Stream von ByteStringRanges abrufen, der jede Partition in der Tabelle darstellt. Jede ByteStringRange enthält den Start- und Endschlüssel für eine Partition.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Änderungen für jede Partition verarbeiten

Mit der Methode ReadChangeStream können Sie dann Änderungen für jede Partition verarbeiten. Dies ist ein Beispiel dafür, wie ein Stream für eine Partition ab dem aktuellen Zeitpunkt geöffnet wird.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery akzeptiert die folgenden Argumente:

  • Streampartition (erforderlich): die Partition, aus der Änderungen gestreamt werden sollen
  • Eine der folgenden:
    • Beginn: Commit-Zeitstempel, um mit der Verarbeitung von Änderungen zu beginnen
    • Fortsetzungs-Tokens – Tokens, die eine Position darstellen, von der das Streaming fortgesetzt werden soll.
  • Endzeit (optional): Legen Sie einen Commit-Zeitstempel fest, um die Verarbeitung von Änderungen zu stoppen, wenn das Ende erreicht ist. Wenn Sie keinen Wert angeben, wird der Stream weitergelesen.
  • Heartbeat-Dauer (optional): Häufigkeit von Heartbeat-Nachrichten, wenn keine neuen Änderungen vorgenommen wurden (standardmäßig fünf Sekunden)

Format des Streameintrags ändern

Ein zurückgegebener Änderungsstreameintrag ist einer von drei Antworttypen:

  • ChangeStreamMutation: eine Nachricht, die einen Datensatz für Datenänderung darstellt.

  • CloseStream: Eine Meldung, die darauf hinweist, dass der Client nicht mehr aus dem Stream lesen soll.

    • Status: Gibt den Grund für das Schließen des Streams an. Eine der Folgenden:
      • OK: Das Ende der angegebenen Partition wurde erreicht.
      • OUT_OF_RANGE: Die angegebene Partition existiert nicht mehr. Das bedeutet, dass für diese Partition Splits oder Zusammenführungen durchgeführt wurden. Für jede neue Partition muss eine neue ReadChangeStream-Anfrage erstellt werden.
    • NewPartitions: Gibt die aktualisierten Partitionierungsinformationen für OUT_OF_RANGE-Antworten an.
    • ChangeStreamContinuationTokens: Liste der Tokens, die zum Fortsetzen neuer ReadChangeStream-Anfragen an derselben Position verwendet werden. Eine pro NewPartition.
  • Heartbeat: Eine regelmäßige Nachricht mit Informationen, mit denen der Status des Streams geprüft werden kann.

    • EstimatedLowWatermark: Schätzung des niedrigen Wasserzeichens für die angegebene Partition
    • ContinuationToken: Token, mit dem das Streaming der angegebenen Partition ab der aktuellen Position fortgesetzt werden soll.

Inhalt des Datenänderungseintrags

Informationen zu Änderungsstreameinträgen finden Sie unter Inhalt eines Änderungseintrags.

Änderungen in Partitionen verarbeiten

Wenn sich die Partitionen einer Tabelle ändern, geben ReadChangeStream-Anfragen eine CloseStream-Nachricht mit den erforderlichen Informationen zurück, um das Streaming aus den neuen Partitionen fortzusetzen.

Bei einem Split enthält dieser mehrere neue Partitionen und eine entsprechende ContinuationToken für jede Partition. Um das Streaming der neuen Partitionen ab derselben Position fortzusetzen, stellen Sie für jede neue Partition eine neue ReadChangeStream-Anfrage mit dem entsprechenden Token.

Wenn Sie beispielsweise die Partition [A,C) streamen und diese in die beiden Partitionen [A,B) und [B,C) aufgeteilt wird, können Sie mit der folgenden Ereignissequenz rechnen:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Senden Sie die folgenden ReadChangeStreamQuery-Anfragen, um das Streaming jeder Partition ab derselben Position fortzusetzen:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Damit eine Zusammenführung von derselben Partition fortgesetzt werden kann, müssen Sie eine neue ReadChangeStream-Anfrage senden, die alle Tokens aus den zusammengeführten Partitionen enthält.

Wenn Sie beispielsweise die zwei Partitionen [A,B) und [B,C) streamen und in die Partition [A,C) zusammengeführt werden, können Sie die folgende Ereignisabfolge erwarten:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Um das Streaming der Partition [A, C) an derselben Position fortzusetzen, senden Sie eine ReadChangeStreamQuery wie die folgende:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Nächste Schritte