变更数据流分区、记录和查询

本页详细介绍了更改流的以下属性:

  • 其基于分块的分区模型
  • 变更数据流记录的格式和内容
  • 用于查询这些记录的低级语法
  • 查询工作流程示例

此页面上的信息最适合使用 Spanner API 直接查询变更数据流。 改为使用 Dataflow 读取变更数据流的应用 数据不需要直接与数据模型搭配使用 此处所述。

如需查看更全面的变更数据流入门指南,请参阅变更数据流 概览

变更数据流分区

当变更数据流监控的表发生更改时,Spanner 会在数据更改的同一事务中同步将相应的变更数据流记录写入数据库。这个 可以保证如果事务成功,Spanner 也会 成功捕获并保留更改在内部 Spanner 共置变更数据流记录和数据更改 以使它们由同一服务器处理,从而最大限度地减少写入开销。

在对特定分块执行 DML 操作时,Spanner 会在同一事务中将写入附加到相应的更改流数据分块。由于存在这种对接网点,因此将 视频流不会在服务资源之间增加额外的协调, 可最大限度地减少事务提交开销。

图片

Spanner 通过根据数据库负载和大小动态拆分和合并数据,并将分块分布到服务资源来进行扩缩。

为了支持扩展变更数据流写入和读取,Spanner 会拆分和合并内部变更数据流存储空间以及数据库数据,从而自动避免热点。为了支持随着数据库写入规模扩大而近乎实时地读取变更数据流记录,Spanner API 专为使用变更数据流分区并发查询变更数据流而设计。变更数据流分区会映射到包含变更数据流记录的变更数据流数据分块。变更数据流的分区发生变化 并且与 Spanner 动态拆分和合并数据库数据。

变更数据流分区包含特定时间范围内不可变键范围的记录。任何变更数据流分区都可以拆分为一个或多个变更数据流分区,也可以与其他变更数据流分区合并。当这些 当发生分屏或合并事件时,会创建子分区来捕获更改 下一个时间范围的不可变键范围。除了数据更改记录之外,更改流查询还会返回子分区记录,以通知读取器需要查询的新更改流分区,以及心跳记录,以指示最近没有发生任何写入时向前推进的进度。

查询特定更改流分区时,系统会按提交时间戳顺序返回更改记录。系统会恰好返回每个更改记录一次。跨变更数据流分区,无法保证更改的顺序 记录。特定主键的更改记录仅在一个主键上返回 特定时间范围的分区中

由于存在父子分区沿袭,为了处理 提交时间戳顺序中的特定键,从子节点返回的记录 只有在所有父级中的记录之后,才应处理分区 分区都已完成。

变更数据流读取函数和查询语法

GoogleSQL

您可以使用 ExecuteStreamingSql API。Spanner 会自动创建一个特殊的读取函数以及一个变更数据流。读取函数可提供对更改流记录的访问权限。读取函数命名惯例为 READ_change_stream_name

假设数据库中已存在变更数据流 SingersNameStream, 查询语法如下:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

read 函数接受以下参数:

