创建和管理变更数据流

本页面介绍了如何创建、修改和查看 Spanner 变更数据流。如需详细了解变更数据流,请参阅变更数据流简介

由于变更数据流是架构对象,因此您可以通过任何其他类型的数据库定义工作(例如创建表或添加索引)所使用的由 DDL 驱动的架构更新来创建和管理变更数据流。

在您提交架构更改 DDL 语句(包括用于创建、更改或删除变更数据流的语句)后,Spanner 会启动长时间运行的操作。这项长时间运行的操作完成后,新的或更改的变更数据流将开始监视其新配置指定的列或表。

创建变更流

如需创建变更数据流,您需要提供其名称及其将监控的架构对象:整个数据库或特定表和列的列表。您可以选择指定以下内容:

GoogleSQL

使用 GoogleSQL 创建变更数据流的 DDL 语法如下所示:

CREATE CHANGE STREAM change_stream_name
  [FOR column_or_table_watching_definition[, ... ] ]
  [
    OPTIONS (
      retention_period = timespan,
      value_capture_type = type,
      exclude_ttl_deletes = boolean,
      exclude_insert = boolean,
      exclude_update = boolean,
      exclude_delete = boolean
    )
  ]

PostgreSQL

使用 PostgreSQL 创建变更数据流的 DDL 语法如下所示:

CREATE CHANGE STREAM change_stream_name
  [FOR column_or_table_watching_definition[, ... ] ]
  [
    WITH (
      retention_period = timespan,
      value_capture_type = type,
      exclude_ttl_deletes = boolean,
      exclude_insert = boolean,
      exclude_update = boolean,
      exclude_delete = boolean
    )
  ]

创建新的变更数据流的长时间运行的操作完成后,该变更流会立即开始监视为其分配的架构对象。

以下示例说明了如何使用各种配置创建变更数据流。

监控整个数据库

如需创建变更数据流以监控整个数据库表中执行的每项数据更改,请使用 ALL 关键字:

CREATE CHANGE STREAM EverythingStream
FOR ALL;

ALL 配置会在创建数据库的未来数据表和列后立即隐式包含这些数据表和列。不包括视图、信息架构表或普通数据表之外的其他对象。

查看特定表

如需将变更数据流的范围限定为特定表(而非整个数据库),请指定一个或多个表的列表:

CREATE CHANGE STREAM SingerAlbumStream
FOR Singers, Albums;

Spanner 会自动更新更改流来监控整个表,以反映影响这些表的任何架构更改(例如添加或删除列)。

查看特定列

使用 table(column_1[, column_2, ...]) 语法查看对您命名的表中一个或多个特定非键列的更改:

CREATE CHANGE STREAM NamesAndTitles
FOR Singers(FirstName, LastName), Albums(Title);

您不能在此处指定主键列,因为每个变更数据流都会始终跟踪其所监控的每个表的主键。这样,每个数据更改记录都可以通过主键来识别已更改的行。

在单个数据流中监控表和列

您可以将前两个示例中的表观察和列观察语法合并到一个变更数据流中:

CREATE CHANGE STREAM NamesAndAlbums
FOR Singers(FirstName, LastName), Albums;

指定更长的保留期限

如需指定长于默认期限(一天)的变更数据流数据保留期限,请将 retention_period 设置为最长一周的期限,以小时 (h) 或天 (d) 表示。

两个示例:

GoogleSQL

CREATE CHANGE STREAM LongerDataRetention
FOR ALL
OPTIONS ( retention_period = '36h' );
CREATE CHANGE STREAM MaximumDataRetention
FOR ALL
OPTIONS ( retention_period = '7d' );

PostgreSQL

CREATE CHANGE STREAM LongerDataRetention
FOR ALL
WITH ( retention_period = '36h' );
CREATE CHANGE STREAM MaximumDataRetention
FOR ALL
WITH ( retention_period = '7d' );

指定其他值捕获类型

