将数据从向量数据库迁移到 AlloyDB


本教程介绍了如何使用 LangChain 矢量存储将数据从第三方矢量数据库迁移到 AlloyDB for PostgreSQL。支持以下矢量数据库:

本教程假定您熟悉 Google Cloud、AlloyDB 和异步 Python 编程。

目标

本教程介绍了如何执行以下操作:

  • 从现有矢量数据库中提取数据。
  • 连接到 AlloyDB。
  • 初始化 AlloyDB 表。
  • 初始化矢量存储对象。
  • 运行迁移脚本以插入数据。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

确保您拥有以下 LangChain 第三方数据库向量存储之一:

启用结算功能和所需的 API

  1. 在 Google Cloud 控制台的项目选择器页面上,选择或创建一个Google Cloud 项目。

    转到“项目选择器”

  2. 确保您的 Google Cloud 项目已启用结算功能

  3. 启用创建和连接到 AlloyDB for PostgreSQL 所需的 Cloud API。

    启用 API

    1. 确认项目步骤中,点击下一步以确认要更改的项目的名称。
    2. 启用 API 步骤中,点击启用以启用以下各项:

      • AlloyDB API
      • Compute Engine API
      • Service Networking API

所需的角色

如需获得完成本教程中任务所需的权限,请拥有以下允许创建表和插入数据的 Identity and Access Management (IAM) 角色:

如果您想使用 IAM 身份验证(而不是本教程中的内置身份验证)对数据库进行身份验证,请使用该笔记本,其中介绍了如何使用 AlloyDB for PostgreSQL 通过 AlloyDBVectorStore 类存储矢量嵌入

创建 AlloyDB 集群和用户

  1. 创建 AlloyDB 集群和实例
    • 启用公共 IP,即可随时随地运行本教程。如果您使用的是专用 IP,则必须在 VPC 中运行本教程。
  2. 创建或选择 AlloyDB 数据库用户
    • 创建实例时,系统会创建一个具有密码的 postgres 用户。此用户拥有超级用户权限。
    • 本教程使用内置身份验证来减少任何身份验证摩擦。您可以使用 AlloyDBEngine 进行 IAM 身份验证。

检索代码示例

  1. 通过克隆代码库从 GitHub 复制代码示例:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. 导航到 migrations 目录:

    cd langchain-google-alloydb-pg-python/samples/migrations

从现有矢量数据库中提取数据

  1. 创建客户端。

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    色度

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. 从数据库中获取所有数据。

    Pinecone

    从 Pinecone 索引检索矢量 ID:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    然后,从 Pinecone 索引中按 ID 提取记录:

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids = []
    content = []
    embeddings = []
    metadatas = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    色度

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

初始化 AlloyDB 表

  1. 定义嵌入服务。

    VectorStore 接口需要嵌入服务。此工作流不会生成新的嵌入,因此使用 FakeEmbeddings 类可避免任何开销。

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    色度

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. 准备 AlloyDB 表。

    1. 使用公共 IP 连接到 AlloyDB。 如需了解详情,请参阅指定 IP 地址类型

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      色度

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )
    2. 创建要将数据复制到的表(如果不存在)。

      Pinecone

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      色度

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

初始化矢量存储对象

此代码会以 JSON 格式将其他矢量嵌入元数据添加到 langchain_metadata 列。为了提高过滤效率,请将这些元数据整理到单独的列中。如需了解详情,请参阅创建自定义矢量存储空间

  1. 如需初始化矢量存储区对象,请运行以下命令:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    色度

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. 将数据插入 AlloyDB 表中:

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    色度

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

运行迁移脚本

  1. 设置 Python 环境

  2. 安装示例依赖项:

    pip install -r requirements.txt
  3. 运行示例迁移。

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    在运行示例之前,请先进行以下替换:

    • PINECONE_API_KEY:Pinecone API 密钥。
    • PINECONE_NAMESPACE:Pinecone 命名空间。
    • PINECONE_INDEX_NAME:Pinecone 索引的名称。
    • PROJECT_ID:项目 ID。
    • REGION:AlloyDB 集群的部署区域。
    • CLUSTER:集群的名称。
    • INSTANCE:实例的名称。
    • DB_NAME:数据库的名称。
    • DB_USER:数据库用户的名称。
    • DB_PWD:数据库 Secret 密码。

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    在运行示例之前,请先进行以下替换:

    • WEAVIATE_API_KEY:Weaviate API 密钥。
    • WEAVIATE_CLUSTER_URL:Weaviate 集群网址。
    • WEAVIATE_COLLECTION_NAME:Weaviate 集合名称。
    • PROJECT_ID:项目 ID。
    • REGION:AlloyDB 集群的部署区域。
    • CLUSTER:集群的名称。
    • INSTANCE:实例的名称。
    • DB_NAME:数据库的名称。
    • DB_USER:数据库用户的名称。
    • DB_PWD:数据库 Secret 密码。

    色度

    python migrate_chromadb_vectorstore_to_alloydb.py

    在运行示例之前,请先进行以下替换:

    • CHROMADB_PATH:Chroma 数据库路径。
    • CHROMADB_COLLECTION_NAME:Chroma 数据库集合的名称。
    • PROJECT_ID:项目 ID。
    • REGION:AlloyDB 集群的部署区域。
    • CLUSTER:集群的名称。
    • INSTANCE:实例的名称。
    • DB_NAME:数据库的名称。
    • DB_USER:数据库用户的名称。
    • DB_PWD:数据库 Secret 密码。

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    在运行示例之前,请先进行以下替换:

    • QDRANT_PATH:Qdrant 数据库路径。
    • QDRANT_COLLECTION_NAME:Qdrant 集合名称。
    • PROJECT_ID:项目 ID。
    • REGION:AlloyDB 集群的部署区域。
    • CLUSTER:集群的名称。
    • INSTANCE:实例的名称。
    • DB_NAME:数据库的名称。
    • DB_USER:数据库用户的名称。
    • DB_PWD:数据库 Secret 密码。

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    在运行示例之前,请先进行以下替换:

    • MILVUS_URI:Milvus URI。
    • MILVUS_COLLECTION_NAME:Milvus 集合的名称。
    • PROJECT_ID:项目 ID。
    • REGION:AlloyDB 集群的部署区域。
    • CLUSTER:集群的名称。
    • INSTANCE:实例的名称。
    • DB_NAME:数据库的名称。
    • DB_USER:数据库用户的名称。
    • DB_PWD:数据库 Secret 密码。

    迁移成功后,系统会输出类似于以下内容的日志,且不会出现任何错误:
    Migration completed, inserted all the batches of data to AlloyDB

  4. 打开 AlloyDB Studio 查看迁移的数据。如需了解详情,请参阅使用 AlloyDB Studio 管理数据

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

  1. 在 Google Cloud 控制台中,前往集群页面。

    转到集群

  2. 资源名称列中,点击您创建的集群的名称。

  3. 点击 Delete cluster(删除集群)。

  4. 删除集群中,输入集群的名称以确认您要删除集群。

  5. 点击删除

    如果您在创建集群时创建了专用连接,请删除该专用连接:

  6. 前往 Google Cloud 控制台的“网络”页面,然后点击删除 VPC 网络

后续步骤