本页面介绍了适用于 GoogleSQL 方言数据库和 PostgreSQL 方言数据库的 Spanner 变更数据流,包括:
- 基于分块的分区模型
- 变更数据流记录的格式和内容
- 用于查询这些记录的低级语法
- 查询工作流示例
您可以使用 Spanner API 直接查询变更数据流。改为使用 Dataflow 读取变更数据流数据的应用无需直接处理此处介绍的数据模型。
如需查看有关变更数据流的更广泛的入门指南,请参阅变更数据流概览。
变更数据流分区
当变更数据流监控的表发生更改时,Spanner 会在数据库中写入相应的变更数据流记录(在与数据更改相同的事务中同步进行)。这意味着,如果相应事务成功,Spanner 也已成功捕获并持久保留了更改。在内部,Spanner 会将变更数据流记录和数据更改放置在同一位置,以便由同一服务器处理,从而最大限度地减少写入开销。
作为针对特定分块的 DML 的一部分,Spanner 会在同一事务中将写入操作附加到相应的变更数据流数据分块。得益于这种并置方法,变更数据流无需跨服务资源进行额外的协调,从而可最大限度地减少事务提交开销。
Spanner 会根据数据库负载和大小动态拆分和合并数据,并在各个服务资源间分配分块,从而实现扩缩。
为了使变更数据流写入和读取能够扩缩,Spanner 会随着数据库数据一起拆分和合并内部变更数据流存储空间,从而自动避免热点。为了支持在数据库写入扩缩时近乎实时地读取变更数据流记录,Spanner API 设计为可使用变更数据流分区并发查询变更数据流。变更数据流分区会映射到包含变更数据流记录的变更数据流数据分块。变更数据流分区会随时间动态变化,并且与 Spanner 动态拆分和合并数据库数据的方式相关。
变更数据流分区包含特定时间范围内不可变键范围的记录。任何变更数据流分区都可以拆分为一个或多个变更数据流分区,也可以与其他变更数据流分区合并。当发生这些拆分或合并事件时,系统会创建子分区,以捕获相应不可变键范围在下一个时间范围内的更改。除了数据更改记录之外,变更数据流查询还会返回子分区记录(用于向读取器通知需要查询的新变更数据流分区),以及检测信号记录(用于在最近未发生写入操作时指示处理进度)。
查询特定变更数据流分区时,系统会按提交时间戳顺序返回更改记录。每个更改记录仅返回一次。在变更数据流分区之间,无法保证变更记录排序。特定主键的更改记录在特定时间范围内仅在一个分区中返回。
由于存在父子分区沿袭,如需按提交时间戳顺序处理特定键的更改,只有在处理完所有来自父分区的记录后,才应处理从子分区返回的记录。
变更数据流读取函数和查询语法
GoogleSQL
如需查询变更数据流,请使用 ExecuteStreamingSql
API。Spanner 会随变更数据流一起自动创建一个特殊的读取函数。读取函数可提供对变更数据流记录的访问权限。读取函数命名惯例为 READ_change_stream_name
。
假设数据库中存在变更数据流 SingersNameStream
,则适用于 GoogleSQL 的查询语法如下所示:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
读取函数接受以下参数:
参数名称 | 类型 | 是否必需? | 说明 |
---|---|---|---|
start_timestamp |
TIMESTAMP |
必需 | 指定应返回 commit_timestamp 大于或等于 start_timestamp 的记录。该值必须在变更数据流保留期限内,并且应小于或等于当前时间,大于或等于变更数据流的创建时间戳。 |
end_timestamp |
TIMESTAMP |
可选(默认值:NULL ) |
指定应返回 commit_timestamp 小于或等于 end_timestamp 的记录。该值必须在变更数据流保留期限内,且大于或等于 start_timestamp 。查询会在返回所有 ChangeRecords (最多到 end_timestamp )后或在您终止连接时完成。如果 end_timestamp 设置为 NULL 或未指定,查询将继续执行,直到返回所有 ChangeRecords 或您终止连接为止。 |
partition_token |
STRING |
可选(默认值:NULL ) |
指定要查询的变更数据流分区(根据子分区记录的内容)。如果值为 NULL 或未指定,则表示读取器是首次查询变更数据流,尚未获得要用于查询的任何特定分区令牌。 |
heartbeat_milliseconds |
INT64 |
必需 | 确定在相应分区中没有已提交的事务时,返回检测信号 ChangeRecord 的频率。
该值必须介于 1,000 (1 秒)和 300,000 (5 分钟)之间。 |
read_options |
ARRAY |
可选(默认值:NULL ) |
添加为将来使用而预留的读取选项。唯一允许的值是 NULL 。 |
我们建议您创建一个辅助方法来构建读取函数查询的文本并向其绑定参数,如以下示例所示。
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
如需查询变更数据流,请使用 ExecuteStreamingSql
API。Spanner 会随变更数据流一起自动创建一个特殊的读取函数。读取函数可提供对变更数据流记录的访问权限。读取函数命名惯例为 spanner.read_json_change_stream_name
。
假设数据库中存在变更数据流 SingersNameStream
,则适用于 PostgreSQL 的查询语法如下所示:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
读取函数接受以下参数:
参数名称 | 类型 | 是否必需? | 说明 |
---|---|---|---|
start_timestamp |
timestamp with time zone |
必需 | 指定应返回 commit_timestamp 大于或等于 start_timestamp 的更改记录。该值必须在变更数据流保留期限内,并且应小于或等于当前时间,大于或等于变更数据流的创建时间戳。 |
end_timestamp |
timestamp with timezone |
可选(默认值:NULL ) |
指定应返回 commit_timestamp 小于或等于 end_timestamp 的更改记录。该值必须在变更数据流保留期限内,且大于或等于 start_timestamp 。查询会在返回所有更改记录(最多到 end_timestamp )后或在您终止连接后完成。如果为 NULL ,则查询会继续执行,直到返回所有更改记录或您终止连接为止。 |
partition_token |
text |
可选(默认值:NULL ) |
指定要查询的变更数据流分区(根据子分区记录的内容)。如果值为 NULL 或未指定,则表示读取器是首次查询变更数据流,尚未获得要用于查询的任何特定分区令牌。 |
heartbeat_milliseconds |
bigint |
必需 | 确定在相应分区中没有已提交的事务时,返回检测信号 ChangeRecord 的频率。该值必须介于 1,000 (1 秒)和 300,000 (5 分钟)之间。 |
null |
null |
必需 | 预留以供将来使用。 |
我们建议您创建一个辅助方法来构建读取函数的文本并向其绑定参数,如以下示例所示。
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
变更数据流记录格式
GoogleSQL
变更数据流读取函数会返回单个类型为 ARRAY<STRUCT<...>>
的 ChangeRecord
列。在每行中,此数组始终包含单个元素。
数组元素具有以下类型:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
此 STRUCT
中有三个字段:data_change_record
、heartbeat_record
和 child_partitions_record
,每个字段的类型均为 ARRAY<STRUCT<...>>
。在变更数据流读取函数返回的任何行中,这三个字段中只有一个包含值;其他两个字段为空或为 NULL
。这些数组字段最多包含一个元素。
以下各部分将分别介绍这三种记录类型。
PostgreSQL
变更数据流读取函数会返回单个类型为 JSON
的 ChangeRecord
列,该列具有以下结构:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
此对象中可能有三个键:data_change_record
、heartbeat_record
和 child_partitions_record
,对应的值类型为 JSON
。在变更数据流读取函数返回的任何行中,这三个键中只有一个键存在。
以下各部分将分别介绍这三种记录类型。
数据更改记录
数据更改记录包含对表进行的一组更改,这些更改具有相同的修改类型(插入、更新或删除),对于同一事务在一个变更数据流分区中以相同的提交时间戳提交。对于同一事务,可以跨多个变更数据流分区返回多个数据更改记录。
所有数据更改记录都包含 commit_timestamp
、server_transaction_id
和 record_sequence
字段,这些字段共同决定了数据流记录在变更数据流中的顺序。这三个字段足以推导出更改的排序并提供外部一致性。
请注意,如果多个事务涉及不重叠的数据,则它们可以具有相同的提交时间戳。借助 server_transaction_id
字段,您可以区分哪些更改集(可能跨变更数据流分区)是在同一事务中发出的。通过将其与 record_sequence
和 number_of_records_in_transaction
字段配对使用,您还可以缓冲和排序来自特定事务的所有记录。
数据更改记录的字段包括以下内容:
GoogleSQL
字段 | 类型 | 说明 |
---|---|---|
commit_timestamp |
TIMESTAMP |
指示提交更改的时间戳。 |
record_sequence |
STRING |
指示记录在事务中的序列号。序列号在事务中是唯一且单调递增的(但不一定是连续的)。按 record_sequence 对同一 server_transaction_id 的记录进行排序,可重建事务内的更改排序。Spanner 可能会优化此排序以获得更好的性能,并且它可能并不总是与您提供的原始排序相匹配。 |
server_transaction_id |
STRING |
提供一个全局唯一字符串,用于表示在其中提交更改的事务。该值应仅在处理变更数据流记录的上下文中使用,并且与 Spanner API 中的事务 ID 无关。 |
is_last_record_in_transaction_in_partition |
BOOL |
指示相应记录是否为某个事务在当前分区中的最后一个记录。 |
table_name |
STRING |
受更改影响的表的名称。 |
value_capture_type |
STRING |
描述捕获相应更改时的值捕获类型(在变更数据流配置中指定)。 值捕获类型可以是以下类型之一:
默认为 |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
指示列的名称、列类型、是否为主键,以及架构中定义的列位置 (ordinal_position )。架构中表的第一列的序号位置为 1 。对于数组列,列类型可以是嵌套的。格式与 Spanner API 参考文档中介绍的类型结构相匹配。 |
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
描述所进行的更改,包括更改或跟踪的列的主键值、旧值和新值。旧值和新值的可用性及内容取决于配置的 value_capture_type 。new_values 和 old_values 字段仅包含非键列。 |
mod_type |
STRING |
描述更改的类型。INSERT 、UPDATE 或 DELETE 之一。 |
number_of_records_in_transaction |
INT64 |
指示所有变更数据流分区中属于相应事务的数据更改记录数量。 |
number_of_partitions_in_transaction |
INT64 |
指示返回相应事务的数据更改记录的分区数量。 |
transaction_tag |
STRING |
指示与相应事务相关联的事务标记。 |
is_system_transaction |
BOOL |
指示相应事务是否为系统事务。 |
PostgreSQL
字段 | 类型 | 说明 |
---|---|---|
commit_timestamp |
STRING |
指示提交更改的时间戳。 |
record_sequence |
STRING |
指示记录在事务中的序列号。序列号在事务中是唯一且单调递增的(但不一定是连续的)。按 record_sequence 对同一 server_transaction_id 的记录进行排序,可重建事务内的更改排序。 |
server_transaction_id |
STRING |
提供一个全局唯一字符串,用于表示在其中提交更改的事务。该值应仅在处理变更数据流记录的上下文中使用,并且与 Spanner API 中的事务 ID 无关 |
is_last_record_in_transaction_in_partition |
BOOLEAN |
指示相应记录是否为某个事务在当前分区中的最后一个记录。 |
table_name |
STRING |
指示受更改影响的表的名称。 |
value_capture_type |
STRING |
描述捕获相应更改时的值捕获类型(在变更数据流配置中指定)。 值捕获类型可以是以下类型之一:
默认为 |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
指示列的名称、列类型、是否为主键,以及架构中定义的列位置 (ordinal_position )。架构中表的第一列的序号位置为 1 。对于数组列,列类型可以是嵌套的。格式与 Spanner API 参考文档中介绍的类型结构相匹配。 |
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
描述所进行的更改,包括更改或跟踪的列的主键值、旧值和新值。旧值和新值的可用性及内容取决于配置的 value_capture_type 。new_values 和 old_values 字段仅包含非键列。 |
mod_type |
STRING |
描述更改的类型。INSERT 、UPDATE 或 DELETE 之一。 |
number_of_records_in_transaction |
INT64 |
指示所有变更数据流分区中属于相应事务的数据更改记录数量。 |
number_of_partitions_in_transaction |
NUMBER |
指示返回相应事务的数据更改记录的分区数量。 |
transaction_tag |
STRING |
指示与相应事务相关联的事务标记。 |
is_system_transaction |
BOOLEAN |
指示相应事务是否为系统事务。 |
示例数据更改记录
下面是一对示例数据更改记录。它们描述在两个账号之间进行转移的单个事务。这两个账号位于单独的变更数据流分区中。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
以下数据更改记录是值捕获类型为 NEW_VALUES
的记录示例。请注意,系统只会填充新值。只有 LastUpdate
列进行了修改,因此系统仅返回该列。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
以下数据更改记录是值捕获类型为 NEW_ROW
的记录示例。只有 LastUpdate
列进行了修改,但系统会返回所有跟踪的列。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
以下数据更改记录是值捕获类型为 NEW_ROW_AND_OLD_VALUES
的记录示例。只有 LastUpdate
列进行了修改,但系统会返回所有跟踪的列。此值捕获类型会捕获 LastUpdate
的新值和旧值。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
检测信号记录
当返回检测信号记录时,表示所有 commit_timestamp
小于或等于检测信号记录 timestamp
的更改都已返回,并且相应分区中的未来数据记录必须具有比检测信号记录返回的提交时间戳更高的提交时间戳。当没有数据更改写入分区时,系统会返回检测信号记录。当有数据更改写入分区时,可以使用 data_change_record.commit_timestamp
而不是 heartbeat_record.timestamp
来告知读取器正在继续向前读取分区。
您可以使用分区上返回的检测信号记录在所有分区间同步读取器。一旦所有读取器都收到大于或等于某个时间戳 A
的检测信号,或者收到大于或等于时间戳 A
的数据或子分区记录,读取器便知道它们已收到在时间戳 A
时或之前提交的所有记录,并且可以开始处理缓冲的记录,例如按时间戳对跨分区记录进行排序并按 server_transaction_id
对其进行分组。
一个检测信号记录仅包含一个字段:
GoogleSQL
字段 | 类型 | 说明 |
---|---|---|
timestamp |
TIMESTAMP |
指示检测信号记录的时间戳。 |
PostgreSQL
字段 | 类型 | 说明 |
---|---|---|
timestamp |
STRING |
指示检测信号记录的时间戳。 |
示例检测信号记录
一个示例检测信号记录,用于传达时间戳小于或等于此记录时间戳的所有记录都已返回:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
子分区记录
子分区记录会返回有关子分区的信息:其分区令牌、父分区的令牌,以及表示子分区所包含更改记录的最早时间戳的 start_timestamp
。提交时间戳恰好在 child_partitions_record.start_timestamp
之前的记录会在当前分区中返回。在返回相应分区的所有子分区记录后,此查询会返回成功状态,表明已返回相应分区的所有记录。
子分区记录的字段包括以下内容:
GoogleSQL
字段 | 类型 | 说明 |
---|---|---|
start_timestamp |
TIMESTAMP |
指示相应子分区记录中从子分区返回的数据更改记录的提交时间戳大于或等于 start_timestamp 。查询子分区时,查询应指定子分区令牌和大于或等于 child_partitions_token.start_timestamp 的 start_timestamp 。分区返回的所有子分区记录都具有相同的 start_timestamp ,并且时间戳始终介于查询的指定 start_timestamp 和 end_timestamp 之间。 |
record_sequence |
STRING |
指示一个单调递增的序列号,当在特定分区中返回多个具有相同 start_timestamp 的子分区记录时,该序列号可用于定义子分区记录的排序。分区令牌、start_timestamp 和 record_sequence 可唯一标识子分区记录。
|
child_partitions |
[ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] } ] |
返回一组子分区及其关联信息。这包括用于在查询中标识子分区的分区令牌字符串以及其父分区的令牌。 |
PostgreSQL
字段 | 类型 | 说明 |
---|---|---|
start_timestamp |
STRING |
指示相应子分区记录中从子分区返回的数据更改记录的提交时间戳大于或等于 start_timestamp 。查询子分区时,查询应指定子分区令牌和大于或等于 child_partitions_token.start_timestamp 的 start_timestamp 。分区返回的所有子分区记录都具有相同的 start_timestamp ,并且时间戳始终介于查询的指定 start_timestamp 和 end_timestamp 之间。
|
record_sequence |
STRING |
指示一个单调递增的序列号,当在特定分区中返回多个具有相同 start_timestamp 的子分区记录时,该序列号可用于定义子分区记录的排序。分区令牌、start_timestamp 和 record_sequence 可唯一标识子分区记录。
|
child_partitions |
[ { "token": "STRING", "parent_partition_tokens": ["STRING"], }, [...] ] |
返回子分区的数组及其关联信息。这包括用于在查询中标识子分区的分区令牌字符串以及其父分区的令牌。 |
示例子分区记录
以下是子分区记录的示例:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
变更数据流查询工作流
使用 ExecuteStreamingSql
API 运行变更数据流查询,并使用一次性只读事务和强时间戳边界。变更数据流读取函数使您可以为感兴趣的时间范围指定 start_timestamp
和 end_timestamp
。您可以使用强只读时间戳边界访问保留期限内的所有变更记录。
所有其他 TransactionOptions
对变更数据流查询都无效。此外,如果 TransactionOptions.read_only.return_read_timestamp
设置为 true
,则描述事务的 Transaction
消息中会返回特殊值 kint64max - 1
,而不是有效的读取时间戳。您应舍弃此特殊值,不要将其用于任何后续查询。
每个变更数据流查询都可以返回任意数量的行,每行都包含数据更改记录、检测信号记录或子分区记录。无需为请求设置截止时间。
示例变更数据流查询工作流
流式查询工作流首先会通过将 partition_token
指定为 NULL
,来发出第一个变更数据流查询。查询需要指定变更数据流的读取函数、感兴趣的开始和结束时间戳以及检测信号间隔。当 end_timestamp
为 NULL
时,查询会持续返回数据更改,直到分区结束。
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
处理来自相应查询的数据记录,直到返回所有子分区记录。在以下示例中,系统会返回两个子分区记录和三个分区令牌,然后查询会终止。来自特定查询的子分区记录始终共享相同的 start_timestamp
。
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012390",
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
如需处理 2022-05-01T09:00:01Z
之后的更改,请创建三个新查询并且并行运行它们。这三个查询一起使用时,会返回其父级所涵盖的相同键范围内的数据更改。请始终将 start_timestamp
设置为同一子分区记录中的 start_timestamp
,并使用相同的 end_timestamp
和检测信号间隔,以便在所有查询间以一致的方式处理所有记录。
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
在返回另一个子分区记录后,对 child_token_2
的查询会完成。此记录表示,从 2022-05-01T09:30:15Z
开始,新分区涵盖对 child_token_2
和 child_token_3
的更改。对 child_token_3
的查询会返回完全相同的记录,因为两者都是新 child_token_4
的父分区。为了确保严格按顺序处理特定键的数据记录,对 child_token_4
的查询必须在所有父级完成后开始。在本例中,父级为 child_token_2
和 child_token_3
。仅为每个子分区令牌创建一个查询。查询工作流设计应指定一个父级来等待并安排对 child_token_4
的查询。
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": ["child_token_2", "child_token_3"],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
您可以在 GitHub 上的 Apache Beam SpannerIO Dataflow 连接器内容中找到处理和解析变更数据流记录的示例。