Bigtable change streams to BigQuery 模板是一种流处理流水线,可使用 Dataflow 流式传输 Bigtable 数据更改记录并将其写入 BigQuery 表。
Bigtable 变更数据流可让您按表订阅数据变更。订阅表变更数据流时,需要遵循以下限制条件:
- 仅返回修改后的单元格和删除操作的描述符。
- 仅返回修改后的单元格的新值。
将数据变更记录写入 BigQuery 时,与原始 Bigtable 提交时间戳排序相比,可能会无序插入行。
由于永久性错误而无法写入 BigQuery 的变更日志表行将被永久放入 Cloud Storage 中的死信队列(未处理的消息队列)目录,以供用户审核或进一步处理。
如果需要的 BigQuery 表不存在,流水线会创建该表。否则将使用现有 BigQuery 表。现有 BigQuery 表的架构必须包含下表中的列。
每个新的 BigQuery 行都包含变更数据流从 Bigtable 表中的对应行返回的数据变更记录。
BigQuery 输出表架构
列名 | 类型 | 可以为 Null | 说明 |
---|---|---|---|
row_key |
STRING 或 BYTES |
否 | 已变更行的行键。当 writeRowkeyAsBytes 流水线选项设置为 true 时,列的类型必须是 BYTES 。否则,请使用 STRING 类型。 |
mod_type |
STRING |
否 | 行变更的类型。请使用以下某个值:SET_CELL 、DELETE_CELLS 或 DELETE_FAMILY 。 |
column_family |
STRING |
否 | 受行变更影响的列族。 |
column |
STRING |
是 | 受行变更影响的列限定符。对于 DELETE_FAMILY 变更类型,请设置为 NULL 。 |
commit_timestamp |
TIMESTAMP |
否 | Bigtable 应用变更的时间。 |
big_query_commit_timestamp |
TIMESTAMP |
是 | 可选:指定 BigQuery 将行写入输出表的时间。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
timestamp |
TIMESTAMP 或 INT64 |
是 | 受变更影响的单元的时间戳值。当 writeNumericTimestamps 流水线选项设置为 true 时,列的类型必须是 INT64 。否则,请使用 TIMESTAMP 类型。对于 DELETE_CELLS 和 DELETE_FAMILY 变更类型,请设置为 NULL 。 |
timestamp_from |
TIMESTAMP 或 INT64 |
是 | 描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔(含边界值)开始。对于其他变更类型,请设置为 NULL 。 |
timestamp_to |
TIMESTAMP 或 INT64 |
是 | 描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔的专有结尾。对于其他变更类型,请设置为 NULL 。 |
is_gc |
BOOL |
否 | 可选:当垃圾回收政策触发变更时,请设置为 true 。在所有其他情况下,请设置为 false 。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
source_instance |
STRING |
否 | 可选:描述变更所来自的 Bigtable 实例的名称。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
source_cluster |
STRING |
否 | 可选:描述变更所来自的 Bigtable 集群的名称。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
source_table |
STRING |
否 | 可选:描述变更适用的 Bigtable 表的名称。如果多个 Bigtable 表将变更流式传输到同一个 BigQuery 表,则此列中的值可能很有用。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
tiebreaker |
INT64 |
否 | 可选:当不同的 Bigtable 集群同时注册两项变更时,tiebreaker 值最高的变更会应用于源表。值较低的 tiebreaker 变更会被舍弃。如果 bigQueryChangelogTableFieldsToIgnore 流水线选项值中存在列名称,则系统不会填充该字段。 |
value |
STRING 或 BYTES |
是 | 由变更设置的新值。当 writeValuesAsBytes 流水线选项设置为 true 时,列的类型必须是 BYTES 。否则,请使用 STRING 类型。为 SET_CELL 变更设置该值。对于其他变更类型,该值设置为 NULL 。 |
流水线要求
- 指定的 Bigtable 源实例。
- 指定的 Bigtable 源表。该表必须启用变更数据流。
- 指定的 Bigtable 应用配置文件。
- 指定的 BigQuery 目标数据集。
模板参数
必需参数
- bigQueryDataset:目标 BigQuery 表的数据集名称。
- bigtableChangeStreamAppProfile:Bigtable 应用配置文件 ID。应用配置文件必须使用单集群路由并允许单行事务。
- bigtableReadInstanceId:源 Bigtable 实例 ID。
- bigtableReadTableId:源 Bigtable 表 ID。
可选参数
- writeRowkeyAsBytes:是否将行键编写为 BigQuery
BYTES
。设置为true
时,行键将写入BYTES
列。否则,行键将写入STRING
列。默认值为false
。 - writeValuesAsBytes:设置为
true
时,值会写入类型为 BYTES 的列;否则,值会写入类型为 STRING 的列。默认值为false
。 - writeNumericTimestamps:是否将 Bigtable 时间戳写为 BigQuery INT64。设置为
true
时,值会写入 INT64 列。否则,值会写入TIMESTAMP
列。受影响的列:timestamp
、timestamp_from
和timestamp_to
。默认值为false
。如果设置为true
,时间从 Unix 计时原点(从 UTC 1970 年 1 月 1 日起)开始计算。 - bigQueryProjectId:BigQuery 数据集项目 ID。默认为 Dataflow 作业的项目。
- bigQueryChangelogTableName:目标 BigQuery 表的名称。如果未指定,则系统会使用值
bigtableReadTableId + "_changelog"
。默认值为空。 - bigQueryChangelogTablePartitionGranularity:指定对更改日志表进行分区的粒度。设置后,表进行了分区。使用以下某个受支持的值:
HOUR
、DAY
、MONTH
或YEAR
。默认情况下,该表未进行分区。 - bigQueryChangelogTablePartitionExpirationMs:设置更改日志表分区到期时间(以毫秒为单位)。设置为
true
时,系统会删除存在时间超过指定毫秒数的分区。默认情况下,未设置到期时间。 - bigQueryChangelogTableFieldsToIgnore:以英文逗号分隔的变更日志列(如果指定,则不会创建和填充)的列表。使用以下某个受支持的值:
is_gc
、source_instance
、source_cluster
、source_table
、tiebreaker
或big_query_commit_timestamp
。默认情况下,系统会填充所有列。 - dlqDirectory:死信队列使用的目录。无法处理的记录存储在此目录中。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,您可以使用默认路径。
- bigtableChangeStreamMetadataInstanceId:Bigtable 变更数据流元数据实例 ID。默认值为空。
- bigtableChangeStreamMetadataTableTableId:Bigtable 变更数据流连接器元数据表的 ID。如果未提供,系统会在流水线执行期间自动创建 Bigtable 变更数据流连接器元数据表。默认值为空。
- bigtableChangeStreamCharset:Bigtable 变更数据流字符集名称。默认为 UTF-8。
- bigtableChangeStreamStartTimestamp:用于读取变更数据流的起始时间戳 (https://tools.ietf.org/html/rfc3339)(含)。例如
2022-05-05T07:59:59Z
。默认为流水线开始时间的时间戳。 - bigtableChangeStreamIgnoreColumnFamilies:要忽略的列族名称更改的英文逗号分隔列表。默认值为空。
- bigtableChangeStreamIgnoreColumns:要忽略的列名称更改的英文逗号分隔列表。默认值为空。
- bigtableChangeStreamName:客户端流水线的唯一名称。允许您从之前运行的流水线停止的位置继续处理。默认为自动生成的名称。如需了解所用的值,请参阅 Dataflow 作业日志。
- bigtableChangeStreamResume:设置为
true
时,新流水线将从具有相同bigtableChangeStreamName
值的先前运行的流水线停止时的点开始处理。如果具有给定bigtableChangeStreamName
值的流水线从未运行,则新流水线不会启动。设置为false
时,新流水线会启动。如果给定来源已运行具有相同bigtableChangeStreamName
值的流水线,则新流水线无法启动。默认值为false
。 - bigtableReadProjectId:Bigtable 项目 ID。默认为 Dataflow 作业的项目。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Bigtable change streams to BigQuery template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ bigQueryDataset=BIGQUERY_DESTINATION_DATASET
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
BIGTABLE_INSTANCE_ID
:您的 Bigtable 实例 ID。BIGTABLE_TABLE_ID
:您的 Bigtable 表 ID。BIGTABLE_APPLICATION_PROFILE_ID
:您的 Bigtable 应用配置文件 ID。BIGQUERY_DESTINATION_DATASET
:BigQuery 目标数据集名称
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET" } } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
BIGTABLE_INSTANCE_ID
:您的 Bigtable 实例 ID。BIGTABLE_TABLE_ID
:您的 Bigtable 表 ID。BIGTABLE_APPLICATION_PROFILE_ID
:您的 Bigtable 应用配置文件 ID。BIGQUERY_DESTINATION_DATASET
:BigQuery 目标数据集名称
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。