创建和管理变更数据流

本页介绍了如何创建、修改和查看 Spanner 更改流。如需详细了解变更数据流,请参阅变更数据流简介

由于更改流是架构对象,因此您可以通过用于任何其他类型的数据库定义工作(例如创建表或添加索引)的相同 DDL 驱动的架构更新来创建和管理更改流。

在您提交用于更改架构的 DDL 语句(包括用于创建、修改或删除变更流的语句)后,Spanner 会开始执行长时间运行的操作。新增或更改的变更数据流将 开始观察新配置指定的列或表 会在此长时间运行的操作完成后触发

创建变更流

如需创建更改流,您需要提供其名称以及它将监控的架构对象:整个数据库,或特定表和列的列表。您可以选择性地指定以下内容:

GoogleSQL

使用 GoogleSQL 创建变更数据流的 DDL 语法如下所示: 如下所示:

CREATE CHANGE STREAM change_stream_name
  [FOR column_or_table_watching_definition[, ... ] ]
  [
    OPTIONS (
      retention_period = timespan,
      value_capture_type = type,
      exclude_ttl_deletes = boolean,
      exclude_insert = boolean,
      exclude_update = boolean,
      exclude_delete = boolean
    )
  ]

PostgreSQL

使用 PostgreSQL 创建更改流的 DDL 语法如下所示:

CREATE CHANGE STREAM change_stream_name
  [FOR column_or_table_watching_definition[, ... ] ]
  [
    WITH (
      retention_period = timespan,
      value_capture_type = type,
      exclude_ttl_deletes = boolean,
      exclude_insert = boolean,
      exclude_update = boolean,
      exclude_delete = boolean
    )
  ]

新的变更数据流一旦开始监控其分配的架构对象, 创建该查询的长时间运行的操作完成。

以下示例展示了如何使用各种配置创建更改流。

监控整个数据库

创建变更数据流以监控执行的所有数据更改 在整个数据库表中,使用 ALL 关键字:

CREATE CHANGE STREAM EverythingStream
FOR ALL;

ALL 配置隐式 包含数据库的所有数据 立即使用这些表格和列除了常规数据表之外,不包含视图、信息架构表或其他对象。

监控特定表

如需将变更数据流的范围限制为特定表(而非整个数据库),请指定一个或多个表的列表:

CREATE CHANGE STREAM SingerAlbumStream
FOR Singers, Albums;

Spanner 会自动更新监控整个表的更改流,以反映影响这些表的任何架构更改,例如添加或删除列。

监控特定列

使用 table(column_1[, column_2, ...]) 语法观察一个或多个特定更改 您命名的表中的非键列:

CREATE CHANGE STREAM NamesAndTitles
FOR Singers(FirstName, LastName), Albums(Title);

您无法在此处指定主键列,因为每个更改流都会始终跟踪其所监控的每个表的主键。这样,每个数据更改记录都可以通过主键识别更改的行。

在单个数据流中监控表和列

您可以将前两个示例中的表观察和列观察语法合并到一个变更数据流中:

CREATE CHANGE STREAM NamesAndAlbums
FOR Singers(FirstName, LastName), Albums;

指定更长的保留期限

指定变更数据流数据保留期限 长于默认的期限(一天),请将retention_period设置为周期 最多一周,以小时 (h) 或天 (d) 的形式表示。

下面两个示例:

GoogleSQL

CREATE CHANGE STREAM LongerDataRetention
FOR ALL
OPTIONS ( retention_period = '36h' );
CREATE CHANGE STREAM MaximumDataRetention
FOR ALL
OPTIONS ( retention_period = '7d' );

PostgreSQL

CREATE CHANGE STREAM LongerDataRetention
FOR ALL
WITH ( retention_period = '36h' );
CREATE CHANGE STREAM MaximumDataRetention
FOR ALL
WITH ( retention_period = '7d' );

指定其他值捕获类型

