Spanner change streams to BigQuery 模板

Spanner change streams to BigQuery 模板是一种流处理流水线,可使用 Dataflow Runner V2 流式传输 Spanner 数据更改记录并将其写入 BigQuery 表。

所有变更数据流监控的列都会包含在每个 BigQuery 表行中,无论它们是否被 Spanner 事务修改。未监控的列不会包含在 BigQuery 行中。任何小于 Dataflow 水印的 Spanner 更改都会成功应用于 BigQuery 表,或存储在死信队列中进行重试。与原始 Spanner 提交时间戳排序相比,BigQuery 行插入是乱序的。

如果需要的 BigQuery 表不存在,流水线会创建这些表。否则将使用现有 BigQuery 表。现有 BigQuery 表的架构必须包含 Spanner 表的相应跟踪列以及 ignoreFields 选项未显式忽略的任何其他元数据列。请参阅以下列表中的元数据字段的说明。 每个新的 BigQuery 行都包含变更数据流在更改记录的时间戳从 Spanner 表中的对应行中监控的所有列。

以下元数据字段会添加到 BigQuery 表中。如需详细了解这些字段,请参阅“变更数据流分区、记录和查询”中的数据更改记录

使用此模板时,请注意以下详细信息:

  • 此模板不会将 Spanner 中的架构更改传播到 BigQuery。由于在 Spanner 中执行架构更改可能会中断流水线,因此您可能需要在架构更改后重新创建流水线。
  • 对于 OLD_AND_NEW_VALUESNEW_VALUES 值捕获类型,当数据更改记录包含 UPDATE 更改时,模板需要在数据更改记录的提交时间戳对 Spanner 执行过时读取,以检索未更改但受监控的列。请确保为过时读取正确配置数据库“version_retention_period”。对于 NEW_ROW 值捕获类型,此模板效率更高,因为数据更改记录会捕获整个新行(包括 UPDATE 请求中未更新的列),并且模板不需要执行过时读取。
  • 如需最大限度地减少网络延迟和网络传输费用,请在 Spanner 实例或 BigQuery 表所在的区域中运行 Dataflow 作业。如果您使用位于作业区域之外的源、接收器、暂存文件位置或临时文件位置,则数据可能会跨区域发送。如需了解详情,请参阅 Dataflow 区域
  • 此模板支持所有有效的 Spanner 数据类型。如果 BigQuery 类型比 Spanner 类型更精确,则在转换期间可能会发生精度损失。具体而言:
    • 对于 Spanner JSON 类型,对象的成员顺序按字典顺序排列,但对于 BigQuery JSON 类型没有此类保证。
    • Spanner 支持纳秒 TIMESTAMP 类型,但 BigQuery 仅支持微秒 TIMESTAMP 类型。
  • 此模板不支持以“正好一次”模式使用 BigQuery Storage Write API

详细了解变更数据流如何构建变更数据流 Dataflow 流水线最佳实践

流水线要求

  • 在运行流水线之前,Spanner 实例必须已存在。
  • 在运行流水线之前,Spanner 数据库必须已存在。
  • 在运行流水线之前,Spanner 元数据实例必须已存在。
  • 在运行流水线之前,Spanner 元数据数据库必须已存在。
  • 在运行流水线之前,Spanner 变更数据流必须已存在。
  • 在运行流水线之前,BigQuery 数据集必须已存在。

模板参数

参数 说明
spannerInstanceId 要从中读取变更数据流的 Spanner 实例。
spannerDatabase 要从中读取变更数据流的 Spanner 数据库。
spannerDatabaseRole 可选:运行模板时使用的 Spanner 数据库角色。仅当运行模板的 IAM 主账号是精细访问权限控制用户时,才需要此参数。数据库角色必须拥有变更数据流的 SELECT 特权和变更数据流的读取函数的 EXECUTE 特权。如需了解详情,请参阅变更数据流的精细访问权限控制
spannerMetadataInstanceId 要用于变更数据流连接器元数据表的 Spanner 实例。
spannerMetadataDatabase 要用于变更数据流连接器元数据表的 Spanner 数据库。
spannerChangeStreamName 要从中读取数据的 Spanner 变更数据流的名称。
bigQueryDataset 变更数据流输出的 BigQuery 数据集。
spannerProjectId 可选:读取变更数据流的项目。此值也是创建变更数据流连接器元数据表的项目。此参数的默认值是运行 Dataflow 流水线的项目。
spannerMetadataTableName 可选:要使用的 Spanner 变更数据流连接器元数据表名称。如果未提供,系统会在流水线流期间自动创建 Spanner 变更数据流连接器元数据表。更新现有流水线时,您必须提供此参数。 否则,请勿提供此参数。
rpcPriority 可选:Spanner 调用的请求优先级。该值必须为以下值之一:HIGHMEDIUMLOW。默认值为 HIGH
startTimestamp 可选:要用于读取变更数据流的起始 DateTime(含边界值)。Ex-2021-10-12T07:20:50.52Z。默认为流水线启动时的时间戳,即当前时间。
endTimestamp 可选:要用于读取变更数据流的结束 DateTime(含边界值)。Ex-2021-10-12T07:20:50.52Z。默认为未来的无限时间。
bigQueryProjectId 可选:BigQuery 项目。默认值为 Dataflow 作业的项目。
bigQueryChangelogTableNameTemplate 可选:BigQuery 更新日志表名称的模板。默认为 _metadata_spanner_table_name_changelog。
deadLetterQueueDirectory 可选:用于存储处理失败的任何未处理记录的路径。默认路径为 Dataflow 作业的临时位置下的目录。默认值通常就足够了。
dlqRetryMinutes 可选:死信队列重试之间的分钟数。默认值为 10。
ignoreFields 可选:要忽略的字段(区分大小写)的逗号分隔列表。这些字段可以是被监控表的字段,也可以是流水线添加的元数据字段。被忽略的字段不会插入到 BigQuery 中。如果您忽略 _metadata_spanner_table_name 字段,则 bigQueryChangelogTableNameTemplate 参数也会被忽略。
useStorageWriteApi 可选:如果为 true,则流水线使用 BigQuery Storage Write API。默认值为 false。如需了解详情,请参阅使用 Storage Write API
useStorageWriteApiAtLeastOnce 可选:使用 Storage Write API 时,请指定写入语义。如需使用“至少一次”语义,请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false
numStorageWriteApiStreams 可选:使用 Storage Write API 时,指定写入流的数量。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。
storageWriteApiTriggeringFrequencySec 可选:使用 Storage Write API 时,指定触发频率(以秒为单位)。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

请替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE:Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Spanner 变更数据流
  • BIGQUERY_DATASET:变更数据流输出的 BigQuery 数据集

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE:Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Spanner 变更数据流
  • BIGQUERY_DATASET:变更数据流输出的 BigQuery 数据集

后续步骤