Migre dados de uma base de dados vetorial para o AlloyDB


Este tutorial descreve como migrar dados de uma base de dados vetorial de terceiros para o AlloyDB for PostgreSQL através de um VectorStore do LangChain. Este tutorial pressupõe que os dados em bases de dados vetoriais de terceiros foram criados com uma integração do LangChain VectorStore. Se introduzir informações numa das seguintes bases de dados sem usar o LangChain, pode ter de editar os scripts fornecidos abaixo para corresponderem ao esquema dos seus dados. As seguintes bases de dados vetoriais são suportadas:

Este tutorial pressupõe que tem conhecimentos de Google Cloud, AlloyDB e programação Python assíncrona.

Objetivos

Este tutorial mostra como fazer o seguinte:

  • Extrair dados de uma base de dados vetorial existente.
  • Associe-a ao AlloyDB.
  • Inicialize a tabela do AlloyDB.
  • Inicialize um objeto de banco de dados vetorial.
  • Execute o script de migração para inserir os dados.

Custos

Neste documento, usa os seguintes componentes faturáveis do Google Cloud:

You might be eligible for a free trial cluster. For more information, see AlloyDB free trial clusters overview.

Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Quando terminar as tarefas descritas neste documento, pode evitar a faturação contínua eliminando os recursos que criou. Para mais informações, consulte o artigo Limpe.

Antes de começar

Certifique-se de que tem uma das seguintes lojas de vetores de base de dados de terceiros do LangChain:

Ative a faturação e as APIs necessárias

  1. Na Google Cloud consola, na página do seletor de projetos, selecione ou crie um Google Cloud projeto.

    Aceder ao seletor de projetos

  2. Certifique-se de que a faturação está ativada para o seu Google Cloud projeto.

  3. Ative as APIs Cloud necessárias para criar e estabelecer ligação ao AlloyDB para PostgreSQL.

    Ative as APIs

    1. No passo Confirmar projeto, clique em Seguinte para confirmar o nome do projeto ao qual vai fazer alterações.
    2. No passo Ativar APIs, clique em Ativar para ativar o seguinte:

      • API AlloyDB
      • API Compute Engine
      • API de rede de serviços

Funções necessárias

Para receber as autorizações necessárias para concluir as tarefas neste tutorial, tem de ter as seguintes funções de gestão de identidade e acesso (IAM), que permitem a criação de tabelas e a inserção de dados:

  • Proprietário (roles/owner) ou editor (roles/editor)
  • Se o utilizador não for proprietário nem editor, são necessárias as seguintes funções de IAM e privilégios do PostgreSQL:

Se quiser autenticar-se na sua base de dados através da autenticação do IAM, em vez de usar a autenticação incorporada neste tutorial, use o bloco de notas que mostra como usar o AlloyDB for PostgreSQL para armazenar incorporações de vetores com a classe AlloyDBVectorStore.

Crie um cluster e um utilizador do AlloyDB

  1. Crie um cluster e uma instância do AlloyDB.
    • Ative o IP público para executar este tutorial a partir de qualquer lugar. Se estiver a usar o IP privado, tem de executar este tutorial a partir da sua VPC.
  2. Crie ou selecione um utilizador da base de dados do AlloyDB.
    • Quando cria a instância, é criado um utilizador postgres com uma palavra-passe. Este utilizador tem autorizações de superutilizador.
    • Este tutorial usa a autenticação incorporada para reduzir qualquer atrito de autenticação. A autenticação IAM é possível através do AlloyDBEngine.

Obtenha o exemplo de código

  1. Copie o exemplo de código do GitHub clonando o repositório:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Navegue para o diretório migrations:

    cd langchain-google-alloydb-pg-python/samples/migrations

Extraia dados de uma base de dados vetorial existente

  1. Crie um cliente.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Croma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Obter todos os dados da base de dados.

    Pinecone

    Obtenha IDs de vetores do índice do Pinecone:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    if ids:  # Prevents yielding an empty list.
        yield ids
    
    # Check BOTH pagination and pagination.next
    while results.pagination is not None and results.pagination.get("next") is not None:
        pagination_token = results.pagination.get("next")
        results = pinecone_index.list_paginated(
            prefix="",
            pagination_token=pagination_token,
            namespace=pinecone_namespace,
            limit=pinecone_batch_size,
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        if ids:  # Prevents yielding an empty list.
            yield ids

    Em seguida, obtenha os registos por ID do índice do Pinecone:

    import uuid
    
    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids, namespace=pinecone_namespace)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            # You might need to update this data translation logic according to one or more of your field names
            if pinecone_id_column_name in doc:
                # pinecone_id_column_name stores the unqiue identifier for the content
                ids.append(doc[pinecone_id_column_name])
            else:
                # Generate a uuid if pinecone_id_column_name is missing in source
                ids.append(str(uuid.uuid4()))
            # values is the vector embedding of the content
            embeddings.append(doc["values"])
            # Check if pinecone_content_column_name exists in metadata before accessing
            if pinecone_content_column_name in doc.metadata:
                # pinecone_content_column_name stores the content which was encoded
                contents.append(str(doc.metadata[pinecone_content_column_name]))
                # Remove pinecone_content_column_name after processing
                del doc.metadata[pinecone_content_column_name]
            else:
                # Handle the missing pinecone_content_column_name field appropriately
                contents.append("")
            # metadata is the additional context
            metadatas.append(doc["metadata"])
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids: list[str] = []
    content: list[Any] = []
    embeddings: list[list[float]] = []
    metadatas: list[Any] = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        # You might need to update this data translation logic according to one or more of your field names
        # uuid is the unqiue identifier for the content
        ids.append(str(item.uuid))
        # weaviate_text_key is the content which was encoded
        content.append(item.properties[weaviate_text_key])
        # vector is the vector embedding of the content
        embeddings.append(item.vector["default"])  # type: ignore
        del item.properties[weaviate_text_key]  # type: ignore
        # properties is the additional context
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Croma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        # You might need to update this data translation logic according to one or more of your field names
        # documents is the content which was encoded
        # embeddings is the vector embedding of the content
        # metadatas is the additional context
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        # ids is the unqiue identifier for the content
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                # You might need to update this data translation logic according to one or more of your field names
                # id is the unqiue identifier for the content
                ids.append(str(doc.id))
                # page_content is the content which was encoded
                contents.append(doc.payload["page_content"])
                # vector is the vector embedding of the content
                embeddings.append(doc.vector)  # type: ignore
                # metatdata is the additional context
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            # You might need to update this data translation logic according to one or more of your field names
            doc = page[i]
            # pk is the unqiue identifier for the content
            ids.append(doc["pk"])
            # text is the content which was encoded
            content.append(doc["text"])
            # vector is the vector embedding of the content
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            # doc is the additional context
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Inicialize a tabela do AlloyDB

  1. Defina o serviço de incorporação.

    A interface VectorStore requer um serviço de incorporação. Este fluxo de trabalho não gera novas incorporações, pelo que a classe FakeEmbeddings é usada para evitar custos.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Croma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepare a tabela do AlloyDB.

    1. Ligue-se ao AlloyDB através de uma ligação de IP público. Para mais informações, consulte o artigo Especificar o tipo de endereço IP.

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,  # Optionally use IPTypes.PRIVATE
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Croma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )
    2. Crie uma tabela para copiar os dados, se ainda não existir.

      Pinecone

      from langchain_google_alloydb_pg import Column
      
      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types if not using the UUID data type
          # id_column=Column("langchain_id", "TEXT"),  # Default is Column("langchain_id", "UUID")
          # overwrite_existing=True,  # Drop the old table and Create a new vector store table
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )
      

      Croma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

