变更数据流分区、记录和查询

本页面详细介绍了变更数据流的以下属性:

  • 它的基于分块的分区模型
  • 变更数据流记录的格式和内容
  • 用于查询这些记录的低级语法
  • 查询工作流示例

此页面上的信息最适合使用 Spanner API 直接查询变更数据流。改为使用 Dataflow 读取变更数据流数据的应用无需直接使用本文所述的数据模型。

如需查看更全面的变更数据流入门指南,请参阅变更数据流概览

变更数据流分区

当变更流所监控的表发生更改时,Spanner 会在数据更改所在的事务中同步写入相应的变更数据流记录。这可确保在事务成功时,Spanner 也已成功捕获并保留更改。在内部,Spanner 会协同定位变更数据流记录和数据更改,以便同一服务器处理它们,从而最大限度地减少写入开销。

作为特定分块的 DML 的一部分,Spanner 会将写入操作附加到同一事务中的相应变更数据流数据分块。由于存在这种对接位置,变更流不会在服务资源之间增加额外的协调,从而最大限度地减少事务提交开销。

图片

Spanner 根据数据库负载和大小动态拆分和合并数据,以及跨服务资源分布分片,从而实现扩缩。

为了使变更流写入和读取实现可扩缩,Spanner 会将内部变更数据流存储与数据库数据一起拆分和合并,从而自动避免热点。为了支持在数据库写入规模化时近乎实时地读取变更数据流记录,Spanner API 旨在支持使用变更数据流分区并发查询变更数据流。变更数据流分区映射以包含变更数据流记录的变更数据流数据拆分。变更数据流的分区会随时间动态变化,并且与 Spanner 动态拆分和合并数据库数据的方式相关。

变更数据流分区包含特定时间范围内不可变键范围的记录。任何变更数据流分区都可以拆分为一个或多个变更数据流分区,也可以与其他变更数据流分区合并。当发生这些拆分或合并事件时,系统会创建子分区,以捕获下一个时间范围中各自不可变键范围的更改。除了数据变更记录之外,变更数据流查询还会返回子分区记录,以通知读取者需要查询的新变更数据流分区,以及检测信号记录,以便在最近未发生任何写入操作时指示前进进度。

查询特定变更数据流分区时,变更记录按提交时间戳顺序返回。每条更改记录仅返回一次。跨变更数据流分区,无法保证变更记录按顺序排列。特定主键的更改记录仅在特定时间范围内的一个分区上返回。

由于存在父子分区沿袭,为了按提交时间戳顺序处理特定键的更改,只有在所有父分区的记录都处理完毕后,才应处理从子分区返回的记录。

变更数据流读取函数和查询语法

GoogleSQL

您可以使用 ExecuteStreamingSql API 查询变更数据流。Spanner 会自动创建一个特殊的读取函数以及变更数据流。读取函数提供对变更数据流记录的访问权限。读取函数命名惯例为 READ_change_stream_name

假设数据库中已存在变更数据流 SingersNameStream,则 GoogleSQL 的查询语法如下:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

read 函数接受以下参数:

