Read a change stream with Java

The Cloud Bigtable client library for Java provides low-level methods for processing data change records. However, in most cases, we recommend that you stream changes with Dataflow instead of using the methods described on this page, because Dataflow handles partition splits and merges for you.

Before you begin

Before you read a change stream with Java, make sure you are familiar with the Change streams overview. Then complete the following prerequisites.

Set up authentication

To use the Java samples on this page from a local development environment, install and initialize the gcloud CLI, and then set up Application Default Credentials with your user credentials.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. Create local authentication credentials for your Google Account:

    gcloud auth application-default login

For more information, see Set up authentication for a local development environment.

For information about setting up authentication for a production environment, see Set up Application Default Credentials for code running on Google Cloud.

Enable a change stream

You must enable a change stream on a table before you can read it. You can also create a new table with a change stream enabled.

Required roles

To get the permissions that you need to read a Bigtable change stream, ask your administrator to grant you the following IAM role.

  • Bigtable Administrator (roles/bigtable.admin) on the Bigtable instance that contains the table you plan to stream changes from

Add the Java client library as a dependency

Add code similar to the following to your Maven pom.xml file. Replace VERSION with the version of the client library that you are using. The version must be 2.21.0 or later.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determine the table's partitions

To start making ReadChangeStream requests, you need to know the partitions of your table. This can be determined using the GenerateInitialChangeStreamPartitions method. The following example shows how to use this method to get a stream of ByteStringRanges representing each partition in the table. Each ByteStringRange contains the start and end key for a partition.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Process changes for each partition

You can then process changes for each partition using the ReadChangeStream method. This is an example of how to open a stream for a partition, starting from the current time.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery accepts the following arguments:

  • Stream partition (Required) - the partition to stream changes from
  • One of the following:
    • Start time - Commit timestamp to start processing changes from
    • Continuation tokens - Tokens representing a position to resume streaming from
  • End Time (Optional) - Commit timestamp to stop processing changes when reached. If you don't provide a value, the stream continues reading.
  • Heartbeat duration (Optional) - Frequency of heartbeat messages when there are no new changes (defaults to five seconds)

Change stream record format

A returned change stream record is one of three response types:

  • ChangeStreamMutation - A message representing a data change record.

  • CloseStream - A message indicating that the client should stop reading from the stream.

    • Status - Indicates the reason for closing the stream. One of:
      • OK - end time has been reached for the given partition
      • OUT_OF_RANGE - the given partition no longer exists, meaning splits or merges have happened on this partition. A new ReadChangeStream request will need to be created for each new partition.
    • NewPartitions - Gives the updated partitioning information on OUT_OF_RANGE responses.
    • ChangeStreamContinuationTokens - List of tokens used to resume new ReadChangeStream requests from the same position. One per NewPartition.
  • Heartbeat - A periodic message with information that can be used to checkpoint the state of the stream.

    • EstimatedLowWatermark - Estimation of the low watermark for the given partition
    • ContinuationToken - Token to resume streaming the given partition from the current position.

Data change record contents

Each data change record contains the following:

  • Entries - changes made to the row, including one or more of the following:
    • Write
      • Column family
      • Column qualifier
      • Timestamp
      • Value
    • Deletion of cells
      • Column family
      • Column qualifier
      • Timestamp range
    • Deletion of a column family
      • Column family
      • Deletion from a row - Deletion from a row is converted to a list of deletions from column families for each column family that the row has data in.
  • Row key - the identifier for the changed row
  • Change type - either user-initiated or garbage collection
  • ID of the cluster that received the change
  • Commit timestamp - server-side time when the change was committed to the table
  • Tie breaker - a value that lets the application that is reading the stream use Bigtable's built-in conflict resolution policy
  • Token - used by the consuming application to resume the stream if it's interrupted
  • Estimated low watermark - the estimated time since the record's partition caught up with replication across all clusters. For details, see Partitions and Watermarks.

For additional details about the fields in a data change record, see the API reference for ReadChangeStream.

Handle changes in partitions

When the partitions of a table changes, ReadChangeStream requests return a CloseStream message with the information required to resume streaming from the new partition(s).

For a split, this will contain multiple new partitions and a corresponding ContinuationToken for each partition. To resume streaming the new partitions from the same position, you make a new ReadChangeStream request for each new partition with its corresponding token.

For example, if you are streaming partition [A,C) and it splits into two partitions, [A,B) and [B,C), you can expect the following sequence of events:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

To resume streaming each partition from the same position you send the following ReadChangeStreamQuery requests:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

For a merge, to resume from the same partition, you need to send a new ReadChangeStream request containing each token from the merged partitions.

For example, if you are streaming two partitions, [A,B) and [B,C), and they merge into partition [A,C), you can expect the following sequence of events:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

To resume streaming partition [A, C) from the same position, you send a ReadChangeStreamQuery like the following:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

What's next