使用变更数据捕获来流式插入表更新

BigQuery 变更数据捕获 (CDC) 通过处理流式插入的更改并将其应用于现有数据来更新 BigQuery 表。此同步通过更新/插入和删除行操作来完成,BigQuery Storage Write API 会实时流式插入这些操作,您应该先熟悉此 API,然后再继续操作。

准备工作

授予为用户提供执行本文档中的每个任务所需权限的 Identity and Access Management (IAM) 角色,并确保您的工作流满足每个前提条件。

所需权限

如需获得使用 Storage Write API 所需的权限,请让您的管理员为您授予 BigQuery Data Editor (roles/bigquery.dataEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理访问权限

此预定义角色可提供 bigquery.tables.updateData 权限,使用 Storage Write API 需要该权限。

您也可以使用自定义角色或其他预定义角色来获取此权限。

如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅 IAM 简介

前提条件

如需使用 BigQuery CDC,您的工作流必须满足以下条件:

  • 您必须在默认流中使用 Storage Write API。
  • 您必须为 BigQuery 中的目标表声明主键。支持最多包含 16 列的复合主键。
  • 必须有足够的 BigQuery 计算资源来执行 CDC 行操作。请注意,如果 CDC 行修改操作失败,则您可能无意中保留了要删除的数据。如需了解详情,请参阅已删除的数据注意事项

指定对现有记录的更改

在 BigQuery CDC 中,伪列 _CHANGE_TYPE 表示要针对每行处理的更改类型。如需使用 CDC,请在使用 Storage Write API 流式插入行修改时设置 _CHANGE_TYPE。伪列 _CHANGE_TYPE 仅接受值 UPSERTDELETE。当 Storage Write API 以这种方式流式传输行修改时,表被视为支持 CDC。

使用 UPSERTDELETE 值的示例

请考虑 BigQuery 中的以下表:

ID 名称 薪资
100 账单 2000
101 Lucy 3000
102 Ethan 5000

Storage Write API 会流式插入以下行修改:

ID 名称 薪资 _CHANGE_TYPE
100 DELETE
101 Lucy 8000 UPSERT
105 最大值 6000 UPSERT

更新后的表现在如下:

ID 名称 薪资
101 Lucy 8000
102 Ethan 5000
105 最大值 6000

管理表过时

默认情况下,每次运行查询时,BigQuery 都会返回最新结果。为了在查询启用了 CDC 的表时提供最新结果,BigQuery 必须在查询开始之前应用每个流式插入的行修改,以便查询最新版本的表。在查询运行时应用这些行修改会增加查询延迟时间和费用。但是,如果您不需要全新的查询结果,则可以对表设置 max_staleness 选项来降低查询费用和延迟时间。设置此选项后,BigQuery 会在 max_staleness 值定义的时间间隔内至少应用一次行修改,让您可以运行查询,而无需等待应用更新,但代价是一些数据过时。

此行为对于数据新鲜度不是必需的信息中心和报告特别有用。它还可让您更好地控制 BigQuery 应用行修改的频率,从而帮助您管理费用。

查询设置了 max_staleness 选项的表

当您查询设置了 max_staleness 选项的表时,BigQuery 会根据 max_staleness 的值以及上次发生应用作业的时间(由表的 upsert_stream_apply_watermark 时间戳表示)返回结果。

请考虑以下示例,其中表的 max_staleness 选项设置为 10 分钟,最近的应用作业发生在 T20:

查询运行时间在数据过时的最长时间间隔内。

如果您在 T25 查询表,则表的当前版本过时 5 分钟,小于 10 分钟的 max_staleness 时间间隔。在本例中,BigQuery 返回 T20 处的表版本,这意味着返回的数据也过时 5 分钟。

如果您对表设置 max_staleness 选项,BigQuery 会在 max_staleness 时间间隔内至少应用一次待处理的行修改。但是,在某些情况下,BigQuery 可能无法完成在该时间间隔内应用这些待处理的行修改的过程。

例如,如果您在 T35 查询表,并且应用待处理的行修改的过程尚未完成,则表的当前版本过时 15 分钟,该值大于 10 分钟的 max_staleness 时间间隔。在本例中,查询运行时,BigQuery 会应用 T20 和 T35 之间的所有行修改,这意味着数据是全新的,但代价是额外增加一些查询延迟时间。 此作业被视为运行时合并作业。

查询运行时间超出数据过时的最长时间间隔。

表的 max_staleness 值通常应为以下两个值中的较大者:

  • 工作流的最大可容忍数据过时。
  • 将插入/更新的更改应用到表以及一些额外的缓冲区所需的最长时间的两倍。

如需计算将插入/更改的更改应用到现有表所需的时间,请使用以下 SQL 查询来确定后台应用作业的第 95 百分位时长,并允许一个七分钟的缓冲区,以允许 BigQuery 写入优化存储空间(流式缓冲区)转换。

SELECT
  project_id,
  destination_table.dataset_id,
  destination_table.table_id,
  APPROX_QUANTILES((TIMESTAMP_DIFF(end_time, creation_time,MILLISECOND)/1000), 100)[OFFSET(95)] AS p95_background_apply_duration_in_seconds,
  CEILING(APPROX_QUANTILES((TIMESTAMP_DIFF(end_time, creation_time,MILLISECOND)/1000), 100)[OFFSET(95)]*2/60)+7 AS recommended_max_staleness_with_buffer_in_minutes
FROM `region-us`.INFORMATION_SCHEMA.JOBS AS job
WHERE
  project_id = 'PROJECT_ID'
  AND DATE(creation_time) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) AND CURRENT_DATE()
  AND job_id LIKE "%cdc_background%"