Inicialize um objeto de banco de dados vetorial

Este código adiciona metadados de incorporação de vetores adicionais à coluna num formato JSON.langchain_metadata Para tornar a filtragem mais eficiente, organize estes metadados em colunas separadas. Para mais informações, consulte o artigo Crie um arquivo vetorial personalizado.

  1. Para inicializar um objeto de banco de dados vetorial, execute o seguinte comando:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Croma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Insira dados na tabela do AlloyDB:

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Croma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Execute o script de migração

  1. Configure o ambiente Python.

  2. Instale as dependências de exemplo:

    pip install -r requirements.txt
  3. Execute a migração de exemplo.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • PINECONE_API_KEY: a chave da API Pinecone.
    • PINECONE_NAMESPACE: o espaço de nomes do Pinecone.
    • PINECONE_INDEX_NAME: o nome do índice do Pinecone.
    • PROJECT_ID: o ID do projeto.
    • REGION: a região na qual o cluster do AlloyDB está implementado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome da base de dados.
    • DB_USER: o nome do utilizador da base de dados.
    • DB_PWD: a palavra-passe secreta da base de dados.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • WEAVIATE_API_KEY: a chave da API Weaviate.
    • WEAVIATE_CLUSTER_URL: o URL do cluster do Weaviate.
    • WEAVIATE_COLLECTION_NAME: o nome da coleção do Weaviate.
    • PROJECT_ID: o ID do projeto.
    • REGION: a região na qual o cluster do AlloyDB está implementado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome da base de dados.
    • DB_USER: o nome do utilizador da base de dados.
    • DB_PWD: a palavra-passe secreta da base de dados.

    Croma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • CHROMADB_PATH: o caminho da base de dados Chroma.
    • CHROMADB_COLLECTION_NAME: o nome da coleção da base de dados Chroma.
    • PROJECT_ID: o ID do projeto.
    • REGION: a região na qual o cluster do AlloyDB está implementado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome da base de dados.
    • DB_USER: o nome do utilizador da base de dados.
    • DB_PWD: a palavra-passe secreta da base de dados.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • QDRANT_PATH: o caminho da base de dados Qdrant.
    • QDRANT_COLLECTION_NAME: o nome da coleção do Qdrant.
    • PROJECT_ID: o ID do projeto.
    • REGION: a região na qual o cluster do AlloyDB está implementado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome da base de dados.
    • DB_USER: o nome do utilizador da base de dados.
    • DB_PWD: a palavra-passe secreta da base de dados.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • MILVUS_URI: o URI do Milvus.
    • MILVUS_COLLECTION_NAME: o nome da coleção do Milvus.
    • PROJECT_ID: o ID do projeto.
    • REGION: a região na qual o cluster do AlloyDB está implementado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome da base de dados.
    • DB_USER: o nome do utilizador da base de dados.
    • DB_PWD: a palavra-passe secreta da base de dados.

    Uma migração bem-sucedida imprime registos semelhantes aos seguintes sem erros:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Abra o AlloyDB Studio para ver os dados migrados. Para mais informações, consulte o artigo Faça a gestão dos seus dados com o AlloyDB Studio.

Limpar

Para evitar incorrer em custos na sua conta do Google Cloud pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.

  1. Na Google Cloud consola, aceda à página Clusters.

    Aceda a Clusters

  2. Na coluna Nome do recurso, clique no nome do cluster que criou.

  3. Clique em Eliminar cluster.

  4. Em Eliminar cluster, introduza o nome do cluster para confirmar que quer eliminar o cluster.

  5. Clique em Eliminar.

    Se criou uma ligação privada quando criou um cluster, elimine a ligação privada:

  6. Aceda à Google Cloud página Rededa consola e clique em Eliminar rede de VPC.

O que se segue?