Migrar dados de um banco de dados de vetores para o AlloyDB


Este tutorial descreve como migrar dados de um banco de dados de vetores de terceiros para o AlloyDB para PostgreSQL usando as lojas de vetores LangChain. Os seguintes bancos de dados de vetores são compatíveis:

Neste tutorial, presumimos que você já conheça Google Cloud, AlloyDB e a programação assíncrona do Python.

Objetivos

Este tutorial mostra como fazer o seguinte:

  • Extrair dados de um banco de dados de vetores.
  • Conecte-se ao AlloyDB.
  • Inicialize a tabela do AlloyDB.
  • Inicialize um objeto de armazenamento de vetores.
  • Execute o script de migração para inserir os dados.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.

Antes de começar

Verifique se você tem uma das seguintes lojas de vetores de banco de dados de terceiros do LangChain:

Ativar o faturamento e as APIs necessárias

  1. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projetoGoogle Cloud .

    Acessar o seletor de projetos

  2. Verifique se o faturamento foi ativado para o Google Cloud projeto.

  3. Ative as APIs do Cloud necessárias para criar e se conectar ao AlloyDB para PostgreSQL.

    Ativar as APIs

    1. Na etapa Confirmar projeto, clique em Próxima para confirmar o nome do projeto em que você vai fazer mudanças.
    2. Na etapa Ativar APIs, clique em Ativar para ativar o seguinte:

      • API AlloyDB
      • API Compute Engine
      • API Service Networking

Funções exigidas

Para receber as permissões necessárias para concluir as tarefas neste tutorial, tenha os seguintes papéis de Identity and Access Management (IAM) que permitem a criação de tabelas e a inserção de dados:

  • Proprietário (roles/owner) ou editor (roles/editor)
  • Se o usuário não for proprietário ou editor, os seguintes papéis do IAM e privilégios do PostgreSQL serão necessários:

Se você quiser fazer a autenticação no seu banco de dados usando a autenticação do IAM em vez da autenticação integrada neste tutorial, use o notebook que mostra como usar o AlloyDB para PostgreSQL para armazenar embeddings de vetores com a classe AlloyDBVectorStore.

Criar um cluster e um usuário do AlloyDB

  1. Crie um cluster do AlloyDB e uma instância.
    • Ative o IP público para executar este tutorial de qualquer lugar. Se você estiver usando o IP particular, execute este tutorial na VPC.
  2. Crie ou selecione um usuário do banco de dados do AlloyDB.
    • Quando você cria a instância, um usuário postgres é criado com uma senha. Esse usuário tem permissões de superusuário.
    • Este tutorial usa a autenticação integrada para reduzir qualquer fricção de autenticação. A autenticação do IAM é possível usando o AlloyDBEngine.

Recuperar a amostra de código

  1. Clone o repositório para copiar o exemplo de código do GitHub:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Navegue até o diretório migrations:

    cd langchain-google-alloydb-pg-python/samples/migrations

Extrair dados de um banco de dados de vetores

  1. Criar um cliente.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Receba todos os dados do banco de dados.

    Pinecone

    Extraia IDs de vetores do índice da Pinecone:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    Em seguida, busque os registros por ID no índice da Pinecone:

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids = []
    content = []
    embeddings = []
    metadatas = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Inicializar a tabela do AlloyDB

  1. Defina o serviço de incorporação.

    A interface VectorStore requer um serviço de incorporação. Esse fluxo de trabalho não gera novas embeddings. Portanto, a classe FakeEmbeddings é usada para evitar custos.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepare a tabela do AlloyDB.

    1. Conecte-se ao AlloyDB usando uma conexão de IP público. Para mais informações, consulte Como especificar o tipo de endereço IP.

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )
    2. Crie uma tabela para copiar os dados, caso ela ainda não exista.

      Pinecone

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

Inicializar um objeto de armazenamento de vetores

Esse código adiciona metadados de incorporação de vetores adicionais à coluna langchain_metadata em um formato JSON. Para tornar a filtragem mais eficiente, organize esses metadados em colunas separadas. Para mais informações, consulte Criar uma loja de vetores personalizada.

  1. Para inicializar um objeto de armazenamento de vetores, execute o seguinte comando:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Insira dados na tabela do AlloyDB:

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Executar o script de migração

  1. Configure o ambiente Python.

  2. Instale as dependências de amostra:

    pip install -r requirements.txt
  3. Execute a migração de exemplo.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • PINECONE_API_KEY: a chave da API Pinecone.
    • PINECONE_NAMESPACE: o namespace da Pinecone.
    • PINECONE_INDEX_NAME: o nome do índice da Pinecone.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • WEAVIATE_API_KEY: a chave da API Weaviate.
    • WEAVIATE_CLUSTER_URL: o URL do cluster do Weaviate.
    • WEAVIATE_COLLECTION_NAME: o nome da coletânea do Weaviate.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • CHROMADB_PATH: o caminho do banco de dados do Chroma.
    • CHROMADB_COLLECTION_NAME: o nome da coleção de banco de dados do Chroma.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • QDRANT_PATH: o caminho do banco de dados do Qdrant.
    • QDRANT_COLLECTION_NAME: o nome da coleção do Qdrant.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Faça as seguintes substituições antes de executar o exemplo:

    • MILVUS_URI: o URI do Milvus.
    • MILVUS_COLLECTION_NAME: o nome da coleção Milvus.
    • PROJECT_ID: o ID do projeto;
    • REGION: a região em que o cluster do AlloyDB é implantado.
    • CLUSTER: o nome do cluster.
    • INSTANCE: o nome da instância.
    • DB_NAME: o nome do banco de dados.
    • DB_USER: o nome do usuário do banco de dados.
    • DB_PWD: a senha secreta do banco de dados.

    Uma migração bem-sucedida imprime registros semelhantes a estes sem erros:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Abra o AlloyDB Studio para conferir os dados migrados. Para mais informações, consulte Gerenciar dados com o AlloyDB Studio.

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

  1. No console do Google Cloud, acesse a página Clusters.

    Acessar Clusters

  2. Na coluna Nome do recurso, clique no nome do cluster que você criou.

  3. Clique em Excluir cluster.

  4. Em Excluir cluster, insira o nome do cluster para confirmar que você quer excluir o cluster.

  5. Clique em Excluir.

    Se você criou uma conexão particular ao criar um cluster, exclua a conexão particular:

  6. Acesse a página de rede do console do Google Cloud e clique em Excluir rede VPC.

A seguir