使用分区 DML 批量为文本数据生成向量嵌入

本文档介绍了如何使用 SQL 和 Vertex AI textembedding-gecko 模型为存储在 Spanner 中的文本数据(STRINGJSON)批量生成和回填矢量嵌入。

前提条件

您的 Spanner 数据库中必须有一个包含文本数据 (STRINGJSON) 的表。如需详细了解如何导入数据,请参阅 Spanner 导入和导出概览

用例示例

假设您在 Spanner 中有一个架构如下的表。此表包含数百万条记录。

GoogleSQL

CREATE TABLE Products (
  product_id INT64 NOT NULL,
  name STRING(MAX),
  description STRING(MAX)
) PRIMARY KEY(product_id);

PostgreSQL

CREATE TABLE Products (
  product_id INT8 NOT NULL,
  name TEXT,
  description TEXT,
  PRIMARY KEY(product_id)
);

您的目标是为此表中的 description 列生成向量嵌入,以便使用向量搜索功能查找类似商品以推荐给客户,从而改善他们的购物体验。

注册嵌入模型

GoogleSQL

在 Spanner 数据库中向 Vertex AI textembedding-gecko 端点注册嵌入模型:

CREATE MODEL MODEL_NAME
INPUT(
  content STRING(MAX)
)
OUTPUT(
  embeddings STRUCT<values ARRAY<FLOAT32>>
)
REMOTE OPTIONS(
    endpoint = '//aiplatform.googleapis.com/projects/PROJECT/locations/LOCATION/publishers/google/models/textembedding-gecko$MODEL_VERSION',
  default_batch_size = 5
)

请替换以下内容:

  • MODEL_NAME:嵌入模型的名称
  • PROJECT:托管 Vertex AI 端点的项目
  • LOCATION:Vertex AI 端点的位置
  • MODEL_VERSIONtextembedding-gecko 嵌入模型的版本

PostgreSQL

在 PostgreSQL 方言中,无需注册模型。您可以直接将端点名称传递给 spanner.ML_PREDICT_ROW 函数调用。

如需了解最佳实践,请考虑以下事项:

  • 为了保持配额隔离,请使用其他项目中的端点生成嵌入并将其回填到生产端点。预留生产端点以处理生产流量。
  • 确保模型端点支持 default_batch_size 的值。您可以使用查询提示 @{remote_udf_max_rows_per_rpc=NEW_NUMBER} 替换 default_batch_size。如需了解每个区域的 default_batch_size 限制,请参阅获取文本片段的文本嵌入
  • 使用特定模型版本(例如 @003)而不是 @latest 定义端点。这是因为,为同一文本生成的嵌入向量可能会因您使用的模型版本而异;因此,您应避免在同一数据集中使用不同的模型版本生成嵌入。此外,更新模型定义语句中的模型版本不会更新使用此模型生成的嵌入。如需管理嵌入的模型版本,一种方法是在存储模型版本的表中创建一个额外的列。
  • GoogleSQL ML.PREDICT 和 PostgreSQL spanner.ML_PREDICT_ROW 函数不支持自定义调整的 textembedding-gecko 模型。

测试嵌入模型的端到端集成

您可以执行查询,以测试嵌入模型是否已成功配置,以及是否检索到嵌入。例如,运行以下查询:

GoogleSQL

SELECT embeddings.values
FROM SAFE.ML.PREDICT(
  MODEL MODEL_NAME,
  (SELECT description AS content FROM products LIMIT 10)
);

请替换以下内容:

  • MODEL_NAME:嵌入模型的名称

PostgreSQL

SELECT spanner.ML_PREDICT_ROW(
    'projects/PROJECT/locations/LOCATION/publishers/google/models/textembedding-gecko$MODEL_VERSION',
    JSONB_BUILD_OBJECT('instances', JSONB_BUILD_ARRAY(JSONB_BUILD_OBJECT('content', description))))
FROM Products
LIMIT 10;

请替换以下内容:

  • PROJECT:托管 Vertex AI 端点的项目
  • LOCATION:Vertex AI 端点的位置
  • MODEL_VERSIONtextembedding-gecko 嵌入模型的版本

更新源表,添加用于存储嵌入的其他列

