Migrate data from a vector database to AlloyDB


This tutorial describes how to use a LangChain VectorStore to migrate data from a third-party vector database to AlloyDB for PostgreSQL. It assumes that the data in the third-party vector database was created with a LangChain VectorStore integration. If you put data into one of the following databases without using LangChain, you might need to edit the script provided below to match the schema of your data. The following vector databases are supported: Pinecone, Weaviate, Chroma, Qdrant, and Milvus.

This tutorial assumes that you're familiar with Google Cloud, AlloyDB, and asynchronous Python programming.

Objectives

This tutorial shows you how to do the following:

  • Extract data from an existing vector database.
  • Connect to AlloyDB.
  • Initialize the AlloyDB table.
  • Initialize a vector store object.
  • Run the migration script to insert the data.

Costs

In this document, you use the following billable component of Google Cloud: AlloyDB for PostgreSQL.

You might be eligible for a free trial cluster. For more information, see AlloyDB free trial clusters overview.

To generate a cost estimate based on your projected usage, use the Pricing Calculator.

New Google Cloud users might be eligible for a free trial.

After you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

Make sure that you have one of the following LangChain third-party database vector stores: Pinecone, Weaviate, Chroma, Qdrant, or Milvus.

Enable billing and the required APIs

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud APIs that are required to create and connect to AlloyDB for PostgreSQL.

    Enable the APIs

    1. In the Confirm project step, click Next to confirm the name of the project that you want to make changes to.
    2. In the Enable APIs step, click Enable to enable the following:

      • AlloyDB API
      • Compute Engine API
      • Service Networking API

Required roles

To get the permissions that you need to complete the tasks in this tutorial, make sure that you have Identity and Access Management (IAM) roles that let you create tables and insert data.

To authenticate to the database with IAM authentication instead of the built-in authentication used in this tutorial, use the notebook that shows how to use AlloyDB for PostgreSQL with the AlloyDBVectorStore class to store vector embeddings.

Create an AlloyDB cluster and user

  1. Create an AlloyDB cluster and instance.
    • Enable Public IP to run this tutorial from anywhere. If you use Private IP, you must run this tutorial from within your VPC.
  2. Create or select an AlloyDB database user.
    • When you create an instance, a postgres user is created and assigned a password. This user has superuser permissions.
    • This tutorial uses built-in authentication to reduce authentication friction. You can also use IAM authentication with the AlloyDBEngine, as shown in the sketch after this list.
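
The following is a minimal sketch of connecting with IAM authentication instead of a built-in user. The project and instance values are hypothetical placeholders, and the sketch assumes that the principal behind your Application Default Credentials has already been added as an AlloyDB IAM database user; when no user or password is supplied, AlloyDBEngine falls back to IAM-based authentication.

    from langchain_google_alloydb_pg import AlloyDBEngine

    # Hypothetical values; replace them with your own project and instance details.
    # Omitting the `user` and `password` arguments makes AlloyDBEngine authenticate
    # with the IAM principal from your Application Default Credentials, which must
    # already be registered as an AlloyDB IAM database user.
    iam_engine = await AlloyDBEngine.afrom_instance(
        project_id="my-project",
        region="us-central1",
        cluster="my-cluster",
        instance="my-instance",
        database="postgres",
    )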

Retrieve the code sample

  1. Clone the repository to retrieve the code sample from GitHub:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Navigate to the migrations directory:

    cd langchain-google-alloydb-pg-python/samples/migrations

Retrieve data from your existing vector database

  1. Create a client.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    from pymilvus import MilvusClient

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Retrieve all of the data from the database.

    Pinecone

    Retrieve the vector IDs from the Pinecone index:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    if ids:  # Prevents yielding an empty list.
        yield ids
    
    # Check BOTH pagination and pagination.next
    while results.pagination is not None and results.pagination.get("next") is not None:
        pagination_token = results.pagination.get("next")
        results = pinecone_index.list_paginated(
            prefix="",
            pagination_token=pagination_token,
            namespace=pinecone_namespace,
            limit=pinecone_batch_size,
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        if ids:  # Prevents yielding an empty list.
            yield ids

    Then fetch the records by ID from the Pinecone index:

    import uuid
    
    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids, namespace=pinecone_namespace)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            # You might need to update this data translation logic according to one or more of your field names
            if pinecone_id_column_name in doc:
                # pinecone_id_column_name stores the unique identifier for the content
                ids.append(doc[pinecone_id_column_name])
            else:
                # Generate a uuid if pinecone_id_column_name is missing in source
                ids.append(str(uuid.uuid4()))
            # values is the vector embedding of the content
            embeddings.append(doc["values"])
            # Check if pinecone_content_column_name exists in metadata before accessing
            if pinecone_content_column_name in doc.metadata:
                # pinecone_content_column_name stores the content which was encoded
                contents.append(str(doc.metadata[pinecone_content_column_name]))
                # Remove pinecone_content_column_name after processing
                del doc.metadata[pinecone_content_column_name]
            else:
                # Handle the missing pinecone_content_column_name field appropriately
                contents.append("")
            # metadata is the additional context
            metadatas.append(doc["metadata"])
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids: list[str] = []
    content: list[Any] = []
    embeddings: list[list[float]] = []
    metadatas: list[Any] = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        # You might need to update this data translation logic according to one or more of your field names
        # uuid is the unique identifier for the content
        ids.append(str(item.uuid))
        # weaviate_text_key is the content which was encoded
        content.append(item.properties[weaviate_text_key])
        # vector is the vector embedding of the content
        embeddings.append(item.vector["default"])  # type: ignore
        del item.properties[weaviate_text_key]  # type: ignore
        # properties is the additional context
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        # You might need to update this data translation logic according to one or more of your field names
        # documents is the content which was encoded
        # embeddings is the vector embedding of the content
        # metadatas is the additional context
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        # ids is the unique identifier for the content
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                # You might need to update this data translation logic according to one or more of your field names
                # id is the unique identifier for the content
                ids.append(str(doc.id))
                # page_content is the content which was encoded
                contents.append(doc.payload["page_content"])
                # vector is the vector embedding of the content
                embeddings.append(doc.vector)  # type: ignore
                # metadata is the additional context
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            # You might need to update this data translation logic according to one or more of your field names
            doc = page[i]
            # pk is the unique identifier for the content
            ids.append(doc["pk"])
            # text is the content which was encoded
            content.append(doc["text"])
            # vector is the vector embedding of the content
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            # doc is the additional context
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Initialize the AlloyDB table

  1. Define an embedding service.

    The VectorStore interface requires an embedding service. This workflow doesn't generate new embeddings, so the FakeEmbeddings class is used to avoid any costs.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepare the AlloyDB table.

    1. Connect to AlloyDB using a public IP connection. For more information, see Specify IP address type.

      Pinecone

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,  # Optionally use IPTypes.PRIVATE
      )

      Weaviate

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Chroma

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Qdrant

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Milvus

      from google.cloud.alloydb.connector import IPTypes
      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )
    2. Create a table to copy the data into, if it doesn't already exist.

      Pinecone

      from langchain_google_alloydb_pg import Column
      
      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types if not using the UUID data type
          # id_column=Column("langchain_id", "TEXT"),  # Default is Column("langchain_id", "UUID")
          # overwrite_existing=True,  # Drop the old table and Create a new vector store table
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

Initialize a vector store object

This code adds any additional vector embedding metadata to the langchain_metadata column in JSON format. To filter more efficiently, organize this metadata into separate columns. For more information, see Create a custom vector store.
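
For example, the following is a minimal sketch of a table that stores selected metadata fields in dedicated columns. The column names source and page are hypothetical, and the metadata_columns arguments assume the langchain-google-alloydb-pg API described in Create a custom vector store.

    from langchain_google_alloydb_pg import AlloyDBVectorStore, Column

    # Hypothetical metadata columns; replace them with fields from your own metadata.
    await alloydb_engine.ainit_vectorstore_table(
        table_name=alloydb_table,
        vector_size=vector_size,
        metadata_columns=[
            Column("source", "TEXT"),
            Column("page", "INTEGER"),
        ],
    )

    # List the metadata keys that map to the dedicated columns; any remaining keys
    # are still stored in the langchain_metadata JSON column.
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
        metadata_columns=["source", "page"],
    )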

  1. To initialize a vector store object, run the following:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Insert the data into the AlloyDB table (a sketch of how these snippets fit together in a complete script follows this list):

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)
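
The snippets in this tutorial are excerpts from the sample migration scripts. The following is a minimal sketch of how the pieces fit together, assuming a hypothetical get_all_data() generator built from the extraction code for your source database, with the configuration values (project, instance, table, and concurrency settings) supplied elsewhere in the script.

    import asyncio
    from typing import Any

    from langchain_core.embeddings import FakeEmbeddings
    from langchain_google_alloydb_pg import AlloyDBEngine, AlloyDBVectorStore


    async def main() -> None:
        # Connect to AlloyDB and prepare the destination table.
        engine = await AlloyDBEngine.afrom_instance(
            project_id=project_id,
            region=region,
            cluster=cluster,
            instance=instance,
            database=db_name,
            user=db_user,
            password=db_pwd,
        )
        await engine.ainit_vectorstore_table(
            table_name=alloydb_table, vector_size=vector_size
        )
        vs = await AlloyDBVectorStore.create(
            engine=engine,
            embedding_service=FakeEmbeddings(size=vector_size),
            table_name=alloydb_table,
        )

        # get_all_data() is a hypothetical generator that yields
        # (ids, contents, embeddings, metadatas) batches, built from the
        # extraction snippet for your source database.
        pending: set[Any] = set()
        for ids, contents, embeddings, metadatas in get_all_data():
            pending.add(
                asyncio.ensure_future(
                    vs.aadd_embeddings(
                        texts=contents,
                        embeddings=embeddings,
                        metadatas=metadatas,
                        ids=ids,
                    )
                )
            )
            if len(pending) >= max_concurrency:
                _, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
        if pending:
            await asyncio.wait(pending)
        print("Migration completed, inserted all the batches of data to AlloyDB")


    asyncio.run(main())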

Run the migration script

  1. Set up a Python environment.

  2. Install the sample dependencies:

    pip install -r requirements.txt
  3. Run the sample migration.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Before running the sample, make the following substitutions:

    • PINECONE_API_KEY: the Pinecone API key.
    • PINECONE_NAMESPACE: the Pinecone namespace.
    • PINECONE_INDEX_NAME: the name of the Pinecone index.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Before running the sample, make the following substitutions:

    • WEAVIATE_API_KEY: the Weaviate API key.
    • WEAVIATE_CLUSTER_URL: the Weaviate cluster URL.
    • WEAVIATE_COLLECTION_NAME: the name of the Weaviate collection.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Before running the sample, make the following substitutions:

    • CHROMADB_PATH: the Chroma database path.
    • CHROMADB_COLLECTION_NAME: the name of the Chroma database collection.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Before running the sample, make the following substitutions:

    • QDRANT_PATH: the Qdrant database path.
    • QDRANT_COLLECTION_NAME: the name of the Qdrant collection.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Before running the sample, make the following substitutions:

    • MILVUS_URI: the Milvus URI.
    • MILVUS_COLLECTION_NAME: the name of the Milvus collection.
    • PROJECT_ID: the project ID.
    • REGION: the region where the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database password.

    If the migration is successful, the script prints logs similar to the following, without any errors:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Open AlloyDB Studio to view your migrated data. For more information, see Manage your data using AlloyDB Studio.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

  2. In the Resource name column, click the name of the cluster that you created.

  3. Click Delete cluster.

  4. In Delete cluster, enter the name of the cluster to confirm that you want to delete it.

  5. Click Delete.

    If you created a private connection when you created the cluster, delete it:

  6. In the Google Cloud console, go to the Networking page, and then click Delete VPC network.

What's next