参数名称 类型 是否必需? 说明
start_timestamp TIMESTAMP 必填 指定 commit_timestamp 大于或等于 start_timestamp 的记录 。该值必须位于变更数据流中 保留期限,并且应小于或等于当前时间 且大于或等于变更数据流创建的时间戳。
end_timestamp TIMESTAMP 可选(默认值:NULL 指定少 commit_timestamp 的记录 大于或等于 end_timestamp 返回。值必须在变更数据流保留期限内 且大于或等于 start_timestamp。查询 在返回最多 end_timestamp 的所有 ChangeRecord 后完成 或用户终止连接。是否为 NULL 则查询会一直执行,直到返回所有 ChangeRecords 或 用户终止连接。
partition_token STRING 可选(默认值:NULL 根据子分区记录的内容,指定要查询的变更数据流分区。如果为 NULL 或未指定,则表示 读取者是第一次查询变更数据流,并且 未获取任何要查询的特定分区令牌。
heartbeat_milliseconds INT64 必填 确定在没有在此分区中提交任何事务的情况下,返回心跳 ChangeRecord 的频率。 该值必须介于 1,000(1 秒)到 300,000(5 分钟)之间。
read_options ARRAY 可选(默认值:NULL 预留的其他读取选项,供日后使用。目前唯一允许的值是 NULL

我们建议您采用一种便捷的方法来构建 读取函数查询并将参数与其绑定,如下所示 示例。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

您可以使用 ExecuteStreamingSql API。 Spanner 会自动创建一个特殊的读取函数, 更改数据流读取函数提供对更改的访问权限 来记录视频流的记录读取函数命名惯例为 spanner.read_json_change_stream_name

假设数据库中存在更改流 SingersNameStream,则 PostgreSQL 的查询语法如下所示:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

read 函数接受以下参数:

参数名称 类型 是否必需? 说明
start_timestamp timestamp with time zone 必填 指定应返回 commit_timestamp 大于或等于 start_timestamp 的更改记录。该值必须位于变更数据流中 保留期限,并且应小于或等于当前时间 且大于或等于变更数据流创建的时间戳。
end_timestamp timestamp with timezone 可选(默认值:NULL 指定应返回 commit_timestamp 小于或等于 end_timestamp 的更改记录。此值必须在更改流保留期限内,并且大于或等于 start_timestamp。查询会在返回完所有更改记录后,直到 end_timestamp 或用户终止连接。 如果为 NULL,则查询会一直执行,直到返回所有更改记录或用户终止连接。
partition_token text 可选(默认值:NULL 根据子分区记录的内容指定要查询的变更数据流分区。如果为 NULL 或未指定,则表示 读取者是第一次查询变更数据流,并且 未获取任何要查询的特定分区令牌。
heartbeat_milliseconds bigint 必填 确定返回检测信号 ChangeRecord 的频率 如果此分区中未提交事务。 该值必须介于 1,000(一秒)到 300,000(五)之间 分钟)。
null null 必填 预留以供日后使用

我们建议您采用一种便捷的方法来构建 读取函数并将参数与其绑定,如下所示 示例。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

变更数据流记录格式

GoogleSQL

变更数据流读取函数会返回一个类型为 ARRAY<STRUCT<...>>ChangeRecord 列。在每一行中,此数组始终包含一个元素。

数组元素具有以下类型:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

该结构体中有三个字段:data_change_recordheartbeat_recordchild_partitions_record(每种类型) ARRAY<STRUCT<...>>。在更改流读取函数返回的任何行中,这三个字段中只有一个包含值;另外两个字段为空或 NULL。这些数组字段最多包含一个元素。

以下各部分将分别介绍这三种记录类型。

PostgreSQL

更改流读取函数会返回一个类型为 JSON 的单个 ChangeRecord 列,其结构如下所示:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

此对象中有三个可能的键:data_change_recordheartbeat_recordchild_partitions_record,对应的值类型为 JSON。在变更数据流读取函数返回的任何行中,只有这三个键之一存在。

以下各部分将分别介绍这三种记录类型。

数据变更记录

数据更改记录包含对同一表的一组更改,这些更改具有相同的修改类型(插入、更新或删除),在同一事务的同一更改流分区中以相同的提交时间戳提交。系统可能会针对多个更改流分区中的同一事务返回多个数据更改记录。

所有数据更改记录都包含 commit_timestampserver_transaction_idrecord_sequence 字段,这些字段共同决定了数据流记录在变更数据流中的顺序。这三个字段足以派生更改的顺序并提供外部一致性。

请注意,如果多笔事务涉及的数据不重叠,则它们可以具有相同的提交时间戳。server_transaction_id 字段 能够区分哪一组更改(可能会 跨变更数据流分区)是在同一个 交易。将其与 record_sequencenumber_of_records_in_transaction 字段搭配使用,您还可以缓冲和排序特定事务中的所有记录。

数据变更记录中的字段包括:

GoogleSQL

字段 类型 说明
commit_timestamp TIMESTAMP 提交更改的时间戳。
record_sequence STRING 事务中记录的序列号。序列号保证 在一次交易中是唯一且单调递增的(但不一定是连续的)。按 record_sequence 对同一 server_transaction_id 的记录进行排序,以重建事务中更改的顺序。Spanner 可能会优化此排序以获得更好的性能,但不一定始终与用户提供的原始排序一致。
server_transaction_id STRING 一个全局唯一字符串,表示以下时间段内的交易: 更改已提交此值应仅在处理变更数据流记录的上下文中使用,并且与 Spanner API 中的事务 ID 无关。
is_last_record_in_transaction_in_partition BOOL 指示该记录是否为当前分区中事务的最后一条记录。
table_name STRING 受更改影响的表的名称。
value_capture_type STRING

说明在 捕获此变更时的变更数据流配置。

值捕获类型可以是 "OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES"。 默认情况下,它为 "OLD_AND_NEW_VALUES"。如需了解详情,请参阅值捕获类型

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
列的名称、列类型 它是否为主键,以及列的位置为 。表格的第一列 序数位置为“1”。列类型 可以嵌套数组列。格式与类型结构相符 Spanner API 参考文档中介绍。
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
说明所做的更改,包括主键 值、旧值以及已更改或跟踪的列的新值。 旧值和新值的可用性和内容取决于配置的 value_capture_type。new_valuesold_values 字段仅包含非键列。
mod_type STRING 描述更改类型。INSERTUPDATEDELETE
number_of_records_in_transaction INT64 所属数据更改记录的数量 事务。
number_of_partitions_in_transaction INT64 将返回数据变更记录的分区数量 。
transaction_tag STRING 与此交易关联的交易代码
is_system_transaction BOOL 指示事务是否为系统事务。

PostgreSQL

字段 类型 说明
commit_timestamp STRING 更改提交的时间戳。
record_sequence STRING 事务中记录的序列号。序列号保证 在一次交易中是唯一且单调递增的(但不一定是连续的)。按“record_sequence”对具有相同“server_transaction_id”的记录进行排序,以重建事务中更改的顺序。
server_transaction_id STRING 一个全局唯一字符串,表示以下时间段内的交易: 更改已提交该值只能 在处理变更数据流记录时使用,并非 与 Spanner API 中的事务 ID 相关联
is_last_record_in_transaction_in_partition BOOLEAN 指示这是当前分区中该事务的最后一个记录。
table_name STRING 受更改影响的表的名称。
value_capture_type STRING

说明在 捕获此变更时的变更数据流配置。

值捕获类型可以是 "OLD_AND_NEW_VALUES""NEW_ROW""NEW_VALUES""NEW_ROW_AND_OLD_VALUES"。 默认为 "OLD_AND_NEW_VALUES"。如需更多信息 请参阅值捕获类型

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
列的名称、列类型、是否为主键,以及架构中定义的列位置(“ordinal_position”)。架构中表的第一列的序数位置为“1”。对于数组列,列类型可以是嵌套的。格式与类型结构相符 Spanner API 参考文档中介绍。
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
描述所做的更改,包括更改或跟踪列的主键值、旧值和新值。旧值和新值的可用性和内容将取决于配置的 value_capture_type。new_valuesold_values 字段仅包含非键列。
mod_type STRING 描述更改类型。INSERTUPDATEDELETE 之一。
number_of_records_in_transaction INT64 所属数据更改记录的数量 事务。
number_of_partitions_in_transaction NUMBER 将返回此事务的数据更改记录的分区数量。
transaction_tag STRING 与此交易关联的交易代码
is_system_transaction BOOLEAN 指示事务是否为系统事务。

下面是一对数据更改记录示例。它们用于描述在两个账号之间进行转账的单笔交易。请注意,这两个账号位于不同的更改流分区中。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

以下数据更改记录是值捕获类型为 "NEW_VALUES" 的记录示例。请注意,系统只会填充新值。 只有 "LastUpdate" 列被修改,因此系统仅返回该列。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

以下数据变更记录是包含值 捕获类型 "NEW_ROW"。只有 "LastUpdate" 列已修改,但返回了所有跟踪的列。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

以下数据更改记录是值捕获类型为 "NEW_ROW_AND_OLD_VALUES" 的记录示例。只有 "LastUpdate" 列被修改,但系统会返回所有被跟踪的列。此值捕获 类型会捕获 LastUpdate 的新值和旧值。

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

心跳记录

如果返回检测信号记录,则表示所有更改 commit_timestamp 小于或等于检测信号记录的 已返回 timestamp,此 分区的提交时间戳必须高于 检测信号记录。没有数据时,将返回检测信号记录 写入分区的更改当有数据更改写入分区时,可以使用 data_change_record.commit_timestamp 代替 heartbeat_record.timestamp,以表明读取器正在读取分区并取得进展。

您可以使用分区上返回的检测信号记录来进行同步 读取器。所有读者都收到 检测信号响应时间大于或等于某个时间戳 A,或者已收到数据或子节点 大于或等于时间戳 A 的分区记录,读者知道他们已收到 在时间戳 A 当天或之前提交的所有记录,可以开始 处理已缓冲的记录,例如,对跨分区进行排序 按时间戳排序,并按 server_transaction_id 对记录进行分组。

检测信号记录仅包含一个字段:

GoogleSQL

字段 类型 说明
timestamp TIMESTAMP 心跳记录的时间戳。

PostgreSQL

字段 类型 说明
timestamp STRING 检测信号记录的时间戳。

检测信号示例,传达带有时间戳的所有记录 时间戳小于或等于此记录的时间戳:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

子分区记录

子分区记录返回子分区的相关信息:子分区的分区令牌、父分区的令牌,以及 start_timestamp,表示子项执行的最早时间戳 分区包含 的变更记录。提交时间戳紧随 child_partitions_record.start_timestamp 之后的记录会在当前分区中返回。在返回所有 子分区记录,此查询将返回 成功状态,这表示相应查询已返回所有记录 。

子分区记录的字段包括:

GoogleSQL

字段 类型 说明
start_timestamp TIMESTAMP 从子级返回的数据更改记录 此子分区记录中的分区具有提交时间戳 大于或等于 start_timestamp。查询子分区时,查询应指定子分区令牌和大于或等于 child_partitions_token.start_timestampstart_timestamp。分区返回的所有子分区记录都具有相同的 start_timestamp,并且时间戳始终介于查询指定的 start_timestampend_timestamp 之间。
record_sequence STRING 单调递增序列 该数字可用于定义 有多个子分区记录时, 在 特定分区。分区令牌 start_timestamprecord_sequence 可唯一标识子分区记录。
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
返回一组子分区及其相关信息。 这包括用于标识子级的分区令牌字符串 分块及其父级的词元 分区。

PostgreSQL

字段 类型 说明
start_timestamp STRING 此子分区记录中从子分区返回的数据更改记录的提交时间戳大于或等于 start_timestamp。在查询子级时 则查询应指定子分区令牌和 start_timestamp大于或等于 child_partitions_token.start_timestamp。分区返回的所有子分区记录具有相同的 start_timestamp,并且时间戳始终介于查询指定的 start_timestampend_timestamp 之间。
record_sequence STRING 单调递增序列 该数字可用于定义 有多个子分区记录时, 在 特定分区。分区令牌 start_timestamprecord_sequence 可唯一标识子分区记录。
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
返回子分区及其关联信息的数组。 这包括用于标识子级的分区令牌字符串 分块及其父级的词元 分区。

以下是子分区记录示例:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

变更数据流查询工作流程

使用 ExecuteStreamingSql API,一次性 只读 交易和 强时间戳边界。借助更改数据流读取函数,您可以为感兴趣的时间范围指定 start_timestampend_timestamp。您可以使用强效的只读时间戳边界访问保留期限内的所有更改记录。

所有其他 TransactionOptions 无效。此外, 如果 TransactionOptions.read_only.return_read_timestamp 设置为 true, Transaction 中将返回特殊值 kint64max - 1 用于描述事务的消息,而不是有效读取 时间戳。应舍弃这个特殊值, 后续查询。

每个更改流查询都可以返回任意数量的行,每行都包含数据更改记录、心跳记录或子分区记录。无需为请求设置截止期限。

示例:

流式查询工作流从通过将 partition_token 指定为 NULL 来发出第一个更改流查询开始。查询需要指定 变更数据流的读取函数、感兴趣的开始和结束时间戳,以及 检测信号间隔时间。当 end_timestampNULL 时,查询会一直返回数据更改,直到分区结束。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

处理此查询中的数据记录,直到返回子分区记录。在以下示例中,系统会返回两个子分区记录和三个分区令牌,然后查询会终止。特定查询中的子分区记录始终共享相同的 start_timestamp

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

如需处理 2022-05-01T09:00:01Z 之后的未来更改,请创建三个新查询并并行运行它们。这三个查询共同返回 future 其父级覆盖的键范围的数据更改。始终将 start_timestamp 设置为同一子分区记录中的 start_timestamp,并使用相同的 end_timestamp 和心跳间隔,以便在所有查询中一致地处理记录。

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

过了一段时间后,针对 child_token_2 的查询会返回另一个子分区记录后完成,此记录表明,从 2022-05-01T09:30:15Z 开始,新分区将涵盖 child_token_2child_token_3 的未来更改。以下查询将返回完全相同的记录: child_token_3,因为两者都是新的 child_token_4 的父分区。 为了保证对特定键的数据记录进行严格有序的处理,对 child_token_4 的查询必须仅在所有父级(在本例中为 child_token_2child_token_3)完成后才开始。仅创建一个查询 对于每个子分区令牌,查询工作流设计应指定一个 父项等待,并在 child_token_4 上安排查询。

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

查找在 Apache Beam SpannerIO 中处理和解析变更数据流记录的示例 Dataflow 连接器已开启 GitHub