如需指定 OLD_AND_NEW_VALUES 以外的变更数据流值捕获类型,请将 value_capture_type 设置为 NEW_VALUESNEW_ROW,如以下示例所示:

GoogleSQL

CREATE CHANGE STREAM NewRowChangeStream
FOR ALL
OPTIONS ( value_capture_type = 'NEW_ROW' );
CREATE CHANGE STREAM NewValuesChangeStream
FOR ALL
OPTIONS ( value_capture_type = 'NEW_VALUES' );

PostgreSQL

CREATE CHANGE STREAM NewRowChangeStream
FOR ALL
WITH ( value_capture_type = 'NEW_ROW' );
CREATE CHANGE STREAM NewValuesChangeStream
FOR ALL
WITH ( value_capture_type = 'NEW_VALUES' );

过滤基于 TTL 的删除操作

您可以使用 exclude_ttl_deletes 过滤条件从变更数据流的范围中过滤基于 TTL 的删除操作

如需详细了解此过滤器的工作原理,请参阅基于存留时间的删除过滤器

GoogleSQL

如需使用基于 TTL 的删除过滤条件创建变更数据流,请使用以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (exclude_ttl_deletes = true)

请替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称

以下示例会创建一个名为 NewFilterChangeStream 的变更数据流,其中排除了所有基于 TTL 的删除操作:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
OPTIONS (exclude_ttl_deletes = true)

PostgreSQL

如需使用基于 TTL 的删除过滤条件创建变更数据流,请使用以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (exclude_ttl_deletes = true)

请替换以下内容:

  • STREAM_NAME:新变更数据流的名称

以下示例会创建一个名为 NewFilterChangeStream 的变更数据流,其中排除了所有基于 TTL 的删除操作:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
WITH (exclude_ttl_deletes = true)

如需在现有变更数据流中添加或移除基于 TTL 的删除过滤条件,请参阅修改基于 TTL 的删除过滤条件。您可以以 DDL 形式查看变更数据流的定义,确认变更数据流过滤条件。

按表修改类型过滤

使用以下可用的过滤选项,从变更数据流的范围中过滤一项或多项表修改:

  • exclude_insert:排除所有 INSERT 表修改
  • exclude_update:排除所有 UPDATE 表修改
  • exclude_delete:排除所有 DELETE 表修改

如需详细了解这些过滤条件的工作原理,请参阅表修改类型过滤条件

GoogleSQL

如需创建具有一个或多个表修改类型过滤条件的变更数据流,请使用以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (MOD_TYPE_FILTER_NAME = true)

请替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称
  • MOD_TYPE_FILTER_NAME:您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次添加多个过滤条件,请使用英文逗号分隔各个过滤条件。

以下示例会创建一个名为 NewFilterChangeStream 的变更数据流,该数据流将排除 INSERTUPDATE 表修改类型:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
OPTIONS (exclude_insert = true, exclude_update = true)

PostgreSQL

如需创建具有一个或多个表修改类型过滤条件的变更数据流,请使用以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (MOD_TYPE_FILTER_NAME = true)

请替换以下内容:

  • CHANGE_STREAM_NAME:现有变更数据流的名称
  • MOD_TYPE_FILTER_NAME:您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次添加多个过滤条件,请使用英文逗号分隔各个过滤条件。

以下示例创建一个名为 NewFilterChangeStream 的变更数据流,用于排除 INSERTUPDATE 表修改事务类型:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
WITH (exclude_insert = true, exclude_update = true)

如需在现有变更数据流中添加或移除表修改类型过滤条件,请参阅按表修改类型修改过滤条件。您可以以 DDL 形式查看变更数据流的定义,以确认变更数据流存在哪些表修改类型过滤条件。

修改变更数据流

如需修改变更数据流的配置,请使用 ALTER CHANGE STREAM DDL 语句。它使用的语法与 CREATE CHANGE STREAM 类似。您可以更改数据流所监控的列或其数据保留期限。您还可以暂停监控其数据,同时保留其数据变更记录。

