更改数据流分区、记录和查询

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

本页面详细介绍了变更流的以下属性:

  • 其基于拆分的分区模型
  • 变更流记录的格式和内容
  • 用于查询这些记录的低级语法
  • 查询工作流示例

本页面上的信息与使用 Spanner API 直接查询更改流最为相关。相反,使用 Dataflow 读取变更流数据的应用无需直接与本文所述的数据模型配合使用。

如需了解更宽泛的数据流更改入门指南,请参阅更改数据流概览

更改流分区

当更改流所监控的表发生更改时,Cloud Spanner 会在数据库中写入相应的更改流记录,并在与数据更改相同的事务中同步记录。这保证了如果事务成功,Spanner 也已成功捕获并保留更改。在内部,Spanner 会协同定位更改流记录和数据更改,以便由同一服务器处理,从而最大限度地减少写入开销。

作为特定 DML 的一部分,Spanner 将事务附加到同一事务中的相应更改流数据拆分。由于这种共置,变更流不会跨服务资源增加额外的协调,从而最大限度地减少事务提交开销。

图片

Spanner 会根据数据库负载和大小动态拆分和合并数据,并将资源拆分到各个服务资源中,从而实现扩缩。

为了支持变更流写入和读取扩缩,Spanner 将内部变更流存储与数据库数据拆分并合并,从而自动避免产生热点。为了支持在数据库写入规模时近乎实时地读取更改流记录,Spanner API 旨在使用更改流分区同时查询更改流。更改流分区映射以更改包含更改流记录的流数据拆分。变更流的分区会随时间动态变化,并且与 Spanner 动态拆分和合并数据库数据的方式相关。

变更流分区包含特定时间范围内不可变键范围的记录。任何变更流分区都可以拆分为一个或多个变更流分区,也可以与其他变更流分区合并。当发生这些拆分或合并事件时,系统会创建子分区,以捕获下一个不可变键范围下一个时间范围的更改。除了数据更改记录之外,变更流查询还会返回子分区记录,以将需要查询的新变更流分区告知读取器,以及提供检测信号以指示最近没有写入时向前推进。

查询特定更改流分区时,系统会按提交时间戳顺序返回更改记录。每个更改记录只返回一次。在变更流分区中,不能保证变更记录的排序。特定主键的变更记录仅在特定时间范围内的一个分区上返回。

由于父子分区沿袭,为了按提交时间戳顺序处理特定键的更改,应仅在处理完所有父分区中的记录后才处理子分区返回的记录。

更改数据流查询语法

使用 ExecuteStreamingSql API 查询更改流。系统会随变更流自动创建特殊的表值函数 (TVF)。提供对变更流记录的访问权限。TVF 的命名惯例为 READ_change_stream_name

假设数据库中已存在变更流 SingersNameStream,则查询语法如下所示:

SELECT ChangeRecord
    FROM READ_SingersNameStream (
        start_timestamp,
        end_timestamp,
        partition_token,
        heartbeat_milliseconds
    )

该函数接受以下参数:

参数名称 类型 是否必需? 说明
start_timestamp TIMESTAMP 需要 指定应返回 commit_timestamp 大于或等于 start_timestamp 的记录。该值必须在变更流的保留期限内,且应小于或等于当前时间,并且大于或等于变更流的创建时间戳。
end_timestamp TIMESTAMP 可选(默认值:NULL 指定应返回 commit_timestamp 小于或等于 end_timestamp 的记录。该值必须在变更流保留期限内,且大于或等于 start_timestamp。查询会在返回 end_timestamp 之前的所有 ChangeRecords 或返回一组子分区记录后完成。如果未指定 NULL,则查询会执行直到当前分区完成,并返回所有已设置 child_partition_record 字段的 ChangeRecords。为 end_timestamp 指定 NULL 表示始终读取最新的更改。
partition_token STRING 可选(默认值:NULL 根据子分区记录的内容指定要查询的变更流分区。如果未指定 NULL,则表示读取器首次查询变更流,并且尚未获取任何特定分区令牌进行查询。
heartbeat_milliseconds INT64 需要 确定在此分区中未提交任何事务时返回检测信号 ChangeRecord 的频率。该值必须介于 1000(1 秒)和 300000(5 分钟)之间。

我们建议您构建一个便捷方法,用于构建 TVF 查询的文本并将其绑定到参数,如以下示例所示。

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
    return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

更改直播记录格式

变更流 TVF 会返回一个类型为 ARRAY<STRUCT<...>> 的 ChangeRecord 列。在每一行中,此数组始终只包含一个元素。

数组元素具有以下类型:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

此结构体中有三个字段:data_change_recordheartbeat_recordchild_partitions_record,它们的类型均为 ARRAY<STRUCT<...>>。在变更流 TVF 返回的任何行中,只有一个字段包含值;另外两个字段为空或 NULL。这些数组字段最多只能包含一个元素。

以下部分将分别介绍这三种记录类型。

数据更改记录

数据更改记录包含对表的同一项修改中同一提交时间戳处的同一修改时间戳(插入、更新或删除)处的一组更改。可以针对多个更改流分区中的同一事务返回多个数据更改记录。

所有数据更改记录都包含 commit_timestampserver_transaction_idrecord_sequence 字段,它们共同决定数据流记录在更改流中的顺序。这三个字段足以推导出更改的顺序并提供外部一致性。

请注意,如果多个事务触摸非重叠的数据,则它们可以具有相同的提交时间戳。通过 server_transaction_id 字段,您可以区分同一事务中可能发出了哪组更改(可能会跨变更流分区发出)。将其与 record_sequencenumber_of_records_in_transaction 字段配对可让您缓冲来自特定事务的所有记录并对其进行排序。

数据更改记录的字段包括以下字段:

字段 类型 说明
commit_timestamp TIMESTAMP 表示提交更改的时间戳。
record_sequence STRING 事务中记录的序列号。序列号保证在事务内是唯一的单调递增(但不一定是连续的)。按“record_Sequence”对同一“server_transaction_id”的记录进行排序,以在事务内重构更改的顺序。
server_transaction_id STRING 表示提交更改的事务的全局唯一字符串。此值应仅在处理更改流记录的上下文中使用,并且与 Spanner 的事务 ID(例如 `TransactionSelector.id`)无关。当与同一上下文中的其他值(即更改流 `data_change_records` 或 Spanner API)相比,这两者都可以唯一标识事务。
is_last_record_in_transaction_in_partition BOOL 指示这是否为当前分区中事务的最后一条记录。
table_name STRING 受更改影响的表的名称。
value_capture_type STRING

说明捕获此变更时在变更流配置中指定的值捕获类型。

目前始终为 "OLD_AND_NEW_VALUES"

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
列名称、列类型、是否是主键,以及架构中定义的列的位置 (`ordinal_position`)。架构中的第一列将有序号位置 `1`。列类型可以嵌套在数组列中。该格式与 Spanner API 参考文档中所述的类型结构相匹配。
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
说明进行了哪些更改,包括主键值,以及变更流配置了“value_capture_type”时已更改的列的旧值和新值。new_valuesold_values 字段仅包含非键列。JSON 对象的成员按字典顺序排序。
mod_type STRING 描述更改类型。INSERTUPDATEDELETE 中的一个。
number_of_records_in_transaction INT64 所有更改流分区中属于此事务的数据更改记录数。
number_of_partitions_in_transaction INT64 将返回此事务的数据更改记录的分区数。
transaction_tag STRING 与此交易关联的交易代码
is_system_transaction BOOL 指明相应事务是否为系统事务。

以下是一对示例数据更改记录。它们用于描述两个帐号之间存在转移的单个事务。请注意,这两个帐号位于不同的更改流分区中。

data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
         "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

检测信号记录

如果返回检测信号记录,则表示已返回 commit_timestamp 小于或等于检测信号记录 timestamp 的所有更改,并且此分区中未来数据记录的提交时间戳必须高于检测信号记录返回的提交时间戳。当没有任何数据写入分区时,系统会返回检测信号记录。当有数据写入分区时,可以使用 data_change_record.commit_timestamp 代替 heartbeat_record.timestamp,以告知读取器正在读取分区。

您可以使用针对各个分区返回的检测信号记录,跨所有分区同步读取器。一旦所有读取者都收到大于或等于某个时间戳 A 的检测信号,或者收到不小于时间戳 A 的数据或子分区记录,这些读取器就知道自己已收到在该时间戳 A 当天或之前提交的所有记录,并且可以开始处理已缓冲记录 - 例如,按时间戳对跨分区记录进行排序,然后按 server_transaction_id 对它们进行分组。

检测信号记录仅包含一个字段:

字段 类型 说明
timestamp TIMESTAMP 检测信号记录时间戳。

检测信号记录示例,说明已返回不超过此时间戳的时间戳的所有记录:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

子分区记录

子分区记录会返回有关子分区的信息:子分区的分区令牌、其子分区的令牌,以及代表子分区包含变更记录的最早时间戳的 start_timestamp。系统会在当前分区中返回提交时间戳早于 child_partitions_record.start_timestamp 的记录。返回此分区的所有子分区记录后,此查询将返回成功状态,表示已返回此分区的所有记录。

子分区记录的字段包括以下内容:

字段 类型 说明
start_timestamp TIMESTAMP 从此子分区记录中的子分区返回的数据更改记录的提交时间戳将大于或等于 start_timestamp。查询子分区时,查询应指定子分区令牌以及大于或等于 child_partitions_token.start_timestampstart_timestamp。分区返回的所有子分区记录都将具有相同的 start_timestamp,且时间戳始终介于查询的指定 start_timestampend_timestamp 之间。
record_sequence STRING 当特定子分区中返回了具有相同 start_timestamp 的多个子分区记录时,该编号可用于定义子分区记录的顺序。分区令牌 start_timestamprecord_sequence 可唯一标识子分区记录。
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
返回一组子分区及其关联信息。 这包括用于标识查询中的子分区的分区令牌字符串,以及其父分区的令牌。

子分区记录示例:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"],
    }
  ],
}

变更流查询工作流

使用 ExecuteStreamingSql API 运行变更流查询,具有一次性只读事务和强大的时间戳边界。变更流 TVF 允许用户为感兴趣的时间范围指定 start_timestampend_timestamp。可以使用强只读时间戳边界访问保留期限内的所有更改记录。

对于变更流查询,所有其他 TransactionOptions 均无效。此外,如果 TransactionOptions.read_only.return_read_timestamp 设置为 true,系统将在描述事务的 Transaction 消息中返回特殊值 kint64max - 1,而不是有效的读取时间戳。应舍弃此特殊值,不要将其用于任何后续查询。

每个更改流查询都可以返回任意数量的行,每行都包含数据更改记录、检测信号记录或子分区记录。您无需为请求设置截止时间。

示例:

首先,通过将 partition_token 指定为 NULL 来发出第一个更改流查询。该查询需要为变更流、相关开始和结束时间戳以及检测信号间隔指定 TVF 函数。当 end_timestampNULL 时,查询将持续返回数据更改,直到子分区诞生为止。

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:00-00",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

处理来自此查询的数据记录,直到返回子分区记录。在以下示例中,返回了两个子分区记录和三个分区令牌,然后查询会终止。特定查询的子分区记录将始终共用相同的 start_timestamp

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
      // from the initial change stream queries.
      "parent_partition_tokens": [NULL],
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL],
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL],
    }
  ],
}

如需在 2022-05-01 09:00:01-00 后处理将来的更改,请创建三个新查询并并行运行这些查询。这三个查询会同时返回其父级所覆盖的同一键范围的未来数据更改。始终在同一子分区记录中将 start_timestamp 设置为 start_timestamp,并使用相同的 end_timestamp 和检测信号间隔,以便在所有查询中一致地处理记录。

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000);

一段时间后,在返回另一个子分区记录后,对 child_token_2 的查询完成,此记录表示,新分区将从 2022-05-01 09:30:15-00 开始涵盖 child_token_2child_token_3 的未来更改。对 child_token_3 的查询将返回完全相同的记录,因为这两者都是新 child_token_4 的父分区。为了确保对特定键的数据记录进行严格排序,child_token_4 中的查询只能在所有父级都完成后才能开始,在本例中为 child_token_2child_token_3。只能为每个子分区令牌创建一个查询,查询工作流设计应指定 1 个父级来等待并在 child_token_4 上安排该查询。

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:30:15-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:30:15-00",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

GitHub 上查找 Apache Beam SpannerIO Dataflow 连接器处理和解析更改流记录的示例。