如需指定 OLD_AND_NEW_VALUES 以外的更改流值捕获类型,请将 value_capture_type 设置为 NEW_VALUESNEW_ROW,如以下示例所示:

GoogleSQL

CREATE CHANGE STREAM NewRowChangeStream
FOR ALL
OPTIONS ( value_capture_type = 'NEW_ROW' );
CREATE CHANGE STREAM NewValuesChangeStream
FOR ALL
OPTIONS ( value_capture_type = 'NEW_VALUES' );

PostgreSQL

CREATE CHANGE STREAM NewRowChangeStream
FOR ALL
WITH ( value_capture_type = 'NEW_ROW' );
CREATE CHANGE STREAM NewValuesChangeStream
FOR ALL
WITH ( value_capture_type = 'NEW_VALUES' );

过滤基于 TTL 的删除操作

您可以过滤基于 TTL 的删除操作 使用 exclude_ttl_deletes 过滤条件从变更数据流的范围中移除。

如需详细了解此过滤条件的运作方式,请参阅基于生命周期的删除过滤条件

GoogleSQL

如需使用基于 TTL 的删除过滤条件创建变更数据流,请使用以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (exclude_ttl_deletes = true)

替换以下内容:

  • CHANGE_STREAM_NAME:新实例的名称 变更数据流

以下示例会创建一个名为 NewFilterChangeStream 的变更数据流 (排除所有基于 TTL 的删除):

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
OPTIONS (exclude_ttl_deletes = true)

PostgreSQL

如需使用基于 TTL 的删除过滤条件创建更改流,请使用以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (exclude_ttl_deletes = true)

替换以下内容:

  • STREAM_NAME:新实例的名称 变更数据流

以下示例会创建一个名为 NewFilterChangeStream 的变更数据流 (排除所有基于 TTL 的删除):

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
WITH (exclude_ttl_deletes = true)

如需在现有过滤器中添加或移除基于 TTL 的删除过滤器,请执行以下操作: 请参阅修改基于 TTL 的删除过滤条件。 您可以通过将更改数据流的定义视为 DDL 来确认更改数据流过滤条件。

按表修改类型过滤

使用以下可用过滤条件选项从更改流的范围中过滤出一个或多个此类表更改:

  • exclude_insert:排除所有 INSERT 表修改
  • exclude_update:排除所有 UPDATE 表修改
  • exclude_delete:排除所有 DELETE 表修改

有关这些过滤器工作方式的详细信息,请参阅 表修改类型过滤条件

GoogleSQL

如需创建包含一个或多个表修改类型过滤条件的更改流,请使用以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (MOD_TYPE_FILTER_NAME = true)

替换以下内容:

  • CHANGE_STREAM_NAME:新实例的名称 变更数据流
  • MOD_TYPE_FILTER_NAME:您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次添加多个过滤条件,请用英文逗号分隔每个过滤条件。

以下示例创建了一个名为 NewFilterChangeStream 的更改流,该更改流会排除 INSERTUPDATE 表修改类型:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
OPTIONS (exclude_insert = true, exclude_update = true)

PostgreSQL

如需创建包含一个或多个表修改类型过滤条件的更改流,请使用以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (MOD_TYPE_FILTER_NAME = true)

替换以下内容:

  • CHANGE_STREAM_NAME:现有应用的名称 变更数据流
  • MOD_TYPE_FILTER_NAME:您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次添加多个过滤条件,请使用英文逗号分隔每个过滤条件。

以下示例创建了一个名为 NewFilterChangeStream 的更改流,该更改流会排除 INSERTUPDATE 表修改事务类型:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
WITH (exclude_insert = true, exclude_update = true)

如需在现有过滤条件中添加或移除表修改类型过滤条件,请执行以下操作: 请参阅按表修改类型修改过滤器。 您可以通过将变更数据流的定义视为 DDL 来确认您的变更数据流存在哪些表更改类型过滤条件。

修改变更数据流