修改变更数据流所监控的内容

以下示例将整个 Songs 表添加到之前配置的 NamesAndAlbums 变更数据流中:

ALTER CHANGE STREAM NamesAndAlbums
SET FOR Singers(FirstName, LastName), Albums, Songs;

长时间运行的操作(更新数据库架构中变更数据流的定义)完成后,Spanner 会将已命名变更数据流的行为替换为新配置。

修改变更数据流的数据保留期限

如需修改变更数据流保留其内部记录的时长,请在 ALTER CHANGE STREAM DDL 语句中设置 retention_period

以下示例会根据之前创建的 NamesAndAlbums 变更数据流调整数据保留期限:

GoogleSQL

ALTER CHANGE STREAM NamesAndAlbums
SET OPTIONS ( retention_period = '36h' );

PostgreSQL

ALTER CHANGE STREAM NamesAndAlbums
SET ( retention_period = '36h' );

修改变更数据流的值捕获类型

如需修改变更数据流的值捕获类型,请在 ALTER CHANGE STREAM DDL 语句中设置 value_capture_type 子句。

此示例将值捕获类型调整为 NEW_VALUES

GoogleSQL

ALTER CHANGE STREAM NamesAndAlbums
SET OPTIONS ( value_capture_type = 'NEW_VALUES' );

PostgreSQL

ALTER CHANGE STREAM NamesAndAlbums
SET ( value_capture_type = 'NEW_VALUES' );

修改基于 TTL 的删除过滤条件

如需修改变更流的基于 TTL 的删除过滤条件,请在 ALTER CHANGE STREAM DDL 语句中设置 exclude_ttl_deletes 过滤条件。您可以使用此方法向现有变更数据流添加过滤条件或从中移除过滤条件。

如需详细了解这些过滤器的工作原理,请参阅基于存留时间的删除过滤器

将基于 TTL 的删除过滤条件添加到现有变更数据流

GoogleSQL

如需将基于 TTL 的删除过滤条件添加到现有变更数据流,请使用以下命令将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (exclude_ttl_deletes = true)

请替换以下内容:

  • CHANGE_STREAM_NAME:现有变更数据流的名称

在以下示例中,exclude_ttl_deletes 过滤条件将添加到名为 NewFilterChangeStream 的现有变更数据流中:其中排除了所有基于 TTL 的删除操作:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (exclude_ttl_deletes = true)

这会从变更数据流中排除所有基于 TTL 的删除操作。

PostgreSQL

如需将基于 TTL 的删除过滤条件添加到现有变更数据流,请使用以下命令将过滤条件设置为 True

ALTER CHANGE STREAM STREAM_NAME FOR ALL
SET (exclude_ttl_deletes = true)

请替换以下内容:

  • STREAM_NAME:现有变更数据流的名称

在以下示例中,exclude_ttl_deletes 过滤条件将添加到名为 NewFilterChangeStream 的现有变更数据流中:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET (exclude_ttl_deletes = true)

这会从变更数据流中排除将来所有基于 TTL 的删除操作。

从现有变更数据流中移除基于 TTL 的删除过滤条件

GoogleSQL

如需移除现有变更数据流的基于 TTL 的删除过滤条件,请使用以下命令将该过滤条件设置为 False

ALTER CHANGE STREAM STREAM_NAME FOR ALL
SET OPTIONS (exclude_ttl_deletes = false)

请替换以下内容:

  • STREAM_NAME:新变更数据流的名称

在以下示例中,exclude_ttl_deletes 过滤条件会从名为 NewFilterChangeStream 的现有变更数据流中移除:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (exclude_ttl_deletes = false)

这包括将来对变更数据流进行的所有基于 TTL 的删除操作。

您还可以将过滤条件设置为 null,以移除基于 TTL 的删除过滤条件。

