Bigtable 变更数据流至 Pub/Sub 模板是一种流处理流水线,用于流式传输 Bigtable 数据变更记录并使用 Dataflow 将其发布到 Pub/Sub 主题。
Bigtable 变更数据流可让您按表订阅数据变更。订阅表变更数据流时,需要遵循以下限制条件:
- 仅返回修改后的单元格和删除操作的描述符。
- 仅返回修改后的单元格的新值。
当数据变更记录发布到 Pub/Sub 主题时,与原始 Bigtable 提交时间戳排序相比,消息可能会无序插入。
无法发布到 Pub/Sub 主题的 Bigtable 数据变更记录暂时放置在 Cloud Storage 的死信队列(未处理的消息队列)目录中。在失败重试次数上限后,这些记录将无限期地放置在同一死信队列中,以供用户审核或进一步处理。
流水线要求目标 Pub/Sub 主题存在。目标主题可能配置为使用架构来验证消息。如果 Pub/Sub 主题指定架构,则仅当架构有效时,流水线才会启动。根据架构类型,请对目标主题使用以下架构定义之一:
Protocol Buffers
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
使用以下 Protobuf 架构并采用 JSON
消息编码:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
每条新的 Pub/Sub 消息都包含变更数据流从 Bigtable 表中对应行返回的一条数据变更记录。
Pub/Sub 输出消息说明
字段名称 | 说明 |
---|---|
rowKey |
已变更行的行键。以字节数组的形式到达。配置 JSON 消息编码后,行键以字符串形式返回。指定 useBase64Rowkeys 时,行键采用 Base64 编码。否则,由 bigtableChangeStreamCharset 指定的字符集会用于将行键字节解码为字符串。 |
modType |
行变更的类型。请使用以下某个值:SET_CELL 、DELETE_CELLS 或 DELETE_FAMILY 。 |
columnFamily |
受行变更影响的列族。 |
column |
受行变更影响的列限定符。对于 DELETE_FAMILY 变更类型,未设置列字段。以字节数组的形式到达。配置 JSON 消息编码后,列将作为字符串返回。指定 useBase64ColumnQualifier 时,列字段采用 Base64 编码。否则,由 bigtableChangeStreamCharset 指定的字符集会用于将行键字节解码为字符串。 |
commitTimestamp |
Bigtable 应用变更的时间。 时间从 Unix 计时原点(从 UTC 1970 年 1 月 1 日起)开始计算。 |
timestamp |
受变更影响的单元的时间戳值。对于 DELETE_CELLS 和 DELETE_FAMILY 变更类型,未设置时间戳。时间从 Unix 计时原点(从 UTC 1970 年 1 月 1 日起)开始计算。 |
timestampFrom |
描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔(含边界值)开始。对于其他变更类型,未设置 timestampFrom 。时间从 Unix 计时原点(从 UTC 1970 年 1 月 1 日起)开始计算。 |
timestampTo |
描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔的专有结尾。对于其他变更类型,未设置 timestampTo 。 |
isGC |
一个布尔值,用于指示变更是否由 Bigtable 垃圾回收机制生成。 |
tieBreaker |
当不同的 Bigtable 集群同时注册两项变更时,tiebreaker 值最高的变更会应用于源表。值较低的 tiebreaker 变更会被舍弃。 |
value |
由变更设置的新值。除非设置了 stripValues 流水线选项,否则将为 SET_CELL 变更设置该值。对于其他变更类型,系统不会设置此值。以字节数组的形式到达。配置 JSON 消息编码后,值以字符串形式返回。指定 useBase64Values 时,值会采用 Base64 编码。否则,由 bigtableChangeStreamCharset 指定的字符集会用于将值字节解码为字符串。 |
sourceInstance |
已注册变更项的 Bigtable 实例的名称。可能的情况是多个流水线将不同实例的变更流式传输到同一 Pub/Sub 主题。 |
sourceCluster |
已注册变更项的 Bigtable 集群的名称。如果多个流水线将变更从不同的实例流式传输到同一 Pub/Sub 主题,则可使用此字段。 |
sourceTable |
已接收变更项的 Bigtable 表的名称。如果多个流水线将变更从不同的表流式传输到同一 Pub/Sub 主题,则可使用此字段。 |
流水线要求
- 指定的 Bigtable 源实例。
- 指定的 Bigtable 源表。该表必须启用变更数据流。
- 指定的 Bigtable 应用配置文件。
- 指定的 Pub/Sub 主题必须存在。
模板参数
参数 | 说明 |
---|---|
bigtableReadInstanceId |
源 Bigtable 实例 ID。 |
bigtableReadTableId |
源 Bigtable 表 ID。 |
bigtableChangeStreamAppProfile |
Bigtable 应用配置文件 ID。应用配置文件必须使用单集群路由并允许单行事务。 |
pubSubTopic |
目标 Pub/Sub 主题的名称。 |
messageFormat |
可选:目标主题配置了架构后,消息格式取决于已配置的架构和编码。要发布到 Pub/Sub 主题的消息的格式。支持的值:AVRO 、PROTOCOL_BUFFERS 和 JSON 。默认值为 JSON 。使用 JSON 格式时,消息的 rowKey、column 和 value 字段是字符串,其内容由 useBase64Rowkeys 、useBase64ColumnQualifiers 、useBase64Values 和 bigtableChangeStreamCharset 流水线选项决定。 |
messageEncoding |
可选:目标主题配置了架构后,消息编码将由主题设置决定。需要发布到 Pub/Sub 主题的消息的编码。支持的值:BINARY 、JSON 和 JSON 。默认值为 JSON 。 |
stripValues |
可选:设置为 true 时,将返回 SET_CELL 变更,而无需设置新值。默认值为 false。
如果您不需要存在新值(也称为缓存失效操作),或者值非常大并超出 Pub/Sub 消息大小限制,此参数非常有用。 |
bigtableReadProjectId |
可选:Bigtable 项目 ID。默认为 Dataflow 作业的项目。 |
pubSubProjectId |
可选:Bigtable 项目 ID。默认为 Dataflow 作业的项目。 |
bigtableChangeStreamMetadataInstanceId |
可选:Bigtable 变更数据流元数据实例 ID。 |
bigtableChangeStreamMetadataTableTableId |
可选:Bigtable 变更数据流元数据表 ID。 |
bigtableChangeStreamCharset |
可选:读取 rowkey、value 和 column 限定符时的 Bigtable 变更数据流字符集名称。消息编码为 JSON 时使用此选项。 |
bigtableChangeStreamStartTimestamp |
可选:要用于读取变更数据流的开始 timestamp(含边界值)。例如 2022-05-05T07:59:59Z 。默认为流水线开始时间的时间戳。 |
bigtableChangeStreamIgnoreColumnFamilies |
可选:要忽略的列族名称变更的英文逗号分隔列表。 |
bigtableChangeStreamIgnoreColumns |
可选:要忽略的列名称变更的英文逗号分隔列表。 |
bigtableChangeStreamName |
可选:客户端流水线的唯一名称。允许您从之前运行的流水线停止的位置继续处理。默认为自动生成的名称。如需查找该值,请参阅 Dataflow 作业日志。 |
bigtableChangeStreamResume |
可选:设置为 true 时,新流水线将从具有相同 bigtableChangeStreamName 值的先前运行的流水线停止时的点开始处理。如果具有给定 bigtableChangeStreamName 值的流水线从未运行,则新流水线不会启动。设置为 false 时,系统会启动新的流水线。如果给定来源已运行具有相同 bigtableChangeStreamName 值的流水线,则新流水线无法启动。默认值为 false 。 |
useBase64Rowkeys |
可选:与 JSON 消息编码搭配使用。如果设置为 true ,则 rowKey 字段是 Base64 编码的字符串。否则,使用 bigtableChangeStreamCharset 将字节解码为字符串来生成 rowKey 。默认值为 false 。 |
useBase64ColumnQualifiers |
可选:与 JSON 消息编码搭配使用。如果设置为 true ,则 column 字段是 Base64 编码的字符串。否则,使用 bigtableChangeStreamCharset 将字节解码为字符串来生成该列。默认值为 false 。 |
useBase64Values |
可选:与 JSON 消息编码搭配使用。如果设置为 true ,则 value 字段是 Base64 编码的字符串。否则,使用 bigtableChangeStreamCharset 将字节解码为字符串来生成该值。默认值为 false 。 |
dlqMaxRetries |
可选:死信重试次数上限。默认值为 5 。 |
dlqRetryMinutes |
可选:死信队列重试之间的分钟数。默认值为 10 。 |
dlqDirectory |
可选:死信队列的目录。无法处理的记录存储在此目录中。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,您可以使用默认路径。 |
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认地区为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Bigtable change streams to Pub/Sub template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
请替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
BIGTABLE_INSTANCE_ID
:您的 Bigtable 实例 ID。BIGTABLE_TABLE_ID
:您的 Bigtable 表 ID。BIGTABLE_APPLICATION_PROFILE_ID
:您的 Bigtable 应用配置文件 ID。PUBSUB_TOPIC
:Pub/Sub 目标主题名称
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
请替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
BIGTABLE_INSTANCE_ID
:您的 Bigtable 实例 ID。BIGTABLE_TABLE_ID
:您的 Bigtable 表 ID。BIGTABLE_APPLICATION_PROFILE_ID
:您的 Bigtable 应用配置文件 ID。PUBSUB_TOPIC
:Pub/Sub 目标主题名称
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。