使用变更数据捕获将数据库复制到 BigQuery

本文档介绍了多种使用变更数据捕获 (CDC) 将各种数据源与 BigQuery 集成的方法。本文档介绍了每种方法的数据一致性、易用性和费用之间的权衡分析。它将帮助您了解现有解决方案,了解如何通过不同方法来使用 CDC 所复制的数据,以及创建方法的费用优势分析

本文档旨在帮助数据架构师、数据工程师和业务分析师开发一种在 BigQuery 中访问复制数据的最佳方法。本文假定您熟悉 BigQuery、SQL 和命令行工具。

CDC 数据复制概览

MySQL、Oracle 和 SAP 等数据库是讨论最频繁的 CDC 数据源。不过,您可以将能够捕捉并更改由主键识别的数据元素的任何系统视为数据源。如果系统未提供内置的 CDC 流程(例如事务日志),则您可以部署增量批处理读取器来进行更改。

本文档讨论符合以下条件的 CDC 流程:

  1. 数据复制会分别捕获对每个表进行的更改。
  2. 每个表都有一个主键或复合主键。
  3. 系统会为发出的每个 CDC 事件分配一个单调递增的更改 ID,它通常是一个数值,例如交易 ID 或时间戳。
  4. 每个 CDC 事件都包含处于已完成状态的已更改行。

下图显示了使用 CDC 将数据源复制到 BigQuery 的通用架构:

使用 CDC 将数据源复制到 BigQuery 的通用架构。

在上图中,在 BigQuery 中为每个源数据表创建了一个主表和一个增量表。主表包含源表的所有列,以及最新更改 ID 值的一列。您可以将最新更改 ID 值视为由记录的主键标识的实体的版本 ID,并使用它来查找最新版本。

增量表包含源表的所有列,以及操作类型列(即更新、插入或删除操作之一)和更改 ID 值。

以下是使用 CDC 将数据复制到 BigQuery 的总体过程:

  1. 系统会提取源表的初始数据转储。
  2. 提取的数据可以选择性地进行转换,然后加载到相应的主表中。如果该表中没有可用作更改 ID 的列(例如上次更新时间戳),则更改 ID 会设置为该列的数据类型的最小值。这样一来,后续处理就能识别在初始数据转储后更新的主表记录。
  3. CDC 捕获过程会捕获在进行初始数据转储后发生变化的行。
  4. 必要的话,CDC 处理层会执行额外的数据转换。例如,CDC 处理层可能会调整时间戳的格式以供 BigQuery 使用、垂直拆分列或移除列。
  5. 使用微批量加载或流式插入将数据插入到 BigQuery 的相应增量表中。

如果在将数据插入 BigQuery 之前执行了其他转换,则列的数量和类型可能与源表不同。但是,主表和增量表中都有同一组列。

增量表包含特定表自上次加载以来的所有更改事件。所有可用的更改事件都有助于确定趋势、表在特定时刻代表的实体的状态或者更改频率。

如需获取由特定主键表示的实体的当前状态,您可以查询主表格以及包含最新更改 ID 的记录的增量表。此查询的费用可能很高,因为您可能需要在主表和增量表之间执行联接,并完成一个或两个表的完整表扫描,以查找特定主键的最新条目。您可以根据主键对表进行聚簇分区,以避免执行全表扫描,但并非总是可行。

本文档比较以下常规方法,有助于您在无法对表进行分区或聚簇时获取实体的当前状态:

  • 即时一致性方法:查询反映了所复制数据的当前状态。即时一致性方法需要一个联接主表和 delta 表的查询,并且为每个主键选择最新行。
  • 费用优化的方法:以更快的速度和更低的费用执行查询,但数据可用性会发生延迟。您可以定期将数据合并到主表中。
  • 混合方法:您可以使用即时一致性方法或费用优化的方法,具体取决于您的要求和预算。

除了这些方法外,本文档还讨论了提升性能的其他方法。

准备工作

本文档演示了如何使用 bq 命令行工具和 SQL 语句来查看和查询 BigQuery 数据。本文档后面部分显示了表布局和查询的示例。如果您想要对示例数据展开实验,请完成以下设置:

  1. 选择项目创建项目并为项目启用结算功能
    • 如果您创建项目,则系统会自动启用 BigQuery。
    • 如果您选择现有项目,请启用 BigQuery API
  2. 在 Google Cloud Console 中,打开 Cloud Shell
  3. 要更新 BigQuery 配置文件,请使用文本编辑器打开 ~/.bigqueryrc 文件,然后在文件中的任意位置添加以下行或进行更新:

    [query]
    --use_legacy_sql=false
    
    [mk]
    --use_legacy_sql=false
    
  4. 克隆包含设置 BigQuery 环境所用脚本的 GitHub 代码库:

    git clone https://github.com/GoogleCloudPlatform/bq-mirroring-cdc.git
    
  5. 创建数据集、主表和增量表:

    cd bq-mirroring-cdc/tutorial
    chmod +x *.sh
    ./create-tables.sh
    

为避免在完成实验后可能产生费用,请关停项目删除数据集

设置 BigQuery 数据

为了演示如何通过不同的 CDC 解决方案将数据复制到 BigQuery,您可以使用一对主表和增量表,其中填充了示例数据,例如以下简单的示例表。

要采用比本文档中的描述更为复杂的设置,您可以使用 CDC BigQuery 集成演示。该演示将自动执行数据填充过程,并且包含用于监控复制过程的脚本。如果要运行演示,请按照 README 文件中的说明进行操作,该文件位于您在本文档的准备工作部分中克隆的 GitHub 代码库的根目录中。

示例数据使用简单的数据模型:包含所需系统生成的会话 ID 和可选用户名的网络会话。会话开始时,用户名为 null。用户登录后,系统会填充用户名。

要将数据从 BigQuery 环境脚本加载到主表中,您可以运行如下所示的命令:

bq load cdc_tutorial.session_main init.csv

要获取主表内容,可以运行如下所示的查询:

bq query "select * from cdc_tutorial.session_main limit 1000"

输出如下所示:

+-----+----------+-----------+
| id  | username | change_id |
+-----+----------+-----------+
| 100 | NULL     |         1 |
| 101 | Sam      |         2 |
| 102 | Jamie    |         3 |
+-----+----------+-----------+

接下来,将第一批 CDC 更改加载到增量表中。要将第一批 CDC 更改从 BigQuery 环境脚本加载到增量表中,您可以运行如下所示的命令:

bq load cdc_tutorial.session_delta first-batch.csv

要获取增量表内容,可以运行如下所示的查询:

bq query "select * from cdc_tutorial.session_delta limit 1000"

输出如下所示:

+-----+----------+-----------+-------------+
| id  | username | change_id | change_type |
+-----+----------+-----------+-------------+
| 100 | Cory     |         4 | U           |
| 101 | Sam      |         5 | D           |
| 103 | NULL     |         6 | I           |
| 104 | Jamie    |         7 | I           |
+-----+----------+-----------+-------------+

在前面的输出中,change_id 值是表行更改的唯一 ID。change_type 列中的值表示以下内容:

  • U:更新操作
  • D:删除操作
  • I:插入操作

主表包含关于会话 100、101 和 102 的信息。增量表具有以下变化:

  • 会话 100 中更新了用户名“Cory”。
  • 会话 101 已被删除。
  • 已创建新会话 103 和 104。

源系统中会话的当前状态如下所示:

+-----+----------+
| id  | username |
+-----+----------+
| 100 | Cory     |
| 102 | Jamie    |
| 103 | NULL     |
| 104 | Jamie    |
+-----+----------+

虽然当前状态显示为表,但此表不存在具体化形式。此表是主表和增量表的组合。

查询数据

您可以使用多种方法来确定会话的整体状态。以下各部分介绍了每种方法的优缺点。

即时一致性方法

如果即时数据一致性是您的主要目标,并且源数据会经常更改,则您可以使用单个查询来联接主表和增量表并选择最新行(时间戳最新的行或最高值)。

要创建联接主表和增量表并寻找最新行的 BigQuery 视图,您可以运行 bq 工具命令,如下所示:

bq mk --view \
"SELECT * EXCEPT(change_type, row_num)
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_id DESC) AS row_num
  FROM (
    SELECT * EXCEPT(change_type), change_type
    FROM \`$(gcloud config get-value project).cdc_tutorial.session_delta\` UNION ALL
    SELECT *, 'I'
    FROM \`$(gcloud config get-value project).cdc_tutorial.session_main\`))
WHERE
  row_num = 1
  AND change_type <> 'D'" \
 cdc_tutorial.session_latest_v

上述 BigQuery 视图中的 SQL 语句会执行以下操作:

  • 最内层的 UNION ALL 会从主表和增量表生成行:
    • SELECT * EXCEPT(change_type), change_type FROM session_delta 强制将 change_type 列设为列表中的最后一列。
    • SELECT *, ‘I' FROM session_main 从主表中选择行,就像它是插入行一样。
    • 使用 * 运算符可简化示例。如果存在其他列或其他列顺序,请将快捷键替换为显式列列表。
  • SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_id DESC) AS row_num使用 BigQuery 中的分析函数,为具有相同值 id(由 PARTITION BY 定义)的每组行分配行号,从 1 开始。这些行按照该组中的 change_id 降序排序。由于系统会保证 change_id 增加,因此最新的更改具有一个值为 1 的 row_num 列。
  • WHERE row_num = 1 AND change_type <> 'D' 仅选择每个组中的最新行。这是 BigQuery 中常见的重复信息删除技术。如果其更改类型为删除,则此子句还会从结果中移除该行。
  • 最顶层的 SELECT * EXCEPT(change_type, row_num) 会移除为了处理而引入的额外列,这些列与其他情况不相关。

上述示例未在视图中使用插入和更新更改类型,因为引用最高的 change_id 值会选择原始插入或最新更新。在这种情况下,每行都包含所有列的完整数据。

创建视图后,您可以针对视图运行查询。如需获取最近的更改,您可以运行如下所示的查询:

bq query 'select * from cdc_tutorial.session_latest_v order by id limit 10'

输出如下所示:

+-----+----------+-----------+
| id  | username | change_id |
+-----+----------+-----------+
| 100 | Cory     |         4 |
| 102 | Jamie    |         3 |
| 103 | NULL     |         6 |
| 104 | Jamie    |         7 |
+-----+----------+-----------+

当您查询视图时,如果使用数据操纵语言 (DML) 语句更新了增量表中的数据,则可以立即查看增量表中的数据,如果您在增量表中流式插入了数据,则这些数据几乎立即可见。

费用优化的方法

即时一致性方法很简单,但效率可能低下,因为它需要 BigQuery 读取所有历史记录、按主键排序以及处理查询中的其他操作来实现视图。如果您经常查询会话状态,则即时一致性方法会降低性能并增加在 BigQuery 中存储和处理数据的费用。

为了最大限度地降低费用,您可以将增量表更改合并到主表中,并定期从增量表中清除合并的行。合并和清除操作会产生额外的费用,但如果您经常查询主表,则费用与持续寻找增量表中某个键的最新记录的费用相比可以忽略不计。

如需将增量表中的数据合并到主表中,您可以运行 MERGE 语句,如下所示:

bq query \
'MERGE `cdc_tutorial.session_main` m
USING
  (
  SELECT * EXCEPT(row_num)
  FROM (
    SELECT *, ROW_NUMBER() OVER(PARTITION BY delta.id ORDER BY delta.change_id DESC) AS row_num
    FROM `cdc_tutorial.session_delta` delta )
  WHERE row_num = 1) d
ON  m.id = d.id
  WHEN NOT MATCHED
AND change_type IN ("I", "U") THEN
INSERT (id, username, change_id)
VALUES (d.id, d.username, d.change_id)
  WHEN MATCHED
  AND d.change_type = "D" THEN
DELETE
  WHEN MATCHED
  AND d.change_type = "U"
  AND (m.change_id < d.change_id) THEN
UPDATE
SET username = d.username, change_id = d.change_id'

上述 MERGE 语句影响了四行,并且主表包含会话的当前状态。如需在此视图中查询主表,您可以运行如下所示的查询:

  bq query 'select * from cdc_tutorial.session_main order by id limit 10'

输出类似于以下内容:

+-----+----------+-----------+
| id  | username | change_id |
+-----+----------+-----------+
| 100 | Cory     |         4 |
| 102 | Jamie    |         3 |
| 103 | NULL     |         6 |
| 104 | Jamie    |         7 |
+-----+----------+-----------+

主表中的数据反映最新会话状态。

频繁且连续地合并数据的最佳方式是使用 MERGE 语句,它允许您将 INSERTUPDATEDELETE 这几个语句转换为单个原子操作。以下是上述 MERGE 语句之间的一些细微差别:

  • session_main 表与 USING 子句中指定的数据源合并,在本例中为子查询。
  • 子查询在即时一致性方法中使用与视图相同的技术:它会在具有相同 id 值(即 ROW_NUMBER() OVER(PARTITION BY id ORDER BY change_id DESC) row_numWHERE row_num = 1 的组合)的一组记录中选择最新行。
  • 系统将对两个表的 id 列(主键)执行合并操作。
  • WHEN NOT MATCHED 子句会检查匹配项。如果没有匹配项,则查询会检查最新记录是插入还是更新,然后插入记录。
    • 如果记录匹配且更改类型为删除,则记录将在主表中被删除。
    • 当记录匹配时,更改类型为更新,并且增量表的 change_id 值大于主记录的 change_id 值,系统会更新数据(包括最新的 change_id 值)。

上述 MERGE 语句适用于以下更改的任意组合:

  • 同一主键有多个更新行:仅应用最新更新。
  • 主表中的更新不匹配:如果主表中主键下没有记录,则会插入新记录。

    此方法会跳过主表的提取过程,并从增量表开始。系统会自动填充主表。

  • 在未处理的增量批次中插入和更新行。系统会使用最新的更新行,并在主表中插入新记录。

  • 在未处理的批次中插入和删除行。未插入此记录。

上述 MERGE 语句具有幂等性:多次运行此语句会导致主表处于相同的状态,并且不会造成任何副作用。如果您重新运行 MERGE 语句而没有向增量表添加新行,则输出将如下所示:

Number of affected rows: 0

您可以定期运行 MERGE 语句,使每个表在合并后保持最新。主表中的数据新鲜度取决于合并频率。如需了解如何自动运行 MERGE 语句,请参阅先前下载的演示 README 文件中的“安排合并”部分。

混合方法

即时一致性方法和费用优化的方法并不互斥。如果您对 session_latest_v 视图和 session_main 表运行查询,则查询返回的结果相同。您可以根据您的需求和预算选择使用方法:费用更高且几乎即时一致或费用更低但可能过时的数据。以下各部分介绍如何比较各种方法和可能的替代方案。

比较方法

本部分介绍了如何通过考虑每种解决方案的费用和性能,以及可接受的数据延迟与运行合并费用之间的平衡,来比较各种方法。

查询费用

为评估每种解决方案的费用和性能,下例对由 CDC BigQuery 集成演示生成的约 50 万个会话进行了分析。该演示中的会话模型比本文档前面介绍的模型稍微复杂一些,它将部署在其他数据集中,但概念是相同的。

您可以使用简单的聚合查询来比较各查询的费用。以下示例查询将针对结合了增量数据与主表的视图测试即时一致性方法:

SELECT status, count(*) FROM `cdc_demo.session_latest_v`
GROUP BY status ORDER BY status

该查询会产生以下费用:

Slot time consumed: 15.115 sec, Bytes shuffled 80.66 MB

以下示例查询会针对主表测试费用优化的方法:

SELECT status, count(*) FROM `cdc_demo.session_main`
GROUP BY status ORDER BY status

该查询会产生以下费用:

Slot time consumed: 1.118 sec, Bytes shuffled 609 B

如果您多次执行相同的查询,则槽消耗量会有所不同,但平均值相当稳定。不同执行中的 Bytes shuffled 值是一致的。

性能测试结果因查询类型和表布局的不同而异。前面的演示未使用数据聚簇或分区功能。

数据延迟

如果使用费用优化的方法,则数据延迟时间是以下各项相加的总和:

  • 数据复制触发器的延迟时间。这是源事件期间持久保存数据之时到复制系统触发复制流程之间的这段时间。
  • 将数据插入到 BigQuery 的时间(因复制解决方案不同而异)。
  • BigQuery 流式缓冲区数据出现在增量表中的时间。如果您使用流式插入,则通常只有几秒钟时间。
  • 合并运行之间的延迟时间。
  • 执行合并的时间。

如果使用即时一致性方法,则数据延迟时间是以下各项相加的总和:

  • 数据复制触发器的延迟时间。
  • 将数据插入到 BigQuery 的时间。
  • BigQuery 流式缓冲区数据出现在增量表中的时间。

您可以根据运行合并的费用与数据一致性的需求之间的权衡取舍,来配置合并运行之间的延迟时间。必要的话,您可以使用更复杂的方案,例如在营业时间进行常见合并,在下班时段进行每小时合并。

需考虑的替代方案

将各种数据源与 BigQuery 集成时,即时一致性方法和费用优化的方法是最通用的 CDC 选项。本部分介绍了更加简单和价格更低的数据集成选项。

使用增量表作为唯一可靠来源

如果增量表包含所做更改的完整历史记录,则您可以仅针对增量表创建视图,而不使用主表。使用增量表作为唯一可靠来源是事件数据库的一个示例。这种方法能够以较低的费用提供即时一致性方法,且性能几乎不受影响。如果您拥有缓慢变化的维度表且其中包含一小部分记录,请考虑使用此方法。

不使用 CDC 的完整数据转储

如果您拥有其大小可管理的表(例如,小于 1 GB),则可以按照以下顺序轻松执行完整数据转储:

  1. 将初始数据转储导入到名称独一无二的表中。
  2. 创建仅引用新表的视图。
  3. 对视图而非底层表执行查询。
  4. 将下一个数据转储导入到另一个表中。
  5. 重新创建视图,使其指向新上传的数据。
  6. (可选)删除原始表。
  7. 重复上述步骤来执行常规导入、重新创建和删除操作。

在主表中保留更改历史记录

在费用优化的方法中,系统不会保留更改历史记录,而最新更改会覆盖先前的数据。如果您需要保留历史记录,可以使用一系列变更来存储,因此请注意不要超出最大行大小上限。当您在主表中保留更改历史记录时,合并的 DML 会更加复杂,因为单个 MERGE 操作可以将增量表中的多行合并到主表中的一行。

使用联合数据源

在某些情况下,您可以复制到 BigQuery 以外的数据源,然后使用联合查询来公开该数据源。BigQuery 支持多种外部数据源。例如,如果从 MySQL 数据库中复制类似星标的架构,您可以使用原生 MySQL 复制功能将缓慢变化的维度复制到只读版本的 MySQL 中。使用此方法时,您只需将频繁更改的事实表复制到 BigQuery 中。如果要使用联合数据源,请考虑查询联合源时受到的一些限制

进一步提升性能

本部分将介绍如何通过对表进行聚簇和分区以及删减合并数据来进一步提升性能。

对 BigQuery 表进行聚簇和分区

如果您拥有经常查询的数据集,请分析每个表的使用情况,并使用聚簇分区功能来调整表的设计。相比其他方法,根据主键将主表及/或增量表进行聚簇的性能可能更佳。如需验证性能,请对不小于 10 GB 的数据集测试查询。

删减合并的数据

增量表会随着时间的推移不断增大,并且每个合并请求都会读取最终结果不需要的多个行,从而浪费资源。如果您仅使用增量表数据来计算最新状态,则删减合并记录可以降低存储在 BigQuery 中的数据量,从而通过降低合并费用来降低总体费用。

您可以通过以下方式删减合并的数据:

  • 定期查询主表以获取最大的 change_id 值,并删除其 change_id 值低于该上限的所有增量记录。如果要将数据流式插入到增量表中,则系统可能不会删除一段时间的插入数据。
  • 使用增量表基于提取的分区功能并运行每日脚本,以删除已经过处理的分区。当有更精细的 BigQuery 分区可用时,您可以提高清除频率。如需了解实现方式,请参阅先前下载的演示 README 文件中的“删减已处理的数据”部分。

总结

要选择正确的方法或选择多种方法,请考虑您尝试解决的使用场景。您可以使用现有的数据库迁移技术来解决数据复制需求。如果您的需求很复杂(例如需要解决近乎实时的使用场景并优化剩余数据访问模式的费用),则可能需要根据其他产品或开放程式码解决方案来设置自定义数据库迁移频率。本文档中介绍的方法和技术可帮助您成功实现此类解决方案。

后续步骤