使用 Dataflow 為 AlloyDB 建構即時向量嵌入 pipeline

本文說明如何使用 Dataflow 建立 PostgreSQL 適用的 AlloyDB「擷取、轉換及載入」(ETL) 管道。 Google Cloud Dataflow 是全代管 Google Cloud 服務,用於開發及執行資料處理管道。

您可以按照文件中的操作說明,以 Vector Embedding Ingestion with Apache Beam and AlloyDB Colab 為基礎,使用 Python 建立 basic_ingestion_pipeline.py 擷取管道。您可以將本文資訊用於語意搜尋或檢索增強生成 (RAG) 等用途。

這些操作說明會介紹下列 Dataflow 管道元件:

  • 設定 AlloyDB 和 Dataflow 連線
  • 使用 Apache Beam VertexAITextEmbeddings 處理常式和 Vertex AI 文字嵌入模型,在 PostgreSQL 適用的 AlloyDB 中生成嵌入
  • 在 Dataflow 中建立串流管道

事前準備

使用 Colab 建立 Dataflow 管道前,請先完成下列必要條件:

設定 AlloyDB for PostgreSQL 執行個體和管道元件

首先,請設定管道,連線至 AlloyDB for PostgreSQL 執行個體。這項設定包括定義 Google Cloud 專案 ID、PostgreSQL 適用的 AlloyDB 執行個體 URI、使用者和密碼,以便使用 AlloyDB 語言連接器連線。如要進一步瞭解如何設定連線,請參閱「資料庫設定」。

檢索增強生成 (RAG) 專用的 Apache Beam 模組提供下列工作的類別:

  • 從 PostgreSQL 適用的 AlloyDB 擷取資料
  • 生成嵌入項目
  • 將這些向量嵌入內容寫回 PostgreSQL 適用的 AlloyDB

建構管道邏輯前,請先將必要類別匯入管道程式碼。如要進一步瞭解管道元件,請參閱「匯入管道元件」。

建立樣本資料

使用 Apache Beam 和 AlloyDB 擷取向量嵌入 Colab 提供範例 products_data 資料,用於執行管道。管道會將這項範例資料和嵌入模型做為輸入內容,產生嵌入。

詳情請參閱「建立範例資料」。

建立資料表來儲存嵌入

管道會將產生的嵌入儲存在 default_dataflow_product_embeddings 資料表中。如要進一步瞭解如何建立資料表結構定義,請參閱「建立含有預設結構定義的資料表」。

選用:準備要擷取的嵌入內容資料

根據資料集,您可以將資料分割為中繼資料和文字,嵌入模型必須將這些資料轉換為嵌入項目。MLTransform()VectorDatabaseWriteTransform() 類別會將輸入資料處理成嵌入模型支援的大小。請根據所用嵌入模型的規格,加入中繼資料並設定輸入資料格式。

如要進一步瞭解如何準備資料,請參閱「將產品資料對應至區塊」。

設定嵌入處理常式來產生嵌入

VertexAITextEmbeddings() 類別會定義建立向量嵌入項目的文字嵌入模型。這個嵌入模型會將分塊資料轉換為嵌入。

詳情請參閱「設定嵌入處理常式」。

您也可以使用以 Huggingface SentenceTransformers 架構建立的預先訓練模型,生成向量嵌入。詳情請參閱「使用 HuggingFace 生成嵌入內容」。

建立擷取管道

basic_ingestion_pipeline.py 管道 (位於 Vector Embedding Ingestion with Apache Beam and AlloyDB Colab 中) 會納入先前各節的設定,包括 AlloyDB for PostgreSQL 設定、將資料載入 AlloyDB for PostgreSQL、選用資料分塊,以及嵌入處理常式設定。

擷取管道會執行下列作業:

  • 建立產品資料表
  • 將資料轉換為區塊
  • 生成嵌入
  • 將轉換後的嵌入寫入 PostgreSQL 適用的 AlloyDB 中的 products_data 資料表

您可以使用直接本機執行器或雲端執行器 (例如 Dataflow) 執行這個管道。

如要進一步瞭解如何建立擷取管道,請參閱「將管道儲存至 Python 檔案」。

執行 Dataflow 管道

您可以透過指令列執行 Dataflow 管道。傳遞憑證,例如專案 ID、PostgreSQL 適用的 AlloyDB 連線詳細資料、Cloud Storage 值區位置、執行環境詳細資料、網路資訊,以及擷取管道的名稱 (basic_ingestion_pipeline.py)。

在「使用 Apache Beam 和 AlloyDB 擷取向量嵌入內容」Colab 中,PostgreSQL 適用的 AlloyDB 執行個體和 Dataflow 工作會在相同的虛擬私有雲網路和子網路中執行。

如要進一步瞭解如何在 Dataflow 中執行管道,請參閱「在 Dataflow 上執行管道」。

在 Google Cloud 控制台的 Dataflow 資訊主頁中,您可以在管道執行時查看執行圖、記錄和指標。

選用:執行串流 Dataflow 管道

如果資料預計會經常變更 (例如相似性搜尋或推薦引擎),請考慮使用 Dataflow 和 Pub/Sub 建立串流管道。

這個管道不會處理批次資料,而是持續從 Pub/Sub 主題讀取傳入的訊息、將訊息轉換為區塊、使用指定模型 (例如 Hugging Face 或 Vertex AI) 產生嵌入內容,並更新 AlloyDB for PostgreSQL 表格。

詳情請參閱「從 Pub/Sub 串流嵌入更新」。

驗證 AlloyDB for PostgreSQL 中的向量嵌入

管道執行完畢後,請確認管道是否已將嵌入寫入 AlloyDB for PostgreSQL 資料庫。

詳情請參閱「驗證手寫內嵌內容」。

後續步驟