变更数据流分区、记录和查询

本页面详细介绍了变更数据流的以下属性:

  • 它的基于分块的分区模型
  • 变更数据流记录的格式和内容
  • 用于查询这些记录的低级语法
  • 查询工作流示例

此页面上的信息最适合使用 Spanner API 直接查询变更数据流。 改为使用 Dataflow 读取变更数据流的应用 数据不需要直接与数据模型搭配使用 此处的说明。

如需查看更全面的变更数据流入门指南,请参阅变更数据流 概览

变更数据流分区

当变更流所监控的表发生变化时, Spanner 在数据库中写入相应的变更数据流记录, 在数据更改所在的事务中同步运行。这个 可以保证如果事务成功,Spanner 也会 成功捕获并保留更改在内部 Spanner 共置变更数据流记录和数据更改 以使它们由同一服务器处理,从而最大限度地减少写入开销。

作为特定分块的 DML 的一部分,Spanner 将写入附加到相应的变更数据流数据 拆分付款。由于存在这种对接网点,因此将 视频流不会在服务资源之间增加额外的协调, 可最大限度地减少事务提交开销。

图片

Spanner 通过基于数据动态拆分和合并数据进行扩缩 数据库负载和大小,以及在服务资源之间分配分块。

接收者 支持扩缩变更数据流写入和读取,Spanner 拆分 将内部变更数据流存储与数据库数据合并, 自动避开热点。如需支持在以下位置读取变更数据流记录: 随着数据库写入规模的增加,Spanner API 几乎实时更新, 专为使用变更数据流并发查询的变更数据流而设计 分区。变更数据流分区映射以更改数据流数据拆分, 包含变更数据流记录。变更数据流的分区发生变化 并且与 Spanner 动态拆分和合并数据库数据。

变更数据流分区包含 特定时间范围内的数据任何变更数据流分区都可以拆分为一个或多个变更数据流分区,也可以与其他变更数据流分区合并。当这些 当发生分屏或合并事件时,会创建子分区来捕获更改 下一个时间范围的不可变键范围。此外 更改为数据变更记录,变更数据流查询会将子分区记录返回到 通知读者需要查询的新变更数据流分区,以及 作为检测信号记录,以指示未发生写入操作时的向前进度 。

在查询特定的变更数据流分区时,变更记录将 按提交时间戳顺序返回。每条更改记录都会 一次。跨变更数据流分区,无法保证更改的顺序 记录。特定主键的更改记录仅在一个主键上返回 特定时间范围的分区中

由于存在父子分区沿袭,为了处理 提交时间戳顺序中的特定键,从子节点返回的记录 只有在所有父级的记录完成后,才应处理分区 分区都已处理完毕。

变更数据流读取函数和查询语法

GoogleSQL

您可以使用 ExecuteStreamingSql API。Spanner 会自动创建一个特殊的读取函数, 更改数据流读取函数提供对更改的访问权限 来记录视频流的记录读取函数命名惯例为 READ_change_stream_name

假设数据库中已存在变更数据流 SingersNameStream, 查询语法如下:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

read 函数接受以下参数:

参数名称 类型 是否必需? 说明
start_timestamp TIMESTAMP 必填 指定 commit_timestamp 大于或等于 start_timestamp 的记录 。该值必须位于变更数据流中 保留期限,并且应小于或等于当前时间 且大于或等于变更数据流创建的时间戳。
end_timestamp TIMESTAMP 可选(默认值:NULL 指定少 commit_timestamp 的记录 大于或等于 end_timestamp 返回。值必须在变更数据流保留期限内 且大于或等于 start_timestamp。查询 在返回最多 end_timestamp 的所有 ChangeRecord 后完成 或用户终止连接。是否为 NULL 则查询会一直执行,直到返回所有 ChangeRecords 或 用户终止连接。
partition_token STRING 可选(默认值:NULL 指定要查询的变更数据流分区,根据 子分区的内容 记录。如果为 NULL 或未指定,则表示 读取者是第一次查询变更数据流,并且 未获取任何要查询的特定分区令牌。
heartbeat_milliseconds INT64 必填 确定返回检测信号 ChangeRecord 的频率 如果此分区中未提交事务。 该值必须介于 1,000(一秒)到 300,000(五)之间 分钟)。
read_options ARRAY 可选(默认值:NULL 预留了其他读取选项,以供日后使用。目前唯一允许的值是 NULL

我们建议您采用一种便捷的方法来构建 读取函数查询并将参数与其绑定,如下所示 示例。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

您可以使用 ExecuteStreamingSql API。 Spanner 会自动创建一个特殊的读取函数, 更改数据流读取函数提供对更改的访问权限 来记录视频流的记录读取函数命名惯例为 spanner.read_json_change_stream_name

假设数据库中已存在变更数据流 SingersNameStream, 查询语法如下所示:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

read 函数接受以下参数:

参数名称 类型 是否必需? 说明
start_timestamp timestamp with time zone 必填 指定 commit_timestamp 大于或等于 start_timestamp 的变更记录 。该值必须位于变更数据流中 保留期限,并且应小于或等于当前时间 且大于或等于变更数据流创建的时间戳。
end_timestamp timestamp with timezone 可选(默认值:NULL 使用 commit_timestamp 指定变更记录 应小于或等于 end_timestamp 返回。值必须在变更数据流保留期限内 且大于或等于 start_timestamp。 查询会在返回完所有更改记录后,直到 end_timestamp 或用户终止连接。 如果为 NULL,则查询将执行,直到返回所有更改记录 或用户终止连接。
partition_token text 可选(默认值:NULL 指定要查询的变更数据流分区,根据 子分区的内容 记录。如果为 NULL 或未指定,则表示 读取者是第一次查询变更数据流,并且 未获取任何要查询的特定分区令牌。
heartbeat_milliseconds bigint 必填 确定返回检测信号 ChangeRecord 的频率 如果此分区中未提交事务。 该值必须介于 1,000(一秒)到 300,000(五)之间 分钟)。
null null 必填 预留以供日后使用

我们建议您采用一种便捷的方法来构建 读取函数并将参数与其绑定,如下所示 示例。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

变更数据流记录格式

GoogleSQL

变更数据流读取函数会返回单个 ChangeRecord 列类型 ARRAY<STRUCT<...>>。在每一行中,此数组始终包含一个元素。

数组元素具有以下类型:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

该结构体中有三个字段:data_change_recordheartbeat_recordchild_partitions_record(每种类型) ARRAY<STRUCT<...>>。在变更数据流读取函数的任何行中 那么这三个字段中只有一个包含值;其他两个 为空或 NULL。这些数组字段最多包含一个元素。

以下部分分别介绍了这三种记录类型。

PostgreSQL

变更数据流读取函数会返回单个 ChangeRecord 列, 具有以下结构的 JSON 类型:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

此对象中有三个可能的键:data_change_recordheartbeat_recordchild_partitions_record,对应的值 类型为 JSON。 在变更数据流读取函数返回的任何行中,只有 这三个键中的一个存在。

以下部分分别介绍了这三种记录类型。

数据变更记录

数据更改记录包含对表进行的一组更改,该表具有 在同一时间提交的相同修改类型(插入、更新或删除) 同一个变更数据流分区中的提交时间戳 交易。同一个项目可以返回多条数据变更记录 事务。

所有数据变更记录都有 commit_timestampserver_transaction_id, 和 record_sequence 字段,这两个字段共同决定更改的顺序 流录制。这三个字段足以 更改的顺序并提供外部一致性。

请注意,如果存在以下情况,则多个事务可以具有相同的提交时间戳: 它们接触的是互不重叠的数据server_transaction_id 字段 能够区分哪一组更改(可能会 跨变更数据流分区)是在同一个 交易。将它与record_sequence配对并 number_of_records_in_transaction 字段可让您缓冲和排序 特定事务的所有记录

数据变更记录中的字段包括:

GoogleSQL

字段 类型 说明
commit_timestamp TIMESTAMP 提交更改的时间戳。
record_sequence STRING 事务中记录的序列号。序号保证 在一次交易中是唯一且单调递增的(但不一定是连续的)。对同一个 server_transaction_id,由record_sequence上传到 在事务中重建更改的顺序。
server_transaction_id STRING 一个全局唯一字符串,表示以下时间段内的交易: 更改已提交该值只能 在处理变更数据流记录时使用,并非 与 Spanner API 中的事务 ID 相关联。
is_last_record_in_transaction_in_partition BOOL 指示该记录是否为当前分区中事务的最后一条记录。
table_name STRING 受更改影响的表的名称。
value_capture_type STRING

说明在 捕获此变更时的变更数据流配置。

值捕获类型可以是 "OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES"。 默认为 "OLD_AND_NEW_VALUES"。如需更多信息 请参阅值捕获类型

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
列的名称、列类型 它是否为主键,以及列的位置为 。表格的第一列 序数位置为“1”。列类型 可以嵌套数组列。格式与类型结构相符 Spanner API 参考文档中介绍。
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
说明所做的更改,包括主键 值、已更改或跟踪的列的新值。 旧值和新值的可用性和内容取决于配置的 value_capture_type。new_valuesold_values 字段仅包含非键列。
mod_type STRING 描述更改的类型。INSERTUPDATEDELETE
number_of_records_in_transaction INT64 所属数据变更记录的数量 事务。
number_of_partitions_in_transaction INT64 将返回数据变更记录的分区数量 。
transaction_tag STRING 与此交易关联的交易代码
is_system_transaction BOOL 指示事务是否为系统事务。

PostgreSQL

字段 类型 说明
commit_timestamp STRING 提交更改的时间戳。
record_sequence STRING 事务中记录的序列号。序号保证 在一次交易中是唯一且单调递增的(但不一定是连续的)。对同一个 执行 `record_sequence` 的 `server_transaction_id`,重建 交易。
server_transaction_id STRING 一个全局唯一字符串,表示以下时间段内的交易: 更改已提交该值只能 在处理变更数据流记录时使用,并非 与 Spanner API 中的事务 ID 相关联
is_last_record_in_transaction_in_partition BOOLEAN 指示该记录是否为当前分区中事务的最后一条记录。
table_name STRING 受更改影响的表的名称。
value_capture_type STRING

说明在 捕获此变更时的变更数据流配置。

值捕获类型可以是 "OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES"。 默认为 "OLD_AND_NEW_VALUES"。如需更多信息 请参阅值捕获类型

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
列的名称、列类型 它是否为主键,以及列的位置为 。表格的第一列 序数位置为“1”。列类型 可以嵌套数组列。格式与类型结构相符 Spanner API 参考文档中介绍。
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
说明所做的更改,包括主键 值、旧值以及已更改或跟踪的值的新值 列。旧值和新值的可用性和内容取决于 。new_valuesold_values 字段仅包含非键列。
mod_type STRING 描述更改的类型。INSERTUPDATEDELETE
number_of_records_in_transaction INT64 所属数据变更记录的数量 事务。
number_of_partitions_in_transaction NUMBER 将返回数据变更记录的分区数量 。
transaction_tag STRING 与此交易关联的交易代码
is_system_transaction BOOLEAN 指示事务是否为系统事务。

下面提供了一对示例数据更改记录。它们描述的是单个事务 这里有 在两个账号之间转移。请注意,这两个账号位于 单独的变更数据流 分区。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

以下数据变更记录是包含值 捕获类型 "NEW_VALUES"。请注意,系统只会填充新值。 只有 "LastUpdate" 列发生了修改,因此只有该列 已返回。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

以下数据变更记录是包含值 捕获类型 "NEW_ROW"。只有 "LastUpdate" 列已修改,但返回了所有跟踪的列。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

以下数据变更记录是包含值 捕获类型 "NEW_ROW_AND_OLD_VALUES"。只有 "LastUpdate" 列已修改,但返回了所有跟踪的列。此值捕获 类型会捕获 LastUpdate 的新值和旧值。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

检测信号记录

如果返回检测信号记录,则表示所有更改 commit_timestamp 小于或等于检测信号记录的 已返回 timestamp,此 分区的提交时间戳必须高于 检测信号记录。没有数据时,将返回检测信号记录 写入分区的更改当有数据更改写入 因此可以改用 data_change_record.commit_timestamp heartbeat_record.timestamp 表示读者正在前进 读取分区的进度

您可以使用分区上返回的检测信号记录来进行同步 读取器。所有读者都收到 检测信号响应时间大于或等于某个时间戳 A,或者已收到数据或子节点 大于或等于时间戳 A 的分区记录,读者知道他们已收到 在时间戳 A 当天或之前提交的所有记录,可以开始 处理已缓冲的记录,例如,对跨分区进行排序 按时间戳排序,并按 server_transaction_id 对记录进行分组。

检测信号记录仅包含一个字段:

GoogleSQL

字段 类型 说明
timestamp TIMESTAMP 检测信号记录的时间戳。

PostgreSQL

字段 类型 说明
timestamp STRING 检测信号记录的时间戳。

检测信号示例,传达带有时间戳的所有记录 时间戳小于或等于此记录的时间戳:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

子分区记录

子分区记录返回子分区的相关信息:子分区的分区令牌、父分区的令牌,以及 start_timestamp,表示子项执行的最早时间戳 分区包含 的变更记录。包含提交时间戳的记录 紧跟在child_partitions_record.start_timestamp之前 返回的值。在返回所有 子分区记录,此查询将返回 成功状态,这表示相应查询已返回所有记录 。

子分区记录的字段包括:

GoogleSQL

字段 类型 说明
start_timestamp TIMESTAMP 从子级返回的数据更改记录 此子分区记录中的分区具有提交时间戳 大于或等于 start_timestamp。查询子分区时,查询应 指定子分区令牌,且 start_timestamp 大于或等于 child_partitions_token.start_timestamp。所有子分区记录 具有相同的 start_timestamp 和 时间戳始终介于查询的指定 start_timestamp 之间 和end_timestamp
record_sequence STRING 单调递增序列 该数字可用于定义 有多个子分区记录时, 在 特定分区。分区令牌 start_timestamprecord_sequence唯一标识 子分区记录。
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
返回一组子分区及其相关信息。 这包括用于标识子节点的分区令牌字符串 分块及其父级的词元 分区。

PostgreSQL

字段 类型 说明
start_timestamp STRING 从子级返回的数据更改记录 此子分区记录中的分区具有提交时间戳 大于或等于 start_timestamp。在查询子级时 则查询应指定子分区令牌和 start_timestamp大于或等于 child_partitions_token.start_timestamp。所有子分区 每个分区返回的记录都具有相同的 start_timestamp 和时间戳始终介于 查询的指定 start_timestampend_timestamp
record_sequence STRING 单调递增序列 该数字可用于定义 有多个子分区记录时, 在 特定分区。分区令牌 start_timestamprecord_sequence唯一标识 子分区记录。
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
返回子分区及其相关信息的数组。 这包括用于标识子节点的分区令牌字符串 分块及其父级的词元 分区。

以下是子分区记录的示例:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

变更数据流查询工作流

使用 ExecuteStreamingSql API,一次性 只读 交易和 强时间戳边界。变化 使用数据流读取函数,您可以指定 start_timestampend_timestamp:表示感兴趣的时间范围。所有变更记录 可在保留期限内访问 时间戳边界。

所有其他 TransactionOptions 无效。此外, 如果 TransactionOptions.read_only.return_read_timestamp 设置为 true, Transaction 中会返回一个特殊值 kint64max - 1 用于描述事务的消息,而不是有效读取 时间戳。应舍弃这个特殊值, 后续查询。

每个变更数据流查询可以返回任意数量的行,每行包含 数据变更记录、检测信号记录或子分区 记录。无需为请求设置截止期限。

示例:

流式查询工作流从发出第一个变更流开始 方法是将 partition_token 指定为 NULL。查询需要指定 变更数据流的读取函数、感兴趣的开始和结束时间戳,以及 检测信号间隔时间。当 end_timestampNULL 时,查询会保留 返回数据更改,直到分区结束。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

处理此查询的数据记录,直到子分区记录完成 返回。在以下示例中,有两个子分区记录和三个分区 则查询会终止。从命令行 特定查询始终具有相同的 start_timestamp

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

如需在 2022-05-01T09:00:01Z 之后处理后续更改,请创建三个新的 并行运行查询。这三个查询共同返回 future 其父项涵盖的键范围的数据更改。请务必将 将 start_timestamp 映射到同一子分区记录中的 start_timestamp;以及 使用相同的 end_timestamp 和检测信号间隔来处理记录 在所有查询中保持一致。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

一段时间后,对 child_token_2 的查询会在返回另一个查询完成后完成 子分区记录,此记录表示将创建一个新分区 涵盖 child_token_2child_token_3 的未来变更,此变更从以下时间开始: 2022-05-01T09:30:15Z。以下查询将返回完全相同的记录: child_token_3,因为两者都是新的 child_token_4 的父分区。 为了保证对特定键的数据记录进行严格有序的处理, 只有在所有父级均完成后,对 child_token_4 的查询才能开始; 在本例中为 child_token_2child_token_3。仅创建一个查询 对于每个子分区令牌,查询工作流设计应指定一个 父项等待,并在 child_token_4 上安排查询。

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

查找在 Apache Beam SpannerIO 中处理和解析变更数据流记录的示例 Dataflow 连接器已开启 GitHub