GROUP BY 1,2,3;

PROJECT_ID 替换为包含 BigQuery CDC 要修改的 BigQuery 表的项目的 ID。

后台应用作业的时长受多种因素影响,包括过时间隔内发出的 CDC 操作的数量和复杂性、表大小和 BigQuery 资源可用性。如需详细了解资源可用性,请参阅调整和监控 BACKGROUND 预留

创建使用 max_staleness 选项的表

如需创建使用 max_staleness 选项的表,请使用 CREATE TABLE 语句。以下示例会创建 employees 表,其 max_staleness 限制为 10 分钟:

CREATE TABLE employees (
  id INT64 PRIMARY KEY NOT ENFORCED,
  name STRING)
  CLUSTER BY
    id
  OPTIONS (
    max_staleness = INTERVAL 10 MINUTE);

修改现有表的 max_staleness 选项

如需在现有表中添加或修改 max_staleness 限制,请使用 ALTER TABLE 语句。以下示例会将 employees 表的 max_staleness 限制更改为 15 分钟:

ALTER TABLE employees
SET OPTIONS (
  max_staleness = INTERVAL 15 MINUTE);

确定表的当前 max_staleness

如需确定表的当前 max_staleness 值,请查询 INFORMATION_SCHEMA.TABLE_OPTIONS 视图。以下示例会检查表 mytable 的当前 max_staleness 值:

SELECT
  option_name,
  option_value
FROM
  DATASET_NAME.INFORMATION_SCHEMA.TABLE_OPTIONS
WHERE
  option_name = 'max_staleness'
  AND table_name = 'TABLE_NAME';

请替换以下内容:

  • DATASET_NAME:启用 CDC 的表所在的数据集的名称。
  • TABLE_NAME:启用 CDC 的表的名称。

结果显示 max_staleness 值为 10 分钟:

+---------------------+--------------+
| Row |  option_name  | option_value |
+---------------------+--------------+
|  1  | max_staleness | 0-0 0 0:10:0 |
+---------------------+--------------+

监控表插入/更新操作的进度

如需监控表的状态并检查上次应用行修改的时间,请查询 INFORMATION_SCHEMA.TABLES 视图以获取 upsert_stream_apply_watermark 时间戳。