参数名称 类型 是否必需? 说明
start_timestamp TIMESTAMP 必填 指定应返回 commit_timestamp 大于或等于 start_timestamp 的记录。该值必须在变更数据流保留期限内,且应小于或等于当前时间,且应大于或等于变更数据流创建的时间戳。
end_timestamp TIMESTAMP 可选(默认值:NULL 指定应返回 commit_timestamp 小于或等于 end_timestamp 的记录。该值必须在变更数据流保留期限内,并且大于或等于 start_timestamp。查询会在返回截止到 end_timestamp 的所有 ChangeRecord 后完成,或者用户终止了连接。如果为 NULL 或未指定,则查询会一直执行,直到返回所有 ChangeRecord 或用户终止连接。
partition_token STRING 可选(默认值:NULL 根据子分区记录的内容,指定要查询的变更数据流分区。如果为 NULL 或未指定,则表示读取器是首次查询变更数据流,并且尚未获得要从中查询的任何特定分区令牌。
heartbeat_milliseconds INT64 必填 确定在此分区中未提交事务的情况下返回检测信号 ChangeRecord 的频率。 该值必须介于 1,000(一秒)到 30,0000(五分钟)之间。
read_options ARRAY 可选(默认值:NULL 预留了其他读取选项,以供日后使用。目前唯一允许的值是 NULL

我们建议您采用一种便捷方法来构建读取函数查询的文本并将参数与其绑定,如以下示例所示。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

您可以使用 ExecuteStreamingSql API 查询变更数据流。Spanner 会自动创建一个特殊的读取函数以及变更数据流。读取函数提供对变更数据流记录的访问权限。读取函数命名惯例为 spanner.read_json_change_stream_name

假设数据库中已存在变更数据流 SingersNameStream,PostgreSQL 的查询语法如下所示:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

read 函数接受以下参数:

参数名称 类型 是否必需? 说明
start_timestamp timestamp with time zone 必填 指定应返回 commit_timestamp 大于或等于 start_timestamp 的变更记录。该值必须在变更数据流保留期限内,且应小于或等于当前时间,且应大于或等于变更数据流创建的时间戳。
end_timestamp timestamp with timezone 可选(默认值:NULL 指定应返回 commit_timestamp 小于或等于 end_timestamp 的变更记录。该值必须在变更数据流保留期限内,并且大于或等于 start_timestamp。查询会在返回直到 end_timestamp 之前的所有更改记录后完成,或者用户终止连接。如果为 NULL,则查询会一直执行,直到返回所有更改记录或用户终止连接。
partition_token text 可选(默认值:NULL 根据子分区记录的内容,指定要查询的变更数据流分区。如果为 NULL 或未指定,则表示读取器是首次查询变更数据流,并且尚未获得要从中查询的任何特定分区令牌。
heartbeat_milliseconds bigint 必填 确定在此分区中未提交事务的情况下返回检测信号 ChangeRecord 的频率。 该值必须介于 1,000(一秒)到 300,000(五分钟)之间。
null null 必填 预留以供日后使用

我们建议您采用一种便捷方法来构建 read 函数的文本并将参数与其绑定,如以下示例所示。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

变更数据流记录格式

GoogleSQL

变更数据流读取函数会返回一个类型为 ARRAY<STRUCT<...>>ChangeRecord 列。在每一行中,此数组始终包含一个元素。

数组元素具有以下类型:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

此结构体中有三个字段:data_change_recordheartbeat_recordchild_partitions_record,每个字段的类型为 ARRAY<STRUCT<...>>。在变更数据流读取函数返回的任何行中,这三个字段中只有一个字段包含值;另外两个字段为空或 NULL。这些数组字段最多包含一个元素。

以下部分分别介绍了这三种记录类型。

PostgreSQL

变更数据流读取函数会返回 JSON 类型的单个 ChangeRecord 列,其结构如下:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

此对象中有三个可能的键:data_change_recordheartbeat_recordchild_partitions_record,对应的值类型为 JSON。在变更数据流读取函数返回的任何行中,这三个键中只有一个存在。

以下部分分别介绍了这三种记录类型。

数据变更记录

数据变更记录包含对表具有一组更改,这些表具有在同一事务的同一变更数据流分区中在同一提交时间戳提交的相同修改类型(插入、更新或删除)。对于同一事务,可以跨多个变更数据流分区返回多条数据变更记录。

所有数据变更记录都包含 commit_timestampserver_transaction_idrecord_sequence 字段,这些字段共同确定了数据流记录的变更流中的顺序。这三个字段足以推导更改的顺序并提供外部一致性。

请注意,如果多个事务涉及不重叠的数据,则可以具有相同的提交时间戳。借助 server_transaction_id 字段,您可以区分同一事务中发出了哪组更改(可能跨变更数据流分区)。将其与 record_sequencenumber_of_records_in_transaction 字段配对后,您还可以缓冲特定事务中的所有记录并对其进行排序。

数据变更记录中的字段包括:

GoogleSQL

字段 类型 说明
commit_timestamp TIMESTAMP 提交更改的时间戳。
record_sequence STRING 事务中记录的序列号。保证序列号在事务中是唯一的,并且单调递增(但不一定连续)。按 record_sequence 对同一 server_transaction_id 的记录进行排序,以在事务中重建更改的顺序。
server_transaction_id STRING 全局唯一字符串,表示在其中提交更改的事务。该值应仅在处理变更数据流记录的上下文中使用,与 Spanner API 中的事务 ID 无关。
is_last_record_in_transaction_in_partition BOOL 指示该记录是否为当前分区中事务的最后一条记录。
table_name STRING 受更改影响的表的名称。
value_capture_type STRING

描述捕获此更改时在变更数据流配置中指定的值捕获类型。

值捕获类型可以是 "OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES"。 默认为 "OLD_AND_NEW_VALUES"。如需了解详情,请参阅值捕获类型

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
列名称、列类型、是否为主键,以及架构中定义的列位置 (`ordinal_position`)。架构中表的第一列的序数位置为 1。对于数组列,列类型可以嵌套。格式与 Spanner API 参考文档中描述的类型结构匹配。
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
描述所做的更改,包括主键值、旧值以及已更改或跟踪的列的新值。旧值和新值的可用性和内容取决于配置的 value_capture_type。new_valuesold_values 字段仅包含非键列。
mod_type STRING 描述更改的类型。INSERTUPDATEDELETE 其中之一。
number_of_records_in_transaction INT64 所有变更数据流分区中属于此事务的数据变更记录数。
number_of_partitions_in_transaction INT64 将为此事务返回数据变更记录的分区数。
transaction_tag STRING 与此交易关联的 交易代码
is_system_transaction BOOL 指示事务是否为系统事务。

PostgreSQL

字段 类型 说明
commit_timestamp STRING 提交更改的时间戳。
record_sequence STRING 事务中记录的序列号。保证序列号在事务中是唯一的,并且单调递增(但不一定连续)。按“record_sequence”对同一“server_transaction_id”的记录进行排序,以在事务中重构更改的顺序。
server_transaction_id STRING 全局唯一字符串,表示在其中提交更改的事务。该值应仅在处理变更数据流记录的上下文中使用,与 Spanner API 中的事务 ID 无关
is_last_record_in_transaction_in_partition BOOLEAN 指示该记录是否为当前分区中事务的最后一条记录。
table_name STRING 受更改影响的表的名称。
value_capture_type STRING

描述捕获此更改时在变更数据流配置中指定的值捕获类型。

值捕获类型可以是 "OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES"。 默认为 "OLD_AND_NEW_VALUES"。如需了解详情,请参阅值捕获类型

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
列名称、列类型、是否为主键,以及架构中定义的列位置 (`ordinal_position`)。架构中表的第一列的序数位置为 1。对于数组列,列类型可以嵌套。格式与 Spanner API 参考文档中描述的类型结构匹配。
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
描述所做的更改,包括主键值、旧值以及已更改或跟踪的列的新值。旧值和新值的可用性和内容取决于配置的 value_capture_type。new_valuesold_values 字段仅包含非键列。
mod_type STRING 描述更改的类型。INSERTUPDATEDELETE 其中之一。
number_of_records_in_transaction INT64 所有变更数据流分区中属于此事务的数据变更记录数。
number_of_partitions_in_transaction NUMBER 将为此事务返回数据变更记录的分区数。
transaction_tag STRING 与此交易关联的 交易代码
is_system_transaction BOOLEAN 指示事务是否为系统事务。

下面提供了一对示例数据更改记录。它们用于描述在两个帐号之间进行转移的单笔交易。请注意,这两个帐号位于不同的变更数据流分区中。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

以下数据变更记录是值捕获类型为 "NEW_VALUES" 的记录示例。请注意,系统只会填充新值。 只有 "LastUpdate" 列发生了修改,因此仅返回了该列。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

以下数据变更记录是值捕获类型为 "NEW_ROW" 的记录示例。只有 "LastUpdate" 列发生了修改,但系统会返回所有跟踪的列。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

以下数据变更记录是值捕获类型为 "NEW_ROW_AND_OLD_VALUES" 的记录示例。只有 "LastUpdate" 列发生了修改,但系统会返回所有跟踪的列。该值捕获类型会捕获 LastUpdate 的新值和旧值。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

检测信号记录

如果返回检测信号记录,则表示已返回 commit_timestamp 小于或等于检测信号记录的 timestamp 的所有更改,并且此分区中将来的数据记录的提交时间戳必须高于检测信号记录所返回的提交时间戳。如果没有数据更改写入分区,系统会返回检测信号记录。当有数据更改写入分区时,可以使用 data_change_record.commit_timestamp(而非 heartbeat_record.timestamp)来指示读取器正在读取分区。

您可以使用在分区上返回的检测信号记录来同步所有分区中的读取器。一旦所有读取器都收到大于或等于某个时间戳 A 的检测信号,或者已收到大于或等于时间戳 A 的数据或子分区记录,读取器就会知道自己已收到该时间戳 A 当天或之前提交的所有记录,并且可以开始处理缓冲记录 - 例如,按时间戳对跨分区记录进行排序,并按 server_transaction_id 对其进行分组。

检测信号记录仅包含一个字段:

GoogleSQL

字段 类型 说明
timestamp TIMESTAMP 检测信号记录的时间戳。

PostgreSQL

字段 类型 说明
timestamp STRING 检测信号记录的时间戳。

检测信号记录示例,用于表明已返回时间戳小于或等于此记录时间戳的所有记录:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

子分区记录

子分区记录会返回有关子分区的信息:子分区的分区令牌、其父分区的令牌,以及表示子分区包含变更记录的最早时间戳的 start_timestamp。提交时间戳紧接在 child_partitions_record.start_timestamp 之前的记录会在当前分区中返回。返回此分区的所有子分区记录后,此查询将返回成功状态,表示已为此分区返回所有记录。

子分区记录的字段包括:

GoogleSQL

字段 类型 说明
start_timestamp TIMESTAMP 从此子分区记录中的子分区返回的数据更改记录具有大于或等于 start_timestamp 的提交时间戳。在查询子分区时,应在查询中指定子分区令牌和一个大于或等于 child_partitions_token.start_timestampstart_timestamp。分区返回的所有子分区记录都具有相同的 start_timestamp,并且时间戳始终介于查询的指定 start_timestampend_timestamp 之间。
record_sequence STRING 单调递增的序列号,当在特定分区中返回多个具有相同 start_timestamp 的子分区记录时,可用于定义子分区记录的顺序。分区令牌 start_timestamprecord_sequence 是子分区记录的唯一标识符。
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
返回一组子分区及其相关信息。 这包括用于在查询中标识子分区的分区令牌字符串,及其父分区的令牌。

PostgreSQL

字段 类型 说明
start_timestamp STRING 从此子分区记录中的子分区返回的数据更改记录具有大于或等于 start_timestamp 的提交时间戳。在查询子分区时,应在查询中指定子分区令牌和一个大于或等于 child_partitions_token.start_timestampstart_timestamp。分区返回的所有子分区记录都具有相同的 start_timestamp,并且时间戳始终介于查询指定的 start_timestampend_timestamp 之间。
record_sequence STRING 单调递增的序列号,当在特定分区中返回多个具有相同 start_timestamp 的子分区记录时,可用于定义子分区记录的顺序。分区令牌 start_timestamprecord_sequence 是子分区记录的唯一标识符。
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
返回子分区及其相关信息的数组。 这包括用于在查询中标识子分区的分区令牌字符串,及其父分区的令牌。

以下是子分区记录的示例:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

变更数据流查询工作流

使用 ExecuteStreamingSql API 运行变更数据流查询,并具有一次性只读事务和强时间戳边界。通过变更数据流读取函数,您可以为感兴趣的时间范围指定 start_timestampend_timestamp。保留期限内的所有变更记录均可使用强只读时间戳边界访问。

所有其他 TransactionOptions 对于变更数据流查询均无效。此外,如果 TransactionOptions.read_only.return_read_timestamp 设置为 true,则描述事务的 Transaction 消息中将返回特殊值 kint64max - 1,而不是有效的读取时间戳。应舍弃此特殊值,且不得将其用于任何后续查询。

每个变更数据流查询可以返回任意数量的行,每行都包含一条数据变更记录、检测信号记录或子分区记录。无需为请求设置截止期限。

示例:

流式查询工作流首先将 partition_token 指定为 NULL,以发出第一个变更流查询。该查询需要指定变更数据流的读取函数、感兴趣的开始和结束时间戳以及检测信号间隔。当 end_timestampNULL 时,查询会一直返回数据,直到分区结束。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

处理此查询的数据记录,直到返回子分区记录。在以下示例中,系统会返回两条子分区记录和三个分区令牌,然后查询终止。特定查询中的子分区记录始终共用同一 start_timestamp

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

如需在 2022-05-01T09:00:01Z 之后处理将来的更改,请创建三个新查询并并行运行它们。这三个查询共同返回其父级覆盖的同一键范围的未来数据更改。请务必将 start_timestamp 设置为同一子分区记录中的 start_timestamp,并使用相同的 end_timestamp 和检测信号间隔,以一致的方式处理所有查询中的记录。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

一段时间后,对 child_token_2 的查询会在返回另一个子分区记录后完成,此记录表示新分区将涵盖从 2022-05-01T09:30:15Z 开始的 child_token_2child_token_3 的未来更改。对 child_token_3 的查询将返回完全相同的记录,因为两者都是新的 child_token_4 的父分区。为了保证对特定键的数据记录进行严格有序的处理,只有在所有父级都完成后(在本例中为 child_token_2child_token_3)之后,对 child_token_4 的查询才能开始。只需为每个子分区令牌创建一个查询,查询工作流设计应指定一个父项等待,并将查询安排在 child_token_4 上。

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

GitHub 上的 Apache Beam SpannerIO Dataflow 连接器中查找处理和解析变更数据流记录的示例。