Spanner change streams to BigQuery 模板

Spanner change streams to BigQuery 模板是一种流处理流水线,可使用 Dataflow Runner V2 流式传输 Spanner 数据更改记录并将其写入 BigQuery 表。

所有变更数据流监控的列都会包含在每个 BigQuery 表行中,无论它们是否被 Spanner 事务修改。未监控的列不会包含在 BigQuery 行中。任何小于 Dataflow 水印的 Spanner 更改都会成功应用于 BigQuery 表,或存储在死信队列中进行重试。与原始 Spanner 提交时间戳排序相比,BigQuery 行插入是乱序的。

如果需要的 BigQuery 表不存在,流水线会创建这些表。否则将使用现有 BigQuery 表。现有 BigQuery 表的架构必须包含 Spanner 表的相应跟踪列以及 ignoreFields 选项未显式忽略的任何其他元数据列。请参阅以下列表中的元数据字段的说明。 每个新的 BigQuery 行都包含变更数据流在更改记录的时间戳从 Spanner 表中的对应行中监控的所有列。

以下元数据字段会添加到 BigQuery 表中。如需详细了解这些字段,请参阅“变更数据流分区、记录和查询”中的数据更改记录

  • _metadata_spanner_mod_type:Spanner 事务的修改类型(插入、更新或删除)。从变更数据流数据更改记录中提取。
  • _metadata_spanner_table_name:Spanner 表名称。此字段不是连接器的元数据表名称。
  • _metadata_spanner_commit_timestamp:Spanner 的提交时间戳,这是提交更改的时间。此值从变更数据流数据更改记录中提取。
  • _metadata_spanner_server_transaction_id:代表 Spanner 事务的全局唯一字符串,更改在该事务中提交。仅在处理变更数据流记录的上下文中使用此值。它与 Spanner API 中的事务 ID 无关。此值从变更数据流数据更改记录中提取。
  • _metadata_spanner_record_sequence:Spanner 事务中记录的序列号。序列号在事务中一定是唯一且单调递增的,但不一定是连续的。此值从变更数据流数据更改记录中提取。
  • _metadata_spanner_is_last_record_in_transaction_in_partition:指示记录是否为当前分区中该 Spanner 事务的最后一个记录。此值从变更数据流数据更改记录中提取。
  • _metadata_spanner_number_of_records_in_transaction:所有变更数据流分区中属于该 Spanner 事务的数据更改记录数量。此值从变更数据流数据更改记录中提取。
  • _metadata_spanner_number_of_partitions_in_transaction:返回 Spanner 事务的数据更改记录的分区数量。此值从变更数据流数据更改记录中提取。
  • _metadata_big_query_commit_timestamp:行插入 BigQuery 时的提交时间戳。如果 useStorageWriteApitrue,则流水线不会在更新日志表中自动创建此列。在这种情况下,您必须根据需要在更改日志表中手动添加此列。

使用此模板时,请注意以下详细信息:

  • 您可以使用此模板将现有表或新表中的新列从 Spanner 传播到 BigQuery。如需了解详情,请参阅处理添加跟踪表或列
  • 对于 OLD_AND_NEW_VALUESNEW_VALUES 值捕获类型,当数据更改记录包含 UPDATE 更改时,模板需要在数据更改记录的提交时间戳对 Spanner 执行过时读取,以检索未更改但受监控的列。请确保为过时读取正确配置数据库“version_retention_period”。对于 NEW_ROW 值捕获类型,此模板效率更高,因为数据更改记录会捕获整个新行(包括 UPDATE 请求中未更新的列),并且模板不需要执行过时读取。
  • 如需最大限度地减少网络延迟和网络传输费用,请在 Spanner 实例或 BigQuery 表所在的区域中运行 Dataflow 作业。如果您使用位于作业区域之外的源、接收器、暂存文件位置或临时文件位置,则数据可能会跨区域发送。如需了解详情,请参阅 Dataflow 区域
  • 此模板支持所有有效的 Spanner 数据类型。如果 BigQuery 类型比 Spanner 类型更精确,则在转换期间可能会发生精度损失。具体而言:
    • 对于 Spanner JSON 类型,对象的成员顺序按字典顺序排列,但对于 BigQuery JSON 类型没有此类保证。
    • Spanner 支持纳秒 TIMESTAMP 类型,但 BigQuery 仅支持微秒 TIMESTAMP 类型。
  • 此模板不支持以“正好一次”模式使用 BigQuery Storage Write API

详细了解变更数据流如何构建变更数据流 Dataflow 流水线最佳实践

流水线要求

  • 在运行流水线之前,Spanner 实例必须已存在。
  • 在运行流水线之前,Spanner 数据库必须已存在。
  • 在运行流水线之前,Spanner 元数据实例必须已存在。
  • 在运行流水线之前,Spanner 元数据数据库必须已存在。
  • 在运行流水线之前,Spanner 变更数据流必须已存在。
  • 在运行流水线之前,BigQuery 数据集必须已存在。

处理添加跟踪表或跟踪列