PostgreSQL

如需移除现有变更数据流的基于 TTL 的删除过滤条件,请使用以下命令将该过滤条件设置为 False

ALTER CHANGE STREAM STREAM_NAME FOR ALL
SET (exclude_ttl_deletes = false)

请替换以下内容:

  • STREAM_NAME:新变更数据流的名称

在以下示例中,exclude_ttl_deletes 过滤条件会从名为 NewFilterChangeStream 的现有变更数据流中移除:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET (exclude_ttl_deletes = false)

这包括将来对变更数据流进行的所有基于 TTL 的删除操作。

您还可以将过滤条件设置为 null,以移除基于 TTL 的删除过滤条件。

按表修改类型修改过滤条件

如需修改变更数据流的表修改类型过滤条件,请在 ALTER CHANGE STREAM DDL 语句中设置过滤条件类型。您可以使用此工具向变更数据流添加新过滤条件,或从变更数据流中移除现有过滤条件。

向现有变更数据流添加表修改类型过滤条件

GoogleSQL

如需将一个或多个新的表修改类型过滤条件添加到现有变更数据流,请使用以下命令将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET OPTIONS (MOD_TYPE_FILTER_NAME = true)

请替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有变更数据流的名称
  • MOD_TYPE_FILTER_NAME:替换为您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次添加多个过滤条件,请使用英文逗号分隔各个过滤条件。

在以下示例中,exclude_delete 过滤条件将添加到名为 NewFilterChangeStream 的现有变更数据流中:

ALTER CHANGE STREAM NewFilterChangeStream
SET OPTIONS (exclude_delete = true)

PostgreSQL

如需将一个或多个新的表修改类型过滤条件添加到现有变更数据流,请使用以下命令将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET (MOD_TYPE_FILTER_NAME = true)

请替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有变更数据流的名称
  • MOD_TYPE_FILTER_NAME:替换为您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次添加多个过滤条件,请使用英文逗号分隔各个过滤条件。

在以下示例中,exclude_delete 过滤条件将添加到名为 NewFilterChangeStream 的现有变更数据流中:

ALTER CHANGE STREAM NewFilterChangeStream
SET (exclude_delete = true)

从现有变更数据流中移除表修改类型过滤条件

GoogleSQL

如需移除变更数据流中的一个或多个现有的表修改类型过滤条件,请使用以下命令将过滤条件设置为 false

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET OPTIONS (MOD_TYPE_FILTER_NAME = false)

请替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有变更数据流的名称
  • MOD_TYPE_FILTER_NAME:替换为要移除的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次移除多个过滤条件,请使用英文逗号分隔各个过滤条件。

在以下示例中,exclude_delete 过滤条件会从名为 NewFilterChangeStream 的现有变更数据流中移除:

ALTER CHANGE STREAM NewFilterChangeStream
SET OPTIONS (exclude_delete = false)

您还可以通过将过滤条件重新设置为默认值来移除表修改过滤条件。为此,请将过滤条件值设置为 null

PostgreSQL

如需移除变更数据流中的一个或多个现有的表修改类型过滤条件,请使用以下命令将过滤条件设置为 False

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET (MOD_TYPE_FILTER_NAME = false)

请替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有变更数据流的名称
  • MOD_TYPE_FILTER_NAME:替换为您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果一次添加多个过滤条件,请使用英文逗号分隔各个过滤条件。

在以下示例中,exclude_delete 过滤条件会从名为 NewFilterChangeStream 的现有变更数据流中移除:

ALTER CHANGE STREAM NewFilterChangeStream
SET (exclude_delete = false)

您还可以通过将过滤条件重新设置为默认值来移除表修改过滤条件。为此,请将过滤条件值设置为 null

暂停变更数据流

如果您希望变更数据流停止其活动,但保留其内部记录(至少在其数据保留期限内),则可以进行更改,使其不监控任何内容。