以下示例会检查表 mytableupsert_stream_apply_watermark 值:

SELECT upsert_stream_apply_watermark
FROM DATASET_NAME.INFORMATION_SCHEMA.TABLES
WHERE table_name = 'TABLE_NAME';

请替换以下内容:

  • DATASET_NAME:启用 CDC 的表所在的数据集的名称。
  • TABLE_NAME:启用 CDC 的表的名称。

结果类似于以下内容:

[{
 "upsert_stream_apply_watermark": "2022-09-15T04:17:19.909Z"
}]

插入/更新操作由 bigquery-adminbot@system.gserviceaccount.com 服务账号执行,并显示在包含启用了 CDC 的表的项目的作业历史记录中。

配置 BigQuery 预留以用于 CDC

您可以使用 BigQuery 预留为 CDC 行修改操作分配专用的 BigQuery 计算资源。借助预留,您可以设置这些操作的费用上限。此方法对于针对大型表执行频繁 CDC 操作的工作流特别有用,由于执行每个操作时要处理大量字节,因此按需费用较高。

max_staleness 间隔内应用待处理行修改的 BigQuery CDC 作业被视为后台作业,并使用 BACKGROUND 分配类型而不是 QUERY 分配类型。相比之下,需要在查询运行时应用行修改的 max_staleness 间隔之外的查询使用 QUERY 分配类型。在没有 BACKGROUND 分配的情况下执行的 BigQuery CDC 后台作业采用按需价格。在为 BigQuery CDC 设计工作负载管理策略时,此注意事项非常重要。

如需配置 BigQuery 预留以用于 CDC,请先购买容量承诺,并在 BigQuery 表所在的区域中配置预留。如需查看有关预留大小的指导信息,请参阅调整 BACKGROUND 预留的大小并进行监控。创建预留后,为预留分配 BigQuery 项目,并通过运行以下 CREATE ASSIGNMENT 语句job_type 选项设置为 BACKGROUND

CREATE ASSIGNMENT
  `ADMIN_PROJECT_ID.region-LOCATION.RESERVATION_NAME.ASSIGNMENT_ID`
OPTIONS (
  assignee = 'projects/PROJECT_ID',
  job_type = 'BACKGROUND');

请替换以下内容:

  • ADMIN_PROJECT_ID:拥有预留的管理项目的 ID。
  • LOCATION:预留的位置
  • RESERVATION_NAME:预留的名称。
  • ASSIGNMENT_ID:分配的 ID。此 ID 对项目和位置来说必须是唯一的,以小写字母或数字开头和结尾,并且只能包含小写字母、数字和短划线。
  • PROJECT_ID:包含 BigQuery CDC 所修改的 BigQuery 表的项目的 ID。此项目已分配到预留。

调整 BACKGROUND 预留的大小并进行监控

预留决定了可用于执行 BigQuery 计算操作的计算资源数量。预留大小不足可能会增加 CDC 行修改操作的处理时间。如需准确调整预留大小,请通过查询 INFORMATION_SCHEMA.JOBS_TIMELINE 视图来监控执行 CDC 操作的项目的历史槽用量:

SELECT
  period_start,
  SUM(period_slot_ms) / (1000 * 60) AS slots_used
FROM
  REGION.INFORMATION_SCHEMA.JOBS_TIMELINE_BY_PROJECT
WHERE
  DATE(job_creation_time) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
  AND CURRENT_DATE()
  AND job_id LIKE '%cdc_background%'
GROUP BY
  period_start
ORDER BY
  period_start DESC;

REGION 替换为您的项目所在的区域名称。例如 region-us

