使用 Dataflow 为 AlloyDB 构建实时向量嵌入流水线

本文档介绍了如何使用 Dataflow 创建 AlloyDB for PostgreSQL 提取、转换、加载 (ETL) 流水线。 Google Cloud Dataflow 是一项全托管式 Google Cloud 服务,用于开发和运行数据处理流水线。

您可以按照本文档中的说明操作,这些说明基于使用 Apache Beam 和 AlloyDB 提取向量嵌入 Colab,该 Colab 使用 Python 创建 basic_ingestion_pipeline.py 提取流水线。您可以在语义搜索或检索增强生成 (RAG) 等一些应用场景中应用本文档中的信息。

这些说明介绍了以下 Dataflow 流水线组件:

  • 设置 AlloyDB 和 Dataflow 连接
  • 使用 Apache Beam VertexAITextEmbeddings 处理程序和 Vertex AI 文本嵌入模型在 AlloyDB for PostgreSQL 中生成嵌入
  • 在 Dataflow 中创建流式流水线

准备工作

在使用 Colab 创建 Dataflow 流水线之前,先完成以下前提条件:

设置 AlloyDB for PostgreSQL 实例和流水线组件

首先,配置流水线以连接到 AlloyDB for PostgreSQL 实例。此配置包括定义 Google Cloud 项目 ID、AlloyDB for PostgreSQL 实例 URI、用户和密码,以使用 AlloyDB 语言连接器进行连接。如需详细了解如何设置连接,请参阅数据库设置

特定于检索增强生成 (RAG) 的 Apache Beam 模块提供用于执行以下任务的类:

  • 从 AlloyDB for PostgreSQL 中提取数据
  • 生成嵌入
  • 将这些向量嵌入写回 AlloyDB for PostgreSQL

在构建流水线逻辑之前,将所需的类导入到流水线代码中。如需详细了解流水线组件,请参阅导入流水线组件

创建示例数据

使用 Apache Beam 和 AlloyDB 提取向量嵌入 Colab 提供了用于运行流水线的示例 products_data 数据。流水线使用此示例数据作为输入,与嵌入模型一起生成嵌入。

如需了解详情,请参阅创建示例数据

创建用于存储嵌入的表

该流水线会将生成的嵌入存储在 default_dataflow_product_embeddings 表中。如需详细了解如何创建表架构,请参阅创建具有默认架构的表

可选:准备数据以进行嵌入提取

根据您的数据集,您可以将数据拆分为元数据和文本,嵌入模型必须将这些数据转换为嵌入。MLTransform()VectorDatabaseWriteTransform() 类会将输入数据处理为嵌入模型支持的大小。纳入元数据,并根据所用嵌入模型的规范设置输入数据格式。

如需详细了解如何准备数据,请参阅将商品数据映射到块

配置嵌入处理程序以生成嵌入

VertexAITextEmbeddings() 类定义了用于创建向量嵌入的文本嵌入模型。此嵌入模型会将分块数据转换为嵌入。

如需了解详情,请参阅配置嵌入处理程序

您还可以使用通过 Huggingface SentenceTransformers 框架创建的预训练模型来生成向量嵌入。如需了解详情,请参阅使用 HuggingFace 生成嵌入

创建提取流水线

使用 Apache Beam 和 AlloyDB 提取向量嵌入 Colab 中提供的 basic_ingestion_pipeline.py 流水线纳入了前面部分中的配置,包括 AlloyDB for PostgreSQL 设置、将数据加载到 AlloyDB for PostgreSQL、可选的数据分块和嵌入处理程序配置。

提取流水线会执行以下操作:

  • 创建商品数据表
  • 将数据转换为块
  • 生成嵌入
  • 将转换的嵌入写入 AlloyDB for PostgreSQL 中的 products_data

您可以使用直接本地运行程序或云端运行程序(如 Dataflow)来运行此流水线。

如需详细了解如何创建提取流水线,请参阅将流水线保存到 Python 文件

运行 Dataflow 流水线

您可以通过命令行运行 Dataflow 流水线。传递凭证,例如项目 ID、AlloyDB for PostgreSQL 连接详细信息、Cloud Storage 存储桶位置、执行环境详细信息、网络信息和提取流水线的名称 (basic_ingestion_pipeline.py)。

使用 Apache Beam 和 AlloyDB 提取向量嵌入 Colab 中,AlloyDB for PostgreSQL 实例和 Dataflow 作业在同一 VPC 网络和子网中运行。

如需详细了解如何在 Dataflow 中运行流水线,请参阅在 Dataflow 上运行流水线

在 Google Cloud 控制台的 Dataflow 信息中心内,您可以在流水线执行期间查看执行图、日志和指标。

可选:运行流式 Dataflow 流水线

对于预计会经常更改的数据(例如相似度搜索或商品推荐引擎),请考虑使用 Dataflow 和 Pub/Sub 创建流式流水线。

此流水线不是处理一批数据,而是持续读取来自 Pub/Sub 主题的传入消息,将消息转换为块,使用指定模型(如 Hugging Face 或 Vertex AI)生成嵌入,并更新 AlloyDB for PostgreSQL 表。

如需了解详情,请参阅通过 Pub/Sub 流式传输嵌入更新

验证 AlloyDB for PostgreSQL 中的向量嵌入

流水线执行后,验证流水线是否已将嵌入写入 AlloyDB for PostgreSQL 数据库。

如需了解详情,请参阅验证写入的嵌入

后续步骤