Bigtable change streams to BigQuery 模板是一种流处理流水线,可使用 Dataflow 流式传输 Bigtable 数据更改记录并将其写入 BigQuery 表。
Bigtable 变更数据流可让您按表订阅数据变更。订阅表变更数据流时,需要遵循以下限制条件:
- 仅返回修改后的单元格和删除操作的描述符。
- 仅返回修改后的单元格的新值。
将数据变更记录写入 BigQuery 时,与原始 Bigtable 提交时间戳排序相比,可能会无序插入行。
由于永久性错误而无法写入 BigQuery 的变更日志表行将被永久放入 Cloud Storage 中的死信队列(未处理的消息队列)目录,以供用户审核或进一步处理。
如果需要的 BigQuery 表不存在,流水线会创建该表。否则将使用现有 BigQuery 表。现有 BigQuery 表的架构必须包含下表中的列。
每个新的 BigQuery 行都包含变更数据流从 Bigtable 表中的对应行返回的数据变更记录。
BigQuery 输出表架构
列名 | 类型 | 是否可为 null | 说明 |
---|---|---|---|
row_key |
STRING 或 BYTES |
否 | 已变更行的行键。当 writeRowkeyAsBytes 流水线选项设置为 true 时,列的类型必须是 BYTES 。否则,请使用 STRING 类型。 |
mod_type |
STRING |
否 | 行变更的类型。请使用以下某个值:SET_CELL 、DELETE_CELLS 或 DELETE_FAMILY 。 |
column_family |
STRING |
否 | 受行变更影响的列族。 |
column |
STRING |
是 | 受行变更影响的列限定符。对于 DELETE_FAMILY 变更类型,请设置为 NULL 。 |
commit_timestamp |
TIMESTAMP |
否 | Bigtable 应用变更的时间。 |
big_query_commit_timestamp |
TIMESTAMP |
是 | 可选:指定 BigQuery 将行写入输出表的时间。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
timestamp |
TIMESTAMP 或 INT64 |
是 | 受变更影响的单元的时间戳值。当 writeNumericTimestamps 流水线选项设置为 true 时,列的类型必须是 INT64 。否则,请使用 TIMESTAMP 类型。对于 DELETE_CELLS 和 DELETE_FAMILY 变更类型,请设置为 NULL 。 |
timestamp_from |
TIMESTAMP 或 INT64 |
是 | 描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔(含边界值)开始。对于其他变更类型,请设置为 NULL 。 |
timestamp_to |
TIMESTAMP 或 INT64 |
是 | 描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔的专有结尾。对于其他变更类型,请设置为 NULL 。 |
is_gc |
BOOL |
否 | 可选:当垃圾回收政策触发变更时,请设置为 true 。在所有其他情况下,请设置为 false 。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
source_instance |
STRING |
否 | 可选:描述变更所来自的 Bigtable 实例的名称。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
source_cluster |
STRING |
否 | 可选:描述变更所来自的 Bigtable 集群的名称。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
source_table |
STRING |
否 | 可选:描述变更适用的 Bigtable 表的名称。如果多个 Bigtable 表将变更流式传输到同一个 BigQuery 表,则此列中的值可能很有用。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
tiebreaker |
INT64 |
否 | 可选:当不同的 Bigtable 集群同时注册两项变更时,tiebreaker 值最高的变更会应用于源表。值较低的 tiebreaker 变更会被舍弃。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
value |
STRING 或 BYTES |
是 | 由变更设置的新值。当 writeValuesAsBytes 流水线选项设置为 true 时,列的类型必须是 BYTES 。否则,请使用 STRING 类型。为 SET_CELL 变更设置该值。对于其他变更类型,该值设置为 NULL 。 |
流水线要求
- 指定的 Bigtable 源实例。
- 指定的 Bigtable 源表。该表必须启用变更数据流。
- 指定的 Bigtable 应用配置文件。
- 指定的 BigQuery 目标数据集。
模板参数
参数 | 说明 |
---|---|
bigtableReadInstanceId |
源 Bigtable 实例 ID。 |
bigtableReadTableId |
源 Bigtable 表 ID。 |
bigtableChangeStreamAppProfile |
Bigtable 应用配置文件 ID。应用配置文件必须使用单集群路由并允许单行事务。 |
bigQueryDataset |
目标 BigQuery 表的数据集名称。 |
writeNumericTimestamps |
可选:将 Bigtable 时间戳写为 BigQuery INT64 。设置为 true 时,值会写入 INT64 列。否则,值会写入 TIMESTAMP 列。受影响的列:timestamp 、timestamp_from 和 timestamp_to 。默认值为 false 。如果设置为 true ,时间从 Unix 计时原点(从 UTC 1970 年 1 月 1 日起)开始计算。 |
writeRowkeyAsBytes |
可选:将行键编写为 BigQuery BYTES 。设置为 true 时,行键将写入 BYTES 列。否则,行键将写入 STRING 列。默认值为 false 。 |
writeValuesAsBytes |
可选:将值写入为 BigQuery BYTES 。设置为 true 时,值会写入 BYTES 列。否则,值会写入 STRING 列。默认值为 false 。 |
bigQueryChangelogTableName |
可选:目标 BigQuery 表名称。如果未指定,则系统会使用值 bigtableReadTableId + "_changelog" |
bigQueryProjectId |
可选:BigQuery 数据集项目 ID。默认为 Dataflow 作业的项目。 |
bigtableReadProjectId |
可选:Bigtable 项目 ID。默认为 Dataflow 作业的项目。 |
bigtableChangeStreamMetadataInstanceId |
可选:Bigtable 变更数据流元数据实例 ID。 |
bigtableChangeStreamMetadataTableTableId |
可选:Bigtable 变更数据流元数据表 ID。 |
bigtableChangeStreamCharset |
可选:读取值和列限定符时的 Bigtable 变更数据流字符集名称。 |
bigtableChangeStreamStartTimestamp |
可选:要用于读取变更数据流的开始 timestamp(含边界值)。例如 2022-05-05T07:59:59Z 。默认为流水线开始时间的时间戳。 |
bigtableChangeStreamIgnoreColumnFamilies |
可选:要忽略的列族名称变更的英文逗号分隔列表。 |
bigtableChangeStreamIgnoreColumns |
可选:要忽略的列名称变更的英文逗号分隔列表。 |
bigtableChangeStreamName |
可选:客户端流水线的唯一名称。允许您从之前运行的流水线停止的位置继续处理。默认为自动生成的名称。如需了解所用的值,请参阅 Dataflow 作业日志。 |
bigtableChangeStreamResume |
可选:设置为 true 时,新流水线将从具有相同 bigtableChangeStreamName 值的先前运行的流水线停止时的点开始处理。如果具有给定 bigtableChangeStreamName 值的流水线从未运行,则新流水线不会启动。设置为 false 时,系统会启动新的流水线。如果给定来源已运行具有相同 bigtableChangeStreamName 值的流水线,则新流水线无法启动。默认值为 false 。 |
bigQueryChangelogTableFieldsToIgnore |
可选:以英文逗号分隔的变更日志列(如果指定,则不会创建和填充)的列表。使用以下某个受支持的值:is_gc 、source_instance 、source_cluster 、source_table 、tiebreaker 或 big_query_commit_timestamp 。默认情况下,系统会填充所有列。 |
bigQueryChangelogTablePartitionExpirationMs |
可选:设置更改日志表分区到期时间(以毫秒为单位)。设置为 true 时,系统会删除存在时间超过指定毫秒数的分区。默认情况下,未设置到期时间。 |
bigQueryChangelogTablePartitionGranularity |
可选:指定对更改日志表进行分区的粒度。设置后,表进行了分区。使用以下某个受支持的值:HOUR 、DAY 、MONTH 或 YEAR 。默认情况下,该表未进行分区。 |
dlqDirectory |
可选:死信队列的目录。无法处理的记录存储在此目录中。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,您可以使用默认路径。 |
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Bigtable change streams to BigQuery template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ bigQueryDataset=BIGQUERY_DESTINATION_DATASET
请替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
BIGTABLE_INSTANCE_ID
:您的 Bigtable 实例 ID。BIGTABLE_TABLE_ID
:您的 Bigtable 表 ID。BIGTABLE_APPLICATION_PROFILE_ID
:您的 Bigtable 应用配置文件 ID。BIGQUERY_DESTINATION_DATASET
:BigQuery 目标数据集名称
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET" } } }
请替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
BIGTABLE_INSTANCE_ID
:您的 Bigtable 实例 ID。BIGTABLE_TABLE_ID
:您的 Bigtable 表 ID。BIGTABLE_APPLICATION_PROFILE_ID
:您的 Bigtable 应用配置文件 ID。BIGQUERY_DESTINATION_DATASET
:BigQuery 目标数据集名称
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。