Bigtable 变更数据流至 Pub/Sub 模板

Bigtable 变更数据流至 Pub/Sub 模板是一种流处理流水线,用于流式传输 Bigtable 数据变更记录并使用 Dataflow 将其发布到 Pub/Sub 主题。

Bigtable 变更数据流可让您按表订阅数据变更。订阅表变更数据流时,需要遵循以下限制条件:

  • 仅返回修改后的单元格和删除操作的描述符。
  • 仅返回修改后的单元格的新值。

当数据变更记录发布到 Pub/Sub 主题时,与原始 Bigtable 提交时间戳排序相比,消息可能会无序插入。

无法发布到 Pub/Sub 主题的 Bigtable 数据变更记录暂时放置在 Cloud Storage 的死信队列(未处理的消息队列)目录中。在失败重试次数上限后,这些记录将无限期地放置在同一死信队列中,以供用户审核或进一步处理。

流水线要求目标 Pub/Sub 主题存在。目标主题可能配置为使用架构来验证消息。如果 Pub/Sub 主题指定架构,则仅当架构有效时,流水线才会启动。根据架构类型,请对目标主题使用以下架构定义之一:

Protocol Buffers

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

使用以下 Protobuf 架构并采用 JSON 消息编码:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

每条新的 Pub/Sub 消息都包含变更数据流从 Bigtable 表中对应行返回的一条数据变更记录。

Pub/Sub 输出消息说明

字段名称 说明
rowKey 已变更行的行键。以字节数组的形式到达。配置 JSON 消息编码后,行键以字符串形式返回。指定 useBase64Rowkeys 时,行键采用 Base64 编码。否则,由 bigtableChangeStreamCharset 指定的字符集会用于将行键字节解码为字符串。
modType 行变更的类型。请使用以下某个值:SET_CELLDELETE_CELLSDELETE_FAMILY
columnFamily 受行变更影响的列族。
column 受行变更影响的列限定符。对于 DELETE_FAMILY 变更类型,未设置列字段。以字节数组的形式到达。配置 JSON 消息编码后,列将作为字符串返回。指定 useBase64ColumnQualifier 时,列字段采用 Base64 编码。否则,由 bigtableChangeStreamCharset 指定的字符集会用于将行键字节解码为字符串。
commitTimestamp Bigtable 应用变更的时间。 时间从 Unix 计时原点(从 UTC 1970 年 1 月 1 日起)开始计算。
timestamp 受变更影响的单元的时间戳值。对于 DELETE_CELLSDELETE_FAMILY 变更类型,未设置时间戳。时间从 Unix 计时原点(从 UTC 1970 年 1 月 1 日起)开始计算。
timestampFrom 描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔(含边界值)开始。对于其他变更类型,未设置 timestampFrom。时间从 Unix 计时原点(从 UTC 1970 年 1 月 1 日起)开始计算。
timestampTo 描述 DELETE_CELLS 变更删除的所有单元的时间戳间隔的专有结尾。对于其他变更类型,未设置 timestampTo
isGC 一个布尔值,用于指示变更是否由 Bigtable 垃圾回收机制生成。
tieBreaker 当不同的 Bigtable 集群同时注册两项变更时,tiebreaker 值最高的变更会应用于源表。值较低的 tiebreaker 变更会被舍弃。
value 由变更设置的新值。除非设置了 stripValues 流水线选项,否则将为 SET_CELL 变更设置该值。对于其他变更类型,系统不会设置此值。以字节数组的形式到达。配置 JSON 消息编码后,值以字符串形式返回。指定 useBase64Values 时,值会采用 Base64 编码。否则,由 bigtableChangeStreamCharset 指定的字符集会用于将值字节解码为字符串。
sourceInstance 已注册变更项的 Bigtable 实例的名称。可能的情况是多个流水线将不同实例的变更流式传输到同一 Pub/Sub 主题。
sourceCluster 已注册变更项的 Bigtable 集群的名称。如果多个流水线将变更从不同的实例流式传输到同一 Pub/Sub 主题,则可使用此字段。
sourceTable 已接收变更项的 Bigtable 表的名称。如果多个流水线将变更从不同的表流式传输到同一 Pub/Sub 主题,则可使用此字段。

流水线要求

  • 指定的 Bigtable 源实例。
  • 指定的 Bigtable 源表。该表必须启用变更数据流。
  • 指定的 Bigtable 应用配置文件。
  • 指定的 Pub/Sub 主题必须存在。

模板参数

参数 说明
bigtableReadInstanceId 源 Bigtable 实例 ID。
bigtableReadTableId 源 Bigtable 表 ID。
bigtableChangeStreamAppProfile Bigtable 应用配置文件 ID。应用配置文件必须使用单集群路由并允许单行事务。
pubSubTopic 目标 Pub/Sub 主题的名称。
messageFormat 可选:目标主题配置了架构后,消息格式取决于已配置的架构和编码。要发布到 Pub/Sub 主题的消息的格式。支持的值:AVROPROTOCOL_BUFFERSJSON。默认值为 JSON。使用 JSON 格式时,消息的 rowKey、column 和 value 字段是字符串,其内容由 useBase64RowkeysuseBase64ColumnQualifiersuseBase64ValuesbigtableChangeStreamCharset 流水线选项决定。
messageEncoding 可选:目标主题配置了架构后,消息编码将由主题设置决定。需要发布到 Pub/Sub 主题的消息的编码。支持的值:BINARYJSONJSON。默认值为 JSON
stripValues 可选:设置为 true 时,将返回 SET_CELL 变更,而无需设置新值。默认值为 false。 如果您不需要存在新值(也称为缓存失效操作),或者值非常大并超出 Pub/Sub 消息大小限制,此参数非常有用。
bigtableReadProjectId 可选:Bigtable 项目 ID。默认为 Dataflow 作业的项目。
pubSubProjectId 可选:Bigtable 项目 ID。默认为 Dataflow 作业的项目。
bigtableChangeStreamMetadataInstanceId 可选:Bigtable 变更数据流元数据实例 ID。
bigtableChangeStreamMetadataTableTableId 可选:Bigtable 变更数据流元数据表 ID。
bigtableChangeStreamCharset 可选:读取 rowkey、value 和 column 限定符时的 Bigtable 变更数据流字符集名称。消息编码为 JSON 时使用此选项。
bigtableChangeStreamStartTimestamp 可选:要用于读取变更数据流的开始 timestamp(含边界值)。例如 2022-05-05T07:59:59Z。默认为流水线开始时间的时间戳。
bigtableChangeStreamIgnoreColumnFamilies 可选:要忽略的列族名称变更的英文逗号分隔列表。
bigtableChangeStreamIgnoreColumns 可选:要忽略的列名称变更的英文逗号分隔列表。
bigtableChangeStreamName 可选:客户端流水线的唯一名称。允许您从之前运行的流水线停止的位置继续处理。默认为自动生成的名称。如需查找该值,请参阅 Dataflow 作业日志。
bigtableChangeStreamResume 可选:设置为 true 时,新流水线将从具有相同 bigtableChangeStreamName 值的先前运行的流水线停止时的点开始处理。如果具有给定 bigtableChangeStreamName 值的流水线从未运行,则新流水线不会启动。设置为 false 时,系统会启动新的流水线。如果给定来源已运行具有相同 bigtableChangeStreamName 值的流水线,则新流水线无法启动。默认值为 false
useBase64Rowkeys 可选:与 JSON 消息编码搭配使用。如果设置为 true,则 rowKey 字段是 Base64 编码的字符串。否则,使用 bigtableChangeStreamCharset 将字节解码为字符串来生成 rowKey。默认值为 false
useBase64ColumnQualifiers 可选:与 JSON 消息编码搭配使用。如果设置为 true,则 column 字段是 Base64 编码的字符串。否则,使用 bigtableChangeStreamCharset 将字节解码为字符串来生成该列。默认值为 false
useBase64Values 可选:与 JSON 消息编码搭配使用。如果设置为 true,则 value 字段是 Base64 编码的字符串。否则,使用 bigtableChangeStreamCharset 将字节解码为字符串来生成该值。默认值为 false
dlqMaxRetries 可选:死信重试次数上限。默认值为 5
dlqRetryMinutes 可选:死信队列重试之间的分钟数。默认值为 10
dlqDirectory 可选:死信队列的目录。无法处理的记录存储在此目录中。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,您可以使用默认路径。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认地区为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Bigtable change streams to Pub/Sub template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 实例 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 表 ID。
  • BIGTABLE_APPLICATION_PROFILE_ID:您的 Bigtable 应用配置文件 ID。
  • PUBSUB_TOPIC:Pub/Sub 目标主题名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 实例 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 表 ID。
  • BIGTABLE_APPLICATION_PROFILE_ID:您的 Bigtable 应用配置文件 ID。
  • PUBSUB_TOPIC:Pub/Sub 目标主题名称

后续步骤