本文說明如何使用 Dataflow 建立 PostgreSQL 適用的 AlloyDB「擷取、轉換及載入」(ETL) 管道。 Google Cloud Dataflow 是全代管 Google Cloud 服務,用於開發及執行資料處理管道。
您可以按照文件中的操作說明,以 Vector Embedding Ingestion with Apache Beam and AlloyDB Colab 為基礎,使用 Python 建立 basic_ingestion_pipeline.py
擷取管道。您可以將本文資訊用於語意搜尋或檢索增強生成 (RAG) 等用途。
這些操作說明會介紹下列 Dataflow 管道元件:
- 設定 AlloyDB 和 Dataflow 連線
- 使用 Apache Beam
VertexAITextEmbeddings
處理常式和 Vertex AI 文字嵌入模型,在 PostgreSQL 適用的 AlloyDB 中生成嵌入 - 在 Dataflow 中建立串流管道
事前準備
使用 Colab 建立 Dataflow 管道前,請先完成下列必要條件:
- 設定環境,建立 Dataflow 管道。
啟用 PostgreSQL 適用的 AlloyDB 和其他必要 API:
gcloud services enable alloydb.googleapis.com cloudresourcemanager.googleapis.com \ servicenetworking.googleapis.com
設定 AlloyDB for PostgreSQL 執行個體和管道元件
首先,請設定管道,連線至 AlloyDB for PostgreSQL 執行個體。這項設定包括定義 Google Cloud 專案 ID、PostgreSQL 適用的 AlloyDB 執行個體 URI、使用者和密碼,以便使用 AlloyDB 語言連接器連線。如要進一步瞭解如何設定連線,請參閱「資料庫設定」。
檢索增強生成 (RAG) 專用的 Apache Beam 模組提供下列工作的類別:
- 從 PostgreSQL 適用的 AlloyDB 擷取資料
- 生成嵌入項目
- 將這些向量嵌入內容寫回 PostgreSQL 適用的 AlloyDB
建構管道邏輯前,請先將必要類別匯入管道程式碼。如要進一步瞭解管道元件,請參閱「匯入管道元件」。
建立樣本資料
使用 Apache Beam 和 AlloyDB 擷取向量嵌入 Colab 提供範例 products_data
資料,用於執行管道。管道會將這項範例資料和嵌入模型做為輸入內容,產生嵌入。
詳情請參閱「建立範例資料」。
建立資料表來儲存嵌入
管道會將產生的嵌入儲存在 default_dataflow_product_embeddings
資料表中。如要進一步瞭解如何建立資料表結構定義,請參閱「建立含有預設結構定義的資料表」。
選用:準備要擷取的嵌入內容資料
根據資料集,您可以將資料分割為中繼資料和文字,嵌入模型必須將這些資料轉換為嵌入項目。MLTransform()
和 VectorDatabaseWriteTransform()
類別會將輸入資料處理成嵌入模型支援的大小。請根據所用嵌入模型的規格,加入中繼資料並設定輸入資料格式。
如要進一步瞭解如何準備資料,請參閱「將產品資料對應至區塊」。
設定嵌入處理常式來產生嵌入
VertexAITextEmbeddings()
類別會定義建立向量嵌入項目的文字嵌入模型。這個嵌入模型會將分塊資料轉換為嵌入。
詳情請參閱「設定嵌入處理常式」。
您也可以使用以 Huggingface SentenceTransformers 架構建立的預先訓練模型,生成向量嵌入。詳情請參閱「使用 HuggingFace 生成嵌入內容」。
建立擷取管道
basic_ingestion_pipeline.py
管道 (位於 Vector Embedding Ingestion with Apache Beam and AlloyDB Colab 中) 會納入先前各節的設定,包括 AlloyDB for PostgreSQL 設定、將資料載入 AlloyDB for PostgreSQL、選用資料分塊,以及嵌入處理常式設定。
擷取管道會執行下列作業:
- 建立產品資料表
- 將資料轉換為區塊
- 生成嵌入
- 將轉換後的嵌入寫入 PostgreSQL 適用的 AlloyDB 中的
products_data
資料表
您可以使用直接本機執行器或雲端執行器 (例如 Dataflow) 執行這個管道。
如要進一步瞭解如何建立擷取管道,請參閱「將管道儲存至 Python 檔案」。
執行 Dataflow 管道
您可以透過指令列執行 Dataflow 管道。傳遞憑證,例如專案 ID、PostgreSQL 適用的 AlloyDB 連線詳細資料、Cloud Storage 值區位置、執行環境詳細資料、網路資訊,以及擷取管道的名稱 (basic_ingestion_pipeline.py
)。
在「使用 Apache Beam 和 AlloyDB 擷取向量嵌入內容」Colab 中,PostgreSQL 適用的 AlloyDB 執行個體和 Dataflow 工作會在相同的虛擬私有雲網路和子網路中執行。
如要進一步瞭解如何在 Dataflow 中執行管道,請參閱「在 Dataflow 上執行管道」。
在 Google Cloud 控制台的 Dataflow 資訊主頁中,您可以在管道執行時查看執行圖、記錄和指標。
選用:執行串流 Dataflow 管道
如果資料預計會經常變更 (例如相似性搜尋或推薦引擎),請考慮使用 Dataflow 和 Pub/Sub 建立串流管道。
這個管道不會處理批次資料,而是持續從 Pub/Sub 主題讀取傳入的訊息、將訊息轉換為區塊、使用指定模型 (例如 Hugging Face 或 Vertex AI) 產生嵌入內容,並更新 AlloyDB for PostgreSQL 表格。
詳情請參閱「從 Pub/Sub 串流嵌入更新」。
驗證 AlloyDB for PostgreSQL 中的向量嵌入
管道執行完畢後,請確認管道是否已將嵌入寫入 AlloyDB for PostgreSQL 資料庫。
詳情請參閱「驗證手寫內嵌內容」。