接下来,更新源表架构,以添加一个数据类型为 ARRAY<FLOAT32> 的额外列来存储生成的嵌入:

GoogleSQL

ALTER TABLE TABLE_NAME
ADD COLUMN EMBEDDING_COLUMN_NAME ARRAY<FLOAT32>;

请替换以下内容:

  • TABLE_NAME:来源表的名称
  • EMBEDDING_COLUMN_NAME:您要将生成的嵌入添加到的列的名称

PostgreSQL

ALTER TABLE TABLE_NAME
ADD COLUMN EMBEDDING_COLUMN_NAME real[];

请替换以下内容:

  • TABLE_NAME:来源表的名称
  • EMBEDDING_COLUMN_NAME:您要将生成的嵌入添加到的列的名称

例如,使用 products 表示例,请运行以下命令:

GoogleSQL

ALTER TABLE Products
ADD COLUMN desc_embed ARRAY<FLOAT32>;

PostgreSQL

ALTER TABLE Products
ADD COLUMN desc_embed real[];

您可以添加另一个列来管理嵌入模型的版本。

GoogleSQL

ALTER TABLE Products
ADD COLUMN desc_embed_model_version INT64;

PostgreSQL

ALTER TABLE Products
ADD COLUMN desc_embed_model_version INT8;

增加 Vertex AI 的配额

您可能需要在使用该模型的区域中增加 textembedding-gecko 的 Vertex AI API 配额。如需申请增加配额,请参阅 Vertex AI 增加配额

如需了解详情,请参阅 Vertex AI 配额和限制

回填嵌入

最后,使用分区 DML 执行以下 UPDATE 语句,为文本数据列生成嵌入,并将嵌入存储在数据库中。您可以将模型版本与嵌入存储在一起。我们建议您在数据库中流量较低时执行此查询。

GoogleSQL

UPDATE TABLE_NAME
SET 
  TABLE_NAME.EMBEDDING_COLUMN_NAME = (
    SELECT embeddings.values
    FROM SAFE.ML.PREDICT(
      MODEL MODEL_NAME,
      (SELECT TABLE_NAME.DATA_COLUMN_NAME AS content)
    ) @{remote_udf_max_rows_per_rpc=MAX_ROWS}
  ),
  TABLE_NAME.EMBEDDING_VERSION_COLUMN = MODEL_VERSION
WHERE FILTER_CONDITION;

请替换以下内容:

  • TABLE_NAME:包含文本数据的表的名称
  • EMBEDDING_COLUMN_NAME:您要将生成的嵌入添加到的列的名称
  • DATA_COLUMN_NAME:包含文本数据的列的名称
  • MODEL_NAME:嵌入模型的名称
  • MAX_ROWS:每个 RPC 的最大行数
  • EMBEDDING_VERSION_COLUMN:用于管理用于回填嵌入的 textembedding-gecko 嵌入模型版本的列
  • MODEL_VERSIONtextembedding-gecko 嵌入模型的版本
  • FILTER_CONDITION:您要应用的可分区过滤条件

使用 SAFE.ML.PREDICT 会针对失败的请求返回 NULL。您还可以将 SAFE.ML.PREDICTWHERE embedding_column IS NULL 过滤器结合使用,以重运行查询,而无需为已计算的字段计算嵌入。

PostgreSQL

UPDATE TABLE_NAME
SET 
  EMBEDDING_COLUMN_NAME = spanner.FLOAT32_ARRAY(spanner.ML_PREDICT_ROW(
    'projects/PROJECT/locations/LOCATION/publishers/google/models/textembedding-gecko$MODEL_VERSION', 
    JSONB_BUILD_OBJECT('instances', JSONB_BUILD_ARRAY(JSONB_BUILD_OBJECT('content', DATA_COLUMN_NAME)))
  ) /*@ remote_udf_max_rows_per_rpc=MAX_ROWS */ ->'predictions'->0->'embeddings'->'values'),
  EMBEDDING_VERSION_COLUMN = MODEL_VERSION
WHERE FILTER_CONDITION;