如需修改变更数据流的配置,请使用 ALTER CHANGE STREAM DDL 声明。它使用的语法与 CREATE CHANGE STREAM 类似。您可以 更改数据流所监控的列或其数据长度 保留期限。您还可以完全暂停其观看,同时保留其数据更改记录。

修改变更数据流监控的内容

以下示例将整个 Songs 表添加到之前配置的 NamesAndAlbums 变更数据流:

ALTER CHANGE STREAM NamesAndAlbums
SET FOR Singers(FirstName, LastName), Albums, Songs;

在更新数据库架构中更改流的定义的长时间运行操作完成后,Spanner 会将命名更改流的行为替换为新配置。

修改变更数据流的数据保留期限

如需修改变更数据流保留其内部记录的时长,请在 ALTER CHANGE STREAM DDL 语句中设置 retention_period

此示例根据 NamesAndAlbums 调整数据保留期限 之前创建的变更数据流:

GoogleSQL

ALTER CHANGE STREAM NamesAndAlbums
SET OPTIONS ( retention_period = '36h' );

PostgreSQL

ALTER CHANGE STREAM NamesAndAlbums
SET ( retention_period = '36h' );

修改变更数据流的值捕获类型

如需修改变更数据流的值捕获类型,请将 ALTER CHANGE STREAM DDL 语句中的 value_capture_type 子句。

此示例将值捕获类型调整为 NEW_VALUES

GoogleSQL

ALTER CHANGE STREAM NamesAndAlbums
SET OPTIONS ( value_capture_type = 'NEW_VALUES' );

PostgreSQL

ALTER CHANGE STREAM NamesAndAlbums
SET ( value_capture_type = 'NEW_VALUES' );

修改基于 TTL 的删除过滤条件

针对某项更改修改基于 TTL 的删除过滤条件 数据流,请在 ALTER CHANGE STREAM DDL 中设置 exclude_ttl_deletes 过滤条件 声明。你可以在此菜单中添加或移除过滤条件 变更数据流

有关这些过滤器工作方式的详细信息,请参阅 基于存留时间的删除过滤条件

向现有更改流添加基于 TTL 的删除过滤条件

GoogleSQL

如需向现有更改流添加基于 TTL 的删除过滤条件,请使用以下命令将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (exclude_ttl_deletes = true)

替换以下内容:

  • CHANGE_STREAM_NAME:现有应用的名称 变更数据流

在以下示例中,exclude_ttl_deletes 过滤条件会添加到名为 NewFilterChangeStream 的现有更改流中,从而排除所有基于 TTL 的删除操作:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (exclude_ttl_deletes = true)

这会从变更数据流中排除所有基于 TTL 的删除操作。

PostgreSQL

如需向现有更改流添加基于 TTL 的删除过滤条件,请使用以下命令将过滤条件设置为 True

ALTER CHANGE STREAM STREAM_NAME FOR ALL
SET (exclude_ttl_deletes = true)

替换以下内容:

  • STREAM_NAME:现有应用的名称 变更数据流

在以下示例中,exclude_ttl_deletes 过滤条件会添加到 名为“NewFilterChangeStream”的现有变更数据流:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET (exclude_ttl_deletes = true)

这会从变更数据流中排除所有未来基于 TTL 的删除操作。

从现有更改数据流中移除基于 TTL 的删除过滤条件

GoogleSQL

如需移除现有更改流基于 TTL 的删除过滤条件,请使用以下命令将过滤条件设为 False

ALTER CHANGE STREAM STREAM_NAME FOR ALL
SET OPTIONS (exclude_ttl_deletes = false)

替换以下内容:

  • STREAM_NAME:新更改流的名称

在以下示例中,系统会从名为 NewFilterChangeStream 的现有更改流中移除 exclude_ttl_deletes 过滤条件:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (exclude_ttl_deletes = false)

这包括未来对更改流基于 TTL 的所有删除操作。

您还可以将过滤条件设为 null,以移除基于 TTL 的删除过滤条件。

