使用分区 DML 为文本数据批量生成向量嵌入

本文档介绍如何使用 SQL 和 Vertex AI textembedding-gecko 模型为存储在 Spanner 中的文本数据(STRINGJSON)批量生成和回填矢量嵌入。

前提条件

您的 Spanner 数据库中必须具有一个包含文本数据(STRINGJSON)的表。如需详细了解如何导入数据,请参阅 Spanner 导入和导出概览

用例示例

假设您在 Spanner 中有一个具有以下架构的表。此表包含数百万条记录。

GoogleSQL

CREATE TABLE Products (
  product_id INT64 NOT NULL,
  name STRING(MAX),
  description STRING(MAX)
) PRIMARY KEY(product_id);

PostgreSQL

CREATE TABLE Products (
  product_id INT8 NOT NULL,
  name TEXT,
  description TEXT,
  PRIMARY KEY(product_id)
);

您的目标是为此表中的 description 列生成向量嵌入,以查找要向客户推荐的相似商品,从而使用向量搜索来改善他们的购物体验。

注册嵌入模型

GoogleSQL

向 Spanner 数据库中的 Vertex AI textembedding-gecko 端点注册嵌入模型:

CREATE MODEL MODEL_NAME
INPUT(
  content STRING(MAX)
)
OUTPUT(
  embeddings STRUCT<values ARRAY<FLOAT32>>
)
REMOTE OPTIONS(
    endpoint = '//aiplatform.googleapis.com/projects/PROJECT/locations/LOCATION/publishers/google/models/textembedding-gecko$MODEL_VERSION',
  default_batch_size = 5
)

替换以下内容:

  • MODEL_NAME:嵌入模型的名称
  • PROJECT:托管 Vertex AI 端点的项目
  • LOCATION:Vertex AI 端点的位置
  • MODEL_VERSIONtextembedding-gecko 嵌入模型的版本

PostgreSQL

在 PostgreSQL 方言中,无需注册模型。您可以直接将端点名称传递给 spanner.ML_PREDICT_ROW 函数调用。

有关最佳实践,请考虑以下事项:

  • 如需保持配额的隔离,请使用其他项目中的端点来生成和回填生产端点的嵌入。预留生产端点以处理生产流量。
  • 确保模型端点支持 default_batch_size 的值。您可以使用查询提示 @{remote_udf_max_rows_per_rpc=NEW_NUMBER} 替换 default_batch_size。如需了解每个区域的 default_batch_size 限制,请参阅获取一段文本的文本嵌入
  • 使用特定的模型版本(例如 @003)而不是 @latest 来定义端点。这是因为为同一段文本生成的嵌入向量可能因您使用的模型版本而异;这就是为什么您应该避免使用不同的模型版本在同一数据集中生成嵌入的原因。此外,更新模型定义语句中的模型版本不会更新已使用此模型生成的嵌入。管理嵌入的模型版本的一种方法是在表中额外创建一个列来存储模型版本。
  • GoogleSQL ML.PREDICT 和 PostgreSQL spanner.ML_PREDICT_ROW 函数不支持自定义调整后的 textembedding-gecko 模型。

测试嵌入模型的端到端集成

您可以执行查询以测试嵌入模型是否配置成功以及是否检索到嵌入。例如,运行以下查询:

GoogleSQL

SELECT embeddings.values
FROM SAFE.ML.PREDICT(
  MODEL MODEL_NAME,
  (SELECT description AS content FROM products LIMIT 10)
);

替换以下内容:

  • MODEL_NAME:嵌入模型的名称

PostgreSQL

SELECT spanner.ML_PREDICT_ROW(
    'projects/PROJECT/locations/LOCATION/publishers/google/models/textembedding-gecko$MODEL_VERSION',
    JSONB_BUILD_OBJECT('instances', JSONB_BUILD_ARRAY(JSONB_BUILD_OBJECT('content', description))))
FROM Products
LIMIT 10;

替换以下内容:

  • PROJECT:托管 Vertex AI 端点的项目
  • LOCATION:Vertex AI 端点的位置
  • MODEL_VERSIONtextembedding-gecko 嵌入模型的版本

