Bigtable 变更数据流至 BigQuery 模板

Bigtable change streams to BigQuery 模板是一种流处理流水线,可使用 Dataflow 流式传输 Bigtable 数据更改记录并将其写入 BigQuery 表。

Bigtable 变更数据流可让您按表订阅数据变更。订阅表变更数据流时,需要遵循以下限制条件:

  • 仅返回修改后的单元格和删除操作的描述符。
  • 仅返回修改后的单元格的新值。

将数据变更记录写入 BigQuery 时,与原始 Bigtable 提交时间戳排序相比,可能会无序插入行。

由于永久性错误而无法写入 BigQuery 的变更日志表行将被永久放入 Cloud Storage 中的死信队列(未处理的消息队列)目录,以供用户审核或进一步处理。

如果需要的 BigQuery 表不存在,流水线会创建该表。否则将使用现有 BigQuery 表。现有 BigQuery 表的架构必须包含下表中的列。

每个新的 BigQuery 行都包含变更数据流从 Bigtable 表中的对应行返回的数据变更记录。

BigQuery 输出表架构

列名 类型 可以为 Null 说明
row_key STRINGBYTES 已变更行的行键。当 writeRowkeyAsBytes 流水线选项设置为 true 时,列的类型必须是 BYTES。否则,请使用 STRING 类型。
mod_type STRING 行变更的类型。请使用以下某个值:SET_CELLDELETE_CELLSDELETE_FAMILY
column_family STRING 受行变更影响的列族。
column STRING 受行变更影响的列限定符。对于 DELETE_FAMILY 变更类型,请设置为 NULL
commit_timestamp TIMESTAMP Bigtable 应用变更的时间。
big_query_commit_timestamp TIMESTAMP 可选:指定 BigQuery 将行写入输出表的时间。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。
timestamp TIMESTAMPINT64 受变更影响的单元的时间戳值。当 writeNumericTimestamps 流水线选项设置为 true 时,列的类型必须是 INT64。否则,请使用 TIMESTAMP 类型。对于 DELETE_CELLSDELETE_FAMILY 变更类型,请设置为 NULL
timestamp_from TIMESTAMPINT64 描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔(含边界值)开始。对于其他变更类型,请设置为 NULL
timestamp_to TIMESTAMPINT64 描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔的专有结尾。对于其他变更类型,请设置为 NULL
is_gc BOOL 可选:当垃圾回收政策触发变更时,请设置为 true。在所有其他情况下,请设置为 false。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。
source_instance STRING 可选:描述变更所来自的 Bigtable 实例的名称。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。
source_cluster STRING 可选:描述变更所来自的 Bigtable 集群的名称。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。
source_table STRING 可选:描述变更适用的 Bigtable 表的名称。如果多个 Bigtable 表将变更流式传输到同一个 BigQuery 表,则此列中的值可能很有用。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。
tiebreaker INT64 可选:当不同的 Bigtable 集群同时注册两项变更时,tiebreaker 值最高的变更会应用于源表。值较低的 tiebreaker 变更会被舍弃。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。
value STRINGBYTES 由变更设置的新值。当 writeValuesAsBytes 流水线选项设置为 true 时,列的类型必须是 BYTES。否则,请使用 STRING 类型。为 SET_CELL 变更设置该值。对于其他变更类型,该值设置为 NULL

流水线要求

  • 指定的 Bigtable 源实例。
  • 指定的 Bigtable 源表。该表必须启用变更数据流。
  • 指定的 Bigtable 应用配置文件。
  • 指定的 BigQuery 目标数据集。

模板参数

必需参数

  • bigQueryDataset:目标 BigQuery 表的数据集名称。
  • bigtableChangeStreamAppProfile:Bigtable 应用配置文件 ID。应用配置文件必须使用单集群路由并允许单行事务。
  • bigtableReadInstanceId:源 Bigtable 实例 ID。
  • bigtableReadTableId:源 Bigtable 表 ID。

可选参数

  • writeRowkeyAsBytes:是否将行键编写为 BigQuery BYTES。设置为 true 时,行键将写入 BYTES 列。否则,行键将写入 STRING 列。默认值为 false
  • writeValuesAsBytes:设置为 true 时,值会写入类型为 BYTES 的列;否则,值会写入类型为 STRING 的列。默认值为 false
  • writeNumericTimestamps:是否将 Bigtable 时间戳写为 BigQuery INT64。设置为 true 时,值会写入 INT64 列。否则,值会写入 TIMESTAMP 列。受影响的列:timestamptimestamp_fromtimestamp_to。默认值为 false。如果设置为 true,时间从 Unix 计时原点(从 UTC 1970 年 1 月 1 日起)开始计算。
  • bigQueryProjectId:BigQuery 数据集项目 ID。默认为 Dataflow 作业的项目。
  • bigQueryChangelogTableName:目标 BigQuery 表的名称。如果未指定,则系统会使用值 bigtableReadTableId + "_changelog"。默认值为空。
  • bigQueryChangelogTablePartitionGranularity:指定对更改日志表进行分区的粒度。设置后,表进行了分区。使用以下某个受支持的值:HOURDAYMONTHYEAR。默认情况下,该表未进行分区。
  • bigQueryChangelogTablePartitionExpirationMs:设置更改日志表分区到期时间(以毫秒为单位)。设置为 true 时,系统会删除存在时间超过指定毫秒数的分区。默认情况下,未设置到期时间。
  • bigQueryChangelogTableFieldsToIgnore:以英文逗号分隔的变更日志列(如果指定,则不会创建和填充)的列表。使用以下某个受支持的值:is_gcsource_instancesource_clustersource_tabletiebreakerbig_query_commit_timestamp。默认情况下,系统会填充所有列。
  • dlqDirectory:死信队列使用的目录。无法处理的记录存储在此目录中。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,您可以使用默认路径。
  • bigtableChangeStreamMetadataInstanceId:Bigtable 变更数据流元数据实例 ID。默认值为空。
  • bigtableChangeStreamMetadataTableTableId:Bigtable 变更数据流连接器元数据表的 ID。如果未提供,系统会在流水线执行期间自动创建 Bigtable 变更数据流连接器元数据表。默认值为空。
  • bigtableChangeStreamCharset:Bigtable 变更数据流字符集名称。默认为 UTF-8。
  • bigtableChangeStreamStartTimestamp:用于读取变更数据流的起始时间戳 (https://tools.ietf.org/html/rfc3339)(含)。例如 2022-05-05T07:59:59Z。默认为流水线开始时间的时间戳。
  • bigtableChangeStreamIgnoreColumnFamilies:要忽略的列族名称更改的英文逗号分隔列表。默认值为空。
  • bigtableChangeStreamIgnoreColumns:要忽略的列名称更改的英文逗号分隔列表。默认值为空。
  • bigtableChangeStreamName:客户端流水线的唯一名称。允许您从之前运行的流水线停止的位置继续处理。默认为自动生成的名称。如需了解所用的值,请参阅 Dataflow 作业日志。
  • bigtableChangeStreamResume:设置为 true 时,新流水线将从具有相同 bigtableChangeStreamName 值的先前运行的流水线停止时的点开始处理。如果具有给定 bigtableChangeStreamName 值的流水线从未运行,则新流水线不会启动。设置为 false 时,新流水线会启动。如果给定来源已运行具有相同 bigtableChangeStreamName 值的流水线,则新流水线无法启动。默认值为 false
  • bigtableReadProjectId:Bigtable 项目 ID。默认为 Dataflow 作业的项目。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Bigtable change streams to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 实例 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 表 ID。
  • BIGTABLE_APPLICATION_PROFILE_ID:您的 Bigtable 应用配置文件 ID。
  • BIGQUERY_DESTINATION_DATASET:BigQuery 目标数据集名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 实例 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 表 ID。
  • BIGTABLE_APPLICATION_PROFILE_ID:您的 Bigtable 应用配置文件 ID。
  • BIGQUERY_DESTINATION_DATASET:BigQuery 目标数据集名称

后续步骤