本部分介绍了在流水线运行期间处理添加跟踪 Spanner 表和列的最佳实践。

  • 在将新列添加到 Spanner 变更数据流范围之前,请先将该列添加到 BigQuery 更改日志表。添加的列必须具有匹配的数据类型,并且为 NULLABLE。 请等待至少 10 分钟,然后继续在 Spanner 中创建新列或新表。如果在等待完成之前写入新列,可能会导致未处理的记录在死信队列目录中显示无效错误代码。
  • 如需添加新表,请先在 Spanner 数据库中添加该表。 当流水线收到新表的记录时,系统会在 BigQuery 中自动创建该表。
  • 在 Spanner 数据库中添加新列或新表后,请务必更改变更数据流以跟踪所需的新列或新表(如果尚未隐式跟踪)。
  • 此模板不会从 BigQuery 中删除表或列。如果从 Spanner 表中删除了某个列,则在从 Spanner 表中删除该列后生成的记录的 BigQuery 更改日志列中将填充 null 值,除非您手动从 BigQuery 中删除该列。
  • 该模板不支持列类型更新。虽然 Spanner 支持将 STRING 列更改为 BYTES 列或将 BYTES 列更改为 STRING 列,但您无法在 BigQuery 中修改现有列的数据类型或使用具有不同数据类型的相同列名称。 如果您在 Spanner 中删除并重新创建名称相同但类型不同的列,数据可能会写入现有的 BigQuery 列,但类型保持不变。
  • 此模板不支持列模式更新。复制到 BigQuery 的元数据列会设置为 REQUIRED 模式。复制到 BigQuery 的所有其他列都设置为 NULLABLE,无论它们在 Spanner 表中是否定义为 NOT NULL。您无法在 BigQuery 中将 NULLABLE 列更新为 REQUIRED 模式。
  • 运行中的流水线不支持更改变更数据流的值捕获类型

模板参数

必需参数

  • spannerInstanceId:要从中读取变更数据流的 Spanner 实例。
  • spannerDatabase:要从中读取变更数据流的 Spanner 数据库。
  • spannerMetadataInstanceId:要用于变更数据流连接器元数据表的 Spanner 实例。
  • spannerMetadataDatabase:要用于变更数据流连接器元数据表的 Spanner 数据库。
  • spannerChangeStreamName:要从中读取数据的 Spanner 变更数据流的名称。
  • bigQueryDataset:变更数据流输出的 BigQuery 数据集。

可选参数

  • spannerProjectId:读取变更数据流的项目。此值也是创建变更数据流连接器元数据表的项目。此参数的默认值是运行 Dataflow 流水线的项目。
  • spannerDatabaseRole:运行模板时使用的 Spanner 数据库角色。仅当运行模板的 IAM 主账号是精细访问权限控制用户时,才需要此参数。数据库角色必须拥有变更数据流的 SELECT 特权和变更数据流的读取函数的 EXECUTE 特权。如需了解详情,请参阅变更数据流的精细访问权限控制 (https://cloud.google.com/spanner/docs/fgac-change-streams)。
  • spannerMetadataTableName:要使用的 Spanner 变更数据流连接器元数据表名称。如果未提供,系统会在流水线流期间自动创建 Spanner 变更数据流连接器元数据表。更新现有流水线时,您必须提供此参数。 否则,请勿提供此参数。
  • rpcPriority:Spanner 调用的请求优先级。该值必须为以下值之一:HIGHMEDIUMLOW。默认值为 HIGH
  • spannerHost:要在模板中调用的 Cloud Spanner 端点。仅用于测试。(示例:https://batch-spanner.googleapis.com)。
  • startTimestamp:要用于读取变更数据流的起始 DateTime (https://datatracker.ietf.org/doc/html/rfc3339)(含边界值)。Ex-2021-10-12T07:20:50.52Z. 默认为流水线启动时的时间戳,即当前时间。
  • endTimestamp:用于读取变更数据流的结束日期时间 (https://datatracker.ietf.org/doc/html/rfc3339)(含边界值),例如:2021-10-12T07:20:50.52Z。默认为未来的无限时间。
  • bigQueryProjectId:BigQuery 项目。默认值为 Dataflow 作业的项目。
  • bigQueryChangelogTableNameTemplate:包含更改日志的 BigQuery 表的名称模板。默认为 {_metadata_spanner_table_name}_changelog。
  • deadLetterQueueDirectory:用于存储任何未处理记录的路径。默认路径为 Dataflow 作业的临时位置下的目录。默认值通常就足够了。
  • dlqRetryMinutes:死信队列重试之间的分钟数。默认值为 10。
  • ignoreFields:要忽略的字段(区分大小写)的逗号分隔列表。这些字段可以是被监控表的字段,也可以是流水线添加的元数据字段。被忽略的字段不会插入到 BigQuery 中。如果您忽略 _metadata_spanner_table_name 字段,则 bigQueryChangelogTableNameTemplate 参数也会被忽略。默认值为空。
  • disableDlqRetries:是否为 DLQ 停用重试。默认值为:false。
  • useStorageWriteApi:如果为 true,则流水线使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。默认值为 false。如需了解详情,请参阅“使用 Storage Write API”(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • useStorageWriteApiAtLeastOnce:使用 Storage Write API 时,指定写入语义。如需使用“至少一次”语义 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false
  • numStorageWriteApiStreams:使用 Storage Write API 时,指定写入流的数量。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。默认值为 0。
  • storageWriteApiTriggeringFrequencySec:使用 Storage Write API 时,指定触发频率(以秒为单位)。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE:Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Spanner 变更数据流
  • BIGQUERY_DATASET:变更数据流输出的 BigQuery 数据集

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE:Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Spanner 变更数据流
  • BIGQUERY_DATASET:变更数据流输出的 BigQuery 数据集

后续步骤