使用 Java 读取变更数据流
Java 版 Cloud Bigtable 客户端库提供处理数据更改记录的初级方法。不过,大多数情况下 我们建议您使用 Dataflow,而不是使用 因为 Dataflow 负责处理 分区拆分 并进行合并
准备工作
在阅读使用 Java 的变更流之前,请确保您熟悉 变更数据流概览。然后再完成 但必须满足以下前提条件
设置身份验证
如需在本地开发环境中使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用您的用户凭据设置应用默认凭据。
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
如需了解详情,请参阅 Set up authentication for a local development environment。
如需了解如何为生产环境设置身份验证,请参阅 Set up Application Default Credentials for code running on Google Cloud。
启用变更数据流
您必须先在表上启用变更数据流,然后才能读取数据流。也可以创建新的 表格 并启用变更数据流
所需的角色
如需获得读取 Bigtable 变更数据流所需的权限,请让您的管理员授予您以下 IAM 角色。
- 针对 Bigtable 实例(包含您计划从中流式传输更改的表)的 Bigtable Administrator (
roles/bigtable.admin
)
将 Java 客户端库添加为依赖项
将类似于以下示例的代码添加到您的 Maven pom.xml
文件中。将 VERSION
替换为您正在使用的客户端库的版本。版本必须为 2.21.0 或更高版本。
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
确定表的分区
如需开始发出 ReadChangeStream
请求,您需要知道表的分区。您可以使用 GenerateInitialChangeStreamPartitions
方法确定。以下示例展示了如何使用此方法获取表示表中每个分区的 ByteStringRanges
流。每个 ByteStringRange
都包含分区的开始和结束键。
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
处理每个分区的更改
然后,您可以使用 ReadChangeStream
方法处理每个分区的更改。以下示例演示了如何从当前时间开始打开分区的数据流。
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
接受以下参数:
- 数据流分区(必需)- 从中流式传输更改的分区
- 下列其中一项:
- 开始时间 - 提交开始处理更改的时间戳
- 连续令牌 - 表示要继续流式传输的位置的令牌
- 结束时间(可选)- 提交在到达时停止处理更改的时间戳。如果您不提供此值,数据流就会继续读取。
- 检测信号时长(可选)- 没有新更改时的检测信号消息的频率(默认为 5 秒)
变更数据流记录格式
返回的变更数据流记录是以下三种响应类型之一:
ChangeStreamMutation
- 表示数据更改记录的消息。CloseStream
- 表示客户端应停止从数据流读取的消息。- 状态 - 表示关闭数据流的原因。以下状态之一:
OK
- 已达到指定分区的结束时间OUT_OF_RANGE
- 指定分区不复存在,表示此分区上发生了拆分或合并。需要为每个新分区创建一个新的ReadChangeStream
请求。
NewPartitions
- 提供有关OUT_OF_RANGE
响应的分区信息更新。ChangeStreamContinuationTokens
- 用于从同一位置恢复新ReadChangeStream
请求的令牌列表。每个NewPartition
一个。
- 状态 - 表示关闭数据流的原因。以下状态之一:
Heartbeat
- 包含可用于检查数据流状态的检查点的定期消息。EstimatedLowWatermark
- 指定分区的低水印估计值ContinuationToken
- 用于从当前位置继续流式传输指定分区的令牌。
数据更改记录内容
如需了解变更数据流记录,请参阅数据更改中包含的内容 记录。
处理分区中的更改
当表分区发生更改时,ReadChangeStream
请求会返回一条 CloseStream
消息,其中包含从新分区继续流式传输所需的信息。
发生拆分时,将包含多个新分区以及每个分区的相应 ContinuationToken
。如需继续从同一位置流式传输新分区,请使用相应令牌对每个新分区发出新的 ReadChangeStream
请求。
例如,如果您要流式传输分区 [A,C)
并将其拆分为 [A,B)
和 [B,C)
两个分区,则会出现以下事件序列:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
若要继续从同一位置流式传输每个分区,请发送以下 ReadChangeStreamQuery
请求:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
发生合并时,如需从同一分区继续,您需要发送包含合并分区中每个令牌的新 ReadChangeStream
请求。
例如,如果您要流式传输 [A,B)
和 [B,C)
两个分区,并且它们合并到分区 [A,C)
中,则会出现以下事件序列:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
如需从同一位置继续流式传输分区 [A, C)
,请按如下所示发送 ReadChangeStreamQuery
:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));