为此,请发出一个 ALTER CHANGE STREAM DDL 语句,以将变更数据流的定义替换为特殊短语 DROP FOR ALL。例如:

ALTER CHANGE STREAM MyStream DROP FOR ALL;

数据流继续存在于数据库中,但不会监控任何对象,并且不会生成更多数据变更记录。其现有更改记录将保持不变,但受数据流的数据保留政策的约束。

如需恢复已暂停的数据流,请发出另一个包含先前配置的 ALTER CHANGE STREAM 语句。

删除变更数据流

如需永久删除变更数据流,请发出包含相应数据流名称的 DROP CHANGE STREAM 语句:

DROP CHANGE STREAM NamesAndAlbums;

Spanner 会立即停止数据流,将其从数据库的架构中移除,并删除其数据变更记录。

列出和查看变更数据流

Google Cloud 控制台提供了一个网页界面,用于列出和查看数据库的变更数据流定义。您还可以将变更数据流的结构视为等效 DDL 语句,或通过查询数据库的信息架构来查看。

使用 Google Cloud 控制台查看变更数据流

如需查看数据库的变更数据流列表并查看其定义,请执行以下操作:

  1. 访问 Google Cloud 控制台的 Spanner 实例页面。

    打开实例页面

  2. 前往相应的实例和数据库。

  3. 点击导航菜单中的变更数据流

系统会显示该数据库的所有变更数据流列表,并汇总每个变更数据流的配置。点击流的名称即可查看其所监控的表和列的更多详细信息。

以 DDL 形式查看变更数据流的定义

如果以 DDL 的形式查看数据库的架构,则会看到其所有变更数据流的说明,这些变更流以 CREATE CHANGE STREAM 语句的形式显示。

  • 如需从控制台执行此操作,请点击 Google Cloud 控制台中数据库页面上的显示等效 DDL 链接。

  • 如需通过命令行执行此操作,请使用 Google Cloud CLI 的 ddl describe 命令

查询有关变更数据流的信息架构

您可以直接查询数据库的信息架构,了解其变更数据流。下表包含用于定义变更数据流名称的元数据、变更数据流所监控的表和列及其保留期限:

变更数据流最佳实践

以下是配置和管理变更数据流的一些最佳实践。

考虑使用单独的元数据数据库

变更数据流使用元数据数据库来维护内部状态。元数据数据库可以与包含变更数据流的数据库相同或不同。我们建议您为元数据存储创建一个单独的数据库。

Spanner 变更数据流连接器需要对元数据数据库拥有读写权限。您不需要为此数据库准备架构;连接器会负责这一点。

使用单独的元数据数据库可消除因允许连接器直接写入应用数据库而可能出现的复杂性:

  • 通过使用变更数据流将元数据数据库与生产数据库分开,连接器只需要对生产数据库拥有读取权限。

  • 通过将连接器的流量限制到单独的元数据数据库,连接器本身执行的写入不会包含在生产变更数据流中。这与监控整个数据库的变更数据流密切相关。

如果没有使用单独的数据库来存储元数据,我们建议监控变更数据流连接器对其实例的 CPU 影响。

对新的变更数据流进行基准测试,并根据需要调整大小

在将新的变更数据流添加到生产实例之前,请考虑在启用了变更流的预演实例上对实际工作负载进行基准化分析。这可让您确定是否需要向实例添加节点,以增加其计算和存储容量。

运行这些测试,直到 CPU 和存储指标稳定为止。理想情况下,此实例的 CPU 利用率应保持在建议的最大值以下,并且其存储空间使用量不应超过实例的存储空间上限。

使用不同的区域进行负载均衡

多区域实例配置中使用变更数据流时,请考虑在默认主要区域之外的其他区域运行其处理流水线。这有助于在非主要副本之间分布流式负载。但是,如果您需要优先考虑尽可能最短的流式传输延迟而非负载均衡,请在主要区域中运行流式负载。

后续步骤