已删除的数据注意事项

  • BigQuery CDC 操作利用 BigQuery 计算资源。如果 CDC 操作配置为使用按需结算,则系统会使用内部 BigQuery 资源定期执行 CDC 操作。如果 CDC 操作配置了 BACKGROUND 预留,则 CDC 操作取决于已配置的预留的资源可用性。如果已配置的预留中没有足够的资源,则处理 CDC 操作(包括删除)的时间可能会超出预期。
  • 仅当 upsert_stream_apply_watermark 时间戳超过 Storage Write API 流式插入操作的时间戳时,才会考虑应用 CDC DELETE 操作。应用该操作后,标准 Google Cloud 数据删除过程将开始。如需详细了解 upsert_stream_apply_watermark 时间戳,请参阅监控表更新/插入操作进度

限制

  • BigQuery CDC 不会强制要求使用键,因此主键必须是唯一的。
  • 主键不能超过 16 列。
  • 启用了 CDC 的表不支持以下各项:
  • 由于表的 max_staleness 值过低,执行运行时合并作业的启用了 CDC 的表无法支持以下各项:
  • 对启用了 CDC 的表的 BigQuery 导出操作不会导出最近被后台作业应用的流式行修改。如需导出完整表,请使用 EXPORT DATA 语句
  • 如果您的查询触发了分区表上的运行时合并,则系统会扫描整个表,无论查询是否仅限于一部分分区。
  • 如果您使用的是标准版,则 BACKGROUND 预留不可用,因此应用待处理的行修改时使用的是按需价格模式。但是,无论您使用什么版本,都可以查询启用 CDC 的表。

BigQuery CDC 价格

BigQuery CDC 使用 Storage Write API 注入数据,使用 BigQuery 存储存储数据,并使用 BigQuery 计算执行行修改操作,所有这些都会产生费用。如需了解价格信息,请参阅 BigQuery 价格

估算 BigQuery CDC 费用

除了一般的 BigQuery 费用估算最佳实践,估算 BigQuery CDC 的费用对于具有大量数据、低 max_staleness 配置或频繁更改的数据的工作流可能很重要。

BigQuery 数据注入价格BigQuery 存储价格直接按您注入和存储的数据量计算。但是,BigQuery 计算价格可能更难估算,因为它与用于运行 BigQuery CDC 作业的计算资源消耗有关。

BigQuery CDC 作业分为三类:

  • 后台应用作业:在后台定期运行的作业,这些作业由表的 max_staleness 值定义。这些作业会应用最近流式插入到启用了 CDC 的表的行修改。
  • 查询作业:max_staleness 时段内运行且仅从 CDC 基准表中读取的 GoogleSQL 查询。
  • 运行时合并作业:max_staleness 时段之外运行的临时 GoogleSQL 查询触发的作业。这些作业必须在查询运行时对 CDC 基准表和最近流式插入的行修改执行即时合并。

所有三种类型的 BigQuery CDC 作业都使用 BigQuery 聚类,但只有查询作业利用 BigQuery 分区。后台应用作业和运行时合并作业不能使用分区,因为在应用最近流式插入的行修改时,无法保证最近流式插入的 upsert 会应用于哪个表分区。换句话说,系统会在后台应用作业和运行时合并作业期间读取完整的基准表。了解执行 CDC 操作要读取的数据量有助于估算总费用。

如果从表基准中读取的数据量很大,请考虑使用 BigQuery 容量价格模式,该模式并非基于处理的数据量。

BigQuery CDC 费用最佳实践

除了一般的 BigQuery 费用最佳实践之外,您还可以使用以下方法优化 BigQuery CDC 操作的费用:

  • 除非必要,否则请避免使用非常低的值配置表的 max_staleness 选项。max_staleness 值可能会增加后台应用作业和运行时合并作业的出现次数,这些作业比查询作业的费用更高且速度更慢。如需获取详细指导,请参阅建议的表 max_staleness
  • 考虑配置 BigQuery 预留以用于 CDC 表。否则,后台应用作业和运行时合并作业使用按需价格,由于数据处理量较多,按需价格的费用可能更高。如需了解详情,请参阅 BigQuery 预留,并按照有关如何调整 BACKGROUND 预留的大小并进行监控以用于 BigQuery CDC 的指导操作。

后续步骤