请替换以下内容:

  • TABLE_NAME:包含文本数据的表的名称
  • EMBEDDING_COLUMN_NAME:您要将生成的嵌入添加到的列的名称
  • DATA_COLUMN_NAME:包含文本数据的列的名称
  • PROJECT:托管 Vertex AI 端点的项目
  • LOCATION:Vertex AI 端点的位置
  • MODEL_VERSIONtextembedding-gecko 嵌入模型的版本
  • MAX_ROWS:每个 RPC 的最大行数
  • EMBEDDING_VERSION_COLUMN:用于管理用于回填嵌入的 textembedding-gecko 嵌入模型版本的列
  • FILTER_CONDITION:您要应用的可分区过滤条件

products 表的回填查询示例:

GoogleSQL

UPDATE products
SET
  products.desc_embed = (
    SELECT embeddings.values
    FROM SAFE.ML.PREDICT(
      MODEL gecko_model,
      (SELECT products.description AS content)
    ) @{remote_udf_max_rows_per_rpc=200}
  ),
  products.desc_embed_model_version = 3
WHERE products.desc_embed IS NULL;

PostgreSQL

UPDATE products
SET
  desc_embed = spanner.FLOAT32_ARRAY(spanner.ML_PREDICT_ROW(
    'projects/PROJECT/locations/LOCATION/publishers/google/models/textembedding-gecko@003', 
    JSONB_BUILD_OBJECT('instances', JSONB_BUILD_ARRAY(JSONB_BUILD_OBJECT('content', description)))
  ) /*@ remote_udf_max_rows_per_rpc=200 */ ->'predictions'->0->'embeddings'->'values'),
  desc_embed_model_version = 3
WHERE desc_embed IS NULL;

如需了解最佳实践,请考虑以下事项:

  • Spanner API 的默认 gRPC 超时时间为 1 小时。根据要回填的嵌入数量,您可能需要延长此超时设置,以确保 UPDATE 分区 DML 有足够的时间完成。如需了解详情,请参阅配置自定义超时和重试

性能和其他注意事项

在回填嵌入数据时,请考虑以下事项以优化性能。

节点数

分区 DML 会并行在不同的分区上执行给定的 DML 语句。对于节点数量较多的实例,您可能会在执行分区 DML 期间观察到配额错误。如果 Vertex AI API 请求因 Vertex AI API 配额限制而被节流,Spanner 会在分区 DML 事务模式下重试这些失败的请求,最多重试 20 次。如果您发现 Vertex AI 中的配额错误率较高,请增加 Vertex AI 的配额。在使用 GoogleSQL 时,您还可以使用语句级提示 @{pdml_max_parallelism=DESIRED_NUMBER} 来调整并行度。以下示例将并行性设置为“5”:

GoogleSQL

@{pdml_max_parallelism=5} UPDATE products
SET products.desc_embed =(
  SELECT embeddings.values
  FROM SAFE.ML.PREDICT(MODEL gecko_model, (
        SELECT products.value AS CONTENT
        )
  )
      @{remote_udf_max_rows_per_rpc=200}
),
products.desc_embed_model_version = 003
WHERE products.desc_embed IS NULL;

数据列中文本的大小

Vertex AI 嵌入模型对每个文本输入的令牌数量上限有限制。不同的模型版本具有不同的令牌限制。每个 Vertex AI 请求可以包含多个输入文本字段,但单个请求中存在的令牌数量上限有限。对于 GoogleSQL 数据库,如果您遇到显示“请求过大”消息的 INVALID_ARGUMENT 错误,请尝试减小批次大小以避免出现此错误。为此,您可以在注册模型时配置 default_batch_size 或使用 @{remote_udf_max_outstanding_rpcs} 查询提示。

发送到 Vertex AI 的 API 请求数

您可以使用查询提示 @{remote_udf_max_outstanding_rpcs} 来增加或减少从 Spanner 发送到 Vertex AI 的请求数量。请注意,提高此限制可能会增加 Spanner 实例的 CPU 和内存用量。对于 GoogleSQL 数据库,使用此查询提示会替换为模型配置的 default_batch_size

监控回填进度

您可以使用系统数据分析信息中心监控从 Spanner 发送到 Vertex AI 的请求数、延迟时间和网络字节数。

后续步骤