使用 Java 读取变更数据流

Java 版 Cloud Bigtable 客户端库提供了用于处理数据更改记录的低级别方法。但在大多数情况下,我们建议您使用 Dataflow 流式传输更改,而不是使用本页介绍的方法,因为 Dataflow 会为您处理分区拆分和合并。

准备工作

在使用 Java 阅读变更数据流之前,请确保您已熟悉变更数据流概览。然后满足以下前提条件。

设置身份验证

如需从本地开发环境使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用用户凭据设置应用默认凭据。

  1. 安装 Google Cloud CLI。
  2. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  3. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login

如需了解详情,请参阅 为本地开发环境设置身份验证

如需了解如何为生产环境设置身份验证,请参阅 为在 Google Cloud 上运行的代码设置应用默认凭据

启用变更数据流

您必须先对表启用更改流,然后才能读取该表。您还可以创建新表并启用变更数据流。

所需的角色

如需获得读取 Bigtable 变更数据流所需的权限,请让您的管理员授予您以下 IAM 角色。

  • 针对 Bigtable 实例(包含您计划从中流式传输更改的表)的 Bigtable Administrator (roles/bigtable.admin)

将 Java 客户端库添加为依赖项

将类似于以下示例的代码添加到您的 Maven pom.xml 文件中。将 VERSION 替换为您正在使用的客户端库的版本。版本必须为 2.21.0 或更高版本。

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

确定表的分区

如需开始发出 ReadChangeStream 请求,您需要知道表的分区。您可以使用 GenerateInitialChangeStreamPartitions 方法确定。以下示例展示了如何使用此方法来获取表示表中的每个分区的 ByteStringRanges 流。每个 ByteStringRange 都包含分区的开始和结束键。

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

处理每个分区的更改

然后,您可以使用 ReadChangeStream 方法处理每个分区的更改。以下示例演示了如何从当前时间开始打开分区的数据流。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery 接受以下参数:

  • 数据流分区(必需)- 从中流式传输更改的分区
  • 下列其中一项:
    • 开始时间 - 提交开始处理更改的时间戳
    • 连续令牌 - 表示要继续流式传输的位置的令牌
  • 结束时间(可选)- 提交在到达时停止处理更改的时间戳。如果您不提供此值,数据流就会继续读取。
  • 检测信号时长(可选)- 没有新更改时的检测信号消息的频率(默认为 5 秒)

变更数据流记录格式

返回的变更数据流记录是以下三种响应类型之一:

  • ChangeStreamMutation - 表示数据更改记录的消息。

  • CloseStream - 表示客户端应停止从数据流读取的消息。

    • 状态 - 表示关闭数据流的原因。以下状态之一:
      • OK - 已达到指定分区的结束时间
      • OUT_OF_RANGE - 指定分区不复存在,表示此分区上发生了拆分或合并。需要为每个新分区创建一个新的 ReadChangeStream 请求。
    • NewPartitions - 提供有关 OUT_OF_RANGE 响应的分区信息更新。
    • ChangeStreamContinuationTokens - 用于从同一位置恢复新 ReadChangeStream 请求的令牌列表。每个 NewPartition 一个。
  • Heartbeat - 包含可用于检查数据流状态的检查点的定期消息。

    • EstimatedLowWatermark - 指定分区的低水印估计值
    • ContinuationToken - 用于从当前位置继续流式传输指定分区的令牌。

数据更改记录内容

如需了解变更数据流记录,请参阅数据变更记录中包含的内容

处理分区中的更改

当表分区发生更改时,ReadChangeStream 请求会返回一条 CloseStream 消息,其中包含从新分区继续流式传输所需的信息。

发生拆分时,将包含多个新分区以及每个分区的相应 ContinuationToken。如需继续从同一位置流式传输新分区,请使用相应令牌对每个新分区发出新的 ReadChangeStream 请求。

例如,如果您要流式传输分区 [A,C) 并将其拆分为 [A,B)[B,C) 两个分区,则会出现以下事件序列:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

若要继续从同一位置流式传输每个分区,请发送以下 ReadChangeStreamQuery 请求:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

发生合并时,如需从同一分区继续,您需要发送包含合并分区中每个令牌的新 ReadChangeStream 请求。

例如,如果您要流式传输 [A,B)[B,C) 两个分区,并且它们合并到分区 [A,C) 中,则会出现以下事件序列:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

如需从同一位置继续流式传输分区 [A, C),请按如下所示发送 ReadChangeStreamQuery

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

后续步骤