更新源表以包含用于存储嵌入的其他列

接下来,更新源表架构以添加数据类型 ARRAY<FLOAT32> 的额外列,以存储生成的嵌入:

GoogleSQL

ALTER TABLE TABLE_NAME
ADD COLUMN EMBEDDING_COLUMN_NAME ARRAY<FLOAT32>;

替换以下内容:

  • TABLE_NAME:源表的名称
  • EMBEDDING_COLUMN_NAME:要在其中添加生成的嵌入的列的名称

PostgreSQL

ALTER TABLE TABLE_NAME
ADD COLUMN EMBEDDING_COLUMN_NAME real[];

替换以下内容:

  • TABLE_NAME:源表的名称
  • EMBEDDING_COLUMN_NAME:要在其中添加生成的嵌入的列的名称

例如,使用 products 表示例,运行以下命令:

GoogleSQL

ALTER TABLE Products
ADD COLUMN desc_embed ARRAY<FLOAT32>;

PostgreSQL

ALTER TABLE Products
ADD COLUMN desc_embed real[];

您可以再添加一列来管理嵌入模型的版本。

GoogleSQL

ALTER TABLE Products
ADD COLUMN desc_embed_model_version INT64;

PostgreSQL

ALTER TABLE Products
ADD COLUMN desc_embed_model_version INT8;

增加 Vertex AI 的配额

您可能需要在使用该模型的区域增加 textembedding-gecko 的 Vertex AI API 配额。如需申请增加配额,请参阅 Vertex AI 增加配额

如需了解详情,请参阅 Vertex AI 配额和限制

回填嵌入

最后,使用分区 DML 执行以下 UPDATE 语句,为文本数据列生成嵌入并将嵌入存储在数据库中。您可以将模型版本与嵌入一起存储。我们建议您在数据库流量较低的时段执行此查询。

GoogleSQL

UPDATE TABLE_NAME
SET 
  TABLE_NAME.EMBEDDING_COLUMN_NAME = (
    SELECT embeddings.values
    FROM SAFE.ML.PREDICT(
      MODEL MODEL_NAME,
      (SELECT TABLE_NAME.DATA_COLUMN_NAME AS content)
    ) @{remote_udf_max_rows_per_rpc=MAX_ROWS}
  ),
  TABLE_NAME.EMBEDDING_VERSION_COLUMN = MODEL_VERSION
WHERE FILTER_CONDITION;

替换以下内容:

  • TABLE_NAME:包含文本数据的表的名称
  • EMBEDDING_COLUMN_NAME:要在其中添加生成的嵌入的列的名称
  • DATA_COLUMN_NAME:包含文本数据的列的名称
  • MODEL_NAME:嵌入模型的名称
  • MAX_ROWS:每个 RPC 的最大行数
  • EMBEDDING_VERSION_COLUMN:用于管理用于回填嵌入的 textembedding-gecko 嵌入模型版本的列
  • MODEL_VERSIONtextembedding-gecko 嵌入模型的版本
  • FILTER_CONDITION:要应用的可分区过滤条件

使用 SAFE.ML.PREDICT 会针对失败的请求返回 NULL。您还可以将 SAFE.ML.PREDICTWHERE embedding_column IS NULL 过滤条件结合使用,以重新运行查询,而不计算已计算字段的嵌入。

PostgreSQL

UPDATE TABLE_NAME
SET 
  EMBEDDING_COLUMN_NAME = spanner.FLOAT32_ARRAY(spanner.ML_PREDICT_ROW(
    'projects/PROJECT/locations/LOCATION/publishers/google/models/textembedding-gecko$MODEL_VERSION', 
    JSONB_BUILD_OBJECT('instances', JSONB_BUILD_ARRAY(JSONB_BUILD_OBJECT('content', DATA_COLUMN_NAME)))
  ) /*@ remote_udf_max_rows_per_rpc=MAX_ROWS */ ->'predictions'->0->'embeddings'->'values'),
  EMBEDDING_VERSION_COLUMN = MODEL_VERSION
WHERE FILTER_CONDITION;

替换以下内容:

  • TABLE_NAME:包含文本数据的表的名称
  • EMBEDDING_COLUMN_NAME:要在其中添加生成的嵌入的列的名称
  • DATA_COLUMN_NAME:包含文本数据的列的名称
  • PROJECT:托管 Vertex AI 端点的项目
  • LOCATION:Vertex AI 端点的位置
  • MODEL_VERSIONtextembedding-gecko 嵌入模型的版本
  • MAX_ROWS:每个 RPC 的最大行数
  • EMBEDDING_VERSION_COLUMN:用于管理用于回填嵌入的 textembedding-gecko 嵌入模型版本的列
  • FILTER_CONDITION:要应用的可分区过滤条件

products 表的回填查询示例:

GoogleSQL

UPDATE products
SET
  products.desc_embed = (
    SELECT embeddings.values
    FROM SAFE.ML.PREDICT(
      MODEL gecko_model,
      (SELECT products.description AS content)
    ) @{remote_udf_max_rows_per_rpc=200}
  ),
  products.desc_embed_model_version = 3
WHERE products.desc_embed IS NULL;

PostgreSQL

UPDATE products
SET
  desc_embed = spanner.FLOAT32_ARRAY(spanner.ML_PREDICT_ROW(
    'projects/PROJECT/locations/LOCATION/publishers/google/models/textembedding-gecko@003', 
    JSONB_BUILD_OBJECT('instances', JSONB_BUILD_ARRAY(JSONB_BUILD_OBJECT('content', description)))
  ) /*@ remote_udf_max_rows_per_rpc=200 */ ->'predictions'->0->'embeddings'->'values'),
  desc_embed_model_version = 3
WHERE desc_embed IS NULL;

有关最佳实践,请考虑以下事项:

  • Spanner API 的默认 gRPC 超时为一小时。根据您回填的嵌入数量,您可能需要增加此超时,以确保 UPDATE 分区 DML 有足够的时间完成。如需了解详情,请参阅配置自定义超时和重试

性能和其他注意事项

在回填嵌入数据时,请考虑以下因素来优化性能。

节点数量

分区 DML 在不同分区上并行执行指定的 DML 语句。对于具有大量节点的实例,您可能会在执行分区 DML 期间观察到配额错误。如果 Vertex AI API 请求因 Vertex AI API 配额限制而受到限制,则 Spanner 会在分区 DML 事务模式下重试这些失败(最多 20 次)。如果您发现 Vertex AI 中的配额错误率较高,请增加 Vertex AI 的配额。您也可以在使用 GoogleSql 时通过语句级提示 @{pdml_max_parallelism=DESIRED_NUMBER} 调整并行性。以下示例将并行处理设置为“5”:

GoogleSQL

@{pdml_max_parallelism=5} UPDATE products
SET products.desc_embed =(
  SELECT embeddings.values
  FROM SAFE.ML.PREDICT(MODEL gecko_model, (
        SELECT products.value AS CONTENT
        )
  )
      @{remote_udf_max_rows_per_rpc=200}
),
products.desc_embed_model_version = 003
WHERE products.desc_embed IS NULL;

数据列中文字的大小

Vertex AI 嵌入模型对每个文本输入的词元数量上限设有限制。不同的模型版本具有不同的词元限制。每个 Vertex AI 请求可以有多个输入文本字段,但单个请求中存在的词元数量上限有限制。对于 GoogleSQL 数据库,如果遇到 INVALID_ARGUMENT 错误并收到“Request is too large”消息,请尝试减小批次大小以避免该错误。为此,您可以配置 default_batch_size,或在注册模型时使用 @{remote_udf_max_outstanding_rpcs} 查询提示。

发送到 Vertex AI 的 API 请求数量

您可以使用查询提示 @{remote_udf_max_outstanding_rpcs} 来增加或减少从 Spanner 发送到 Vertex AI 的请求数量。请注意,提高此限制可能会增加 Spanner 实例的 CPU 和内存使用量。对于 GoogleSQL 数据库,使用此查询提示会替换为您的模型配置的 default_batch_size

监控回填进度

您可以使用系统数据分析信息中心监控从 Spanner 发送到 Vertex AI 的请求数、延迟时间和网络字节数。

后续步骤