Bigtable change streams to Vector Search 模板

此模板会创建一个流处理流水线,以使用 Dataflow Runner V2 流式传输 Bigtable 数据更改记录并将其写入 Vertex AI Vector Search。

流水线要求

  • Bigtable 源实例必须存在。
  • Bigtable 源表必须存在,并且该表必须启用变更数据流。
  • Bigtable 应用配置文件必须存在。
  • Vector Search 索引路径必须存在。

模板参数

参数 说明
embeddingColumn 在其中存储嵌入的完全限定列名。格式为 cf:col。
embeddingByteSize 嵌入数组中每个条目的字节大小。对于浮点数,请使用 4;对于双精度数,请使用 8。默认值为 4
vectorSearchIndex 要将更改流式传输到的向量搜索索引,格式为“projects/{projectID}/locations/区域/indexes/{indexID}”(不带前导或尾随空格)。例如:projects/123/locations/us-east1/indexes/456
bigtableChangeStreamAppProfile 用于区分 Bigtable 中工作负载的应用配置文件。
bigtableReadInstanceId 表所属的 Bigtable 实例的 ID。
bigtableReadTableId 要从中读取数据的 Bigtable 表。
bigtableMetadataTableTableId 可选:所创建元数据表的 ID。如果未设置,Bigtable 会生成一个 ID。
crowdingTagColumn 可选:在其中存储拥挤标记的完全限定列名,格式为 cf:col
allowRestrictsMappings 可选:要用作 allow 限制的列的以逗号分隔的完全限定列名称及其别名。每个列名称都必须采用 cf:col->alias 格式。
denyRestrictsMappings 可选:要用作 deny 限制的列的以逗号分隔的完全限定列名称及其别名。每个列名称都必须采用 cf:col->alias 格式。
intNumericRestrictsMappings 可选:要用作整数 numeric_restricts 的列的以逗号分隔完全限定列名及其别名。每个列名称都必须采用 cf:col->alias 格式。
floatNumericRestrictsMappings 可选:要用作浮点数(4 字节)numeric_restricts 的列的以逗号分隔完全限定列名及其别名。每个列名称都必须采用 cf:col->alias 格式
doubleNumericRestrictsMappings 可选:要用作双精度(8 字节)numeric_restricts 的列的英文逗号分隔的完全限定列名称及其别名。每个列名称都必须采用 cf:col->alias 格式
upsertMaxBatchSize 可选:在将批次更新/插入向量搜索索引之前要缓冲的更新/插入数量上限。当有 upsertBatchSize 条记录准备就绪时,系统会发送批量数据。示例:10
upsertMaxBufferDuration 可选:在向向量搜索发送批量更新/插入之前的最大延迟时间。当有 upsertBatchSize 条记录准备就绪时,系统会发送批量数据。允许的格式如下:Ns(表示秒,例如:5s)、Nm(表示分钟,例如 12m)、Nh(表示小时,例如 2h)。默认值:10s
deleteMaxBatchSize 可选:从向量搜索索引中删除批次之前要缓冲的最大删除次数。当有 deleteBatchSize 条记录准备就绪时,系统会发送批量数据。例如:10
deleteMaxBufferDuration 可选:在向向量搜索发送批量删除操作之前的最大延迟时间。当有 deleteBatchSize 条记录准备就绪时,系统会发送批量数据。允许的格式如下:Ns(表示秒,例如:5s)、Nm(表示分钟,例如 12m)、Nh(表示小时,例如 2h)。默认值:10s
dlqDirectory 可选:用于存储未处理记录以及无法处理原因的路径。默认值为 Dataflow 作业的临时位置下的目录。默认值适用于大多数场景。
bigtableChangeStreamMetadataInstanceId 可选:要用于变更数据流连接器元数据表的 Bigtable 实例。默认值为空。
bigtableChangeStreamMetadataTableTableId 可选:要使用的 Bigtable 变更数据流连接器元数据表 ID。如果未提供,系统会在流水线流期间自动创建 Bigtable 变更数据流连接器元数据表。默认值为空。
bigtableChangeStreamCharset 可选:读取值和列限定符时的 Bigtable 变更数据流字符集名称。默认值为 UTF-8。
bigtableChangeStreamStartTimestamp 可选:要用于读取变更数据流的起始日期时间(含边界值)(https://tools.ietf.org/html/rfc3339)。例如,2022-05-05T07:59:59Z。默认为流水线启动时的时间戳。
bigtableChangeStreamIgnoreColumnFamilies 可选:要忽略的列族名称变更的英文逗号分隔列表。默认值为空。
bigtableChangeStreamIgnoreColumns 可选:要忽略的列名称变更的英文逗号分隔列表。默认值为空。
bigtableChangeStreamName 可选:客户端流水线的唯一名称。此参数允许您从先前运行的流水线停止的位置继续处理。默认为自动生成的名称。如需了解所用的值,请参阅 Dataflow 作业日志。
bigtableChangeStreamResume

可选:设置为 true 时,新流水线将从具有相同名称的先前运行的流水线停止时的点开始处理。如果过去从未运行过同名流水线,则新流水线将无法启动。使用 bigtableChangeStreamName 参数指定流水线。

设置为 false 时,系统会启动新流水线。如果给定来源之前已运行与 bigtableChangeStreamName 同名的流水线,则新流水线将无法启动。

默认值为 false。

bigtableReadProjectId 可选:要从中读取 Bigtable 数据的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Bigtable Change Streams to Vector Search template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud CLI

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • EMBEDDING_COLUMN:嵌入列
  • EMBEDDING_BYTE_SIZE:嵌入数组的字节大小。可以是 4 或 8。
  • VECTOR_SEARCH_INDEX:向量搜索索引路径
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE:Bigtable 应用配置文件 ID
  • BIGTABLE_READ_INSTANCE_ID:源 Bigtable 实例 ID
  • BIGTABLE_READ_TABLE_ID:源 Bigtable 表 ID

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • EMBEDDING_COLUMN:嵌入列
  • EMBEDDING_BYTE_SIZE:嵌入数组的字节大小。可以是 4 或 8。
  • VECTOR_SEARCH_INDEX:向量搜索索引路径
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE:Bigtable 应用配置文件 ID
  • BIGTABLE_READ_INSTANCE_ID:源 Bigtable 实例 ID
  • BIGTABLE_READ_TABLE_ID:源 Bigtable 表 ID