PostgreSQL

如需为现有变更数据流移除基于 TTL 的删除过滤条件,请执行以下操作: 请使用以下命令将过滤条件设置为 False

ALTER CHANGE STREAM STREAM_NAME FOR ALL
SET (exclude_ttl_deletes = false)

替换以下内容:

  • STREAM_NAME:新更改流的名称

在以下示例中,系统会从名为 NewFilterChangeStream 的现有更改流中移除 exclude_ttl_deletes 过滤条件:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET (exclude_ttl_deletes = false)

这包括未来对更改流基于 TTL 的所有删除操作。

您还可以将过滤条件设置为 null,以移除基于 TTL 的删除过滤条件。

按表格修改类型修改过滤条件

如需修改更改流的表更改类型过滤条件,请在 ALTER CHANGE STREAM DDL 语句中设置过滤条件类型。您可以使用此 API 向更改流添加新过滤条件或从更改流中移除现有过滤条件。

向现有变更数据流添加表修改类型过滤条件

GoogleSQL

如需向现有更改数据流添加一个或多个新的表修改类型过滤条件,请使用以下命令将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET OPTIONS (MOD_TYPE_FILTER_NAME = true)

替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有名称 变更数据流
  • MOD_TYPE_FILTER_NAME:替换为您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次添加多个过滤条件,请用英文逗号分隔每个过滤条件。

在以下示例中,exclude_delete 过滤条件会添加到名为 NewFilterChangeStream 的现有更改流中:

ALTER CHANGE STREAM NewFilterChangeStream
SET OPTIONS (exclude_delete = true)

PostgreSQL

如需向现有更改数据流添加一个或多个新的表修改类型过滤条件,请使用以下命令将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET (MOD_TYPE_FILTER_NAME = true)

替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有名称 变更数据流
  • MOD_TYPE_FILTER_NAME:替换为您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次添加多个过滤条件,请用英文逗号分隔每个过滤条件。

在以下示例中,exclude_delete 过滤条件会添加到名为 NewFilterChangeStream 的现有更改流中:

ALTER CHANGE STREAM NewFilterChangeStream
SET (exclude_delete = true)

从现有变更数据流中移除表修改类型过滤条件

GoogleSQL

如需移除更改流中一个或多个现有表修改类型过滤条件,请使用以下命令将过滤条件设为 false

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET OPTIONS (MOD_TYPE_FILTER_NAME = false)

替换以下内容:

  • CHANGE_STREAM_NAME:替换为现有更改流的名称
  • MOD_TYPE_FILTER_NAME:替换为要移除的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次要移除多个过滤条件,请用英文逗号分隔每个过滤条件。

在以下示例中,系统会从名为 NewFilterChangeStream 的现有更改流中移除 exclude_delete 过滤器:

ALTER CHANGE STREAM NewFilterChangeStream
SET OPTIONS (exclude_delete = false)

您还可以通过设置过滤条件来移除表格修改过滤条件 还原为默认值。为此,请将过滤器值设置为 null

PostgreSQL

要移除一个或多个现有的表修改类型过滤条件,请执行以下操作: 变更数据流,请使用以下命令将过滤条件设置为 False

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET (MOD_TYPE_FILTER_NAME = false)

替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有名称 变更数据流
  • MOD_TYPE_FILTER_NAME:请替换为您所用的过滤条件 要添加:exclude_insertexclude_updateexclude_delete。 如果一次添加多个过滤条件,请使用英文逗号分隔每个过滤条件。

在以下示例中,系统会从名为 NewFilterChangeStream 的现有更改流中移除 exclude_delete 过滤器:

ALTER CHANGE STREAM NewFilterChangeStream
SET (exclude_delete = false)

您还可以通过设置过滤条件来移除表格修改过滤条件 还原为默认值。为此,请将过滤条件值设置为 null

暂停变更数据流

如果您希望变更数据流停止其活动,但保留其 内部记录(至少在数据保留期限内), 可以改变它,使其什么都不看

为此,可发出一个 ALTER CHANGE STREAM DDL 语句来替换 使用特殊短语 DROP FOR ALL 替换变更数据流的定义。 例如:

ALTER CHANGE STREAM MyStream DROP FOR ALL;

该数据流继续存在于数据库中,但不会观察任何对象, 并且不会生成更多数据变更记录。其现有更改记录将保持不变,具体取决于数据流的数据保留政策。

如需恢复已暂停的串流,请使用其之前的配置发出另一个 ALTER CHANGE STREAM 语句。

删除变更数据流

如需永久删除变更数据流,请发出包含数据流名称的 DROP CHANGE STREAM 语句:

DROP CHANGE STREAM NamesAndAlbums;

Spanner 会立即停止数据流,将其从数据库的架构中移除,并删除其数据更改记录。

列出和查看变更数据流

Google Cloud 控制台提供了一个网页界面,用于列出和 查看数据库的变更数据流定义。您还可以查看 变更数据流的结构作为其等效 DDL 语句,或通过 查询数据库的信息架构。

使用 Google Cloud 控制台查看变更数据流

如需查看数据库的变更数据流列表并查看其定义,请执行以下操作:

  1. 访问 Google Cloud 控制台的 Spanner 实例页面。

    打开实例页面

  2. 前往相应实例和数据库。

  3. 点击导航菜单中的更改流

此操作会显示该数据库的所有更改流的列表,并汇总每个更改流的配置。点击数据流的名称可详细了解其所监控的表和列。

以 DDL 的形式查看变更数据流的定义

以 DDL 格式查看数据库架构时,其中会包含对其所有更改流的说明,这些更改流会显示为 CREATE CHANGE STREAM 语句。

  • 如需在控制台中执行此操作,请点击 Google Cloud 控制台中数据库页面上的显示等效 DDL 链接。

  • 如需通过命令行执行此操作,请使用 Google Cloud CLI 的 ddl describe 命令

查询有关变更数据流的信息架构

您可以直接查询数据库的信息架构,了解其更改流。通过 下表包含定义变更数据流的元数据 监控的数据表和列,以及数据保留 周期:

变更流最佳实践

以下是配置和管理更改流的一些最佳实践。

考虑使用单独的元数据数据库

变更数据流使用元数据数据库来维护内部状态。 元数据数据库可以与之前创建的数据库 包含变更数据流我们建议您创建一个单独的数据库 元数据存储。

Spanner 变更数据流连接器需要读写 对元数据数据库的权限。您无需使用架构准备此数据库;连接器会负责处理此事宜。

使用单独的元数据数据库可消除允许连接器直接写入应用数据库时可能出现的复杂性:

  • 通过将元数据数据库与 包含变更数据流的生产数据库,连接器只需 对生产数据库的读取权限。

  • 通过将连接器的流量限制到单独的元数据数据库,生产变更数据流中不会包含连接器本身执行的写入。这对于变更数据流 监控整个数据库

如果未使用单独的数据库来存储元数据,我们建议 监控变更数据流连接器对其实例的 CPU 影响。

对新的变更数据流进行基准测试,并根据需要调整大小

在向生产实例添加新的变更数据流之前,请考虑以下事项: 通过更改对预演实例上的实际工作负载进行基准测试 视频流。这样,您就可以确定是否需要向实例添加节点,以增加其计算容量和存储容量。

运行这些测试,直到 CPU 和存储指标稳定下来。理想情况下, 实例的 CPU 利用率应低于 建议的上限,以及 其存储空间用量不应超过实例的存储空间上限。

使用不同区域进行负载均衡

多区域实例配置中使用更改流时,请考虑在与默认主副本区域不同的区域中运行其处理流水线。这有助于在非主要副本之间分布流式负载。如果 您需要优先考虑尽可能短的流式传输延迟时间 但是,在主要区域中运行流式负载。

后续步骤