Esegui la migrazione dei dati da un database vettoriale ad AlloyDB


Questo tutorial descrive come eseguire la migrazione dei dati da un database di vettori di terze parti ad AlloyDB per PostgreSQL utilizzando gli store di vettori LangChain. Sono supportati i seguenti database di vettori:

Questo tutorial presuppone che tu abbia familiarità con Google Cloud, AlloyDB e la programmazione Python asincrona.

Obiettivi

Questo tutorial illustra come:

  • Estrai i dati da un database vettoriale esistente.
  • Connettiti ad AlloyDB.
  • Inizializza la tabella AlloyDB.
  • Inizializza un oggetto vettore.
  • Esegui lo script di migrazione per inserire i dati.

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi Google Cloud utenti potrebbero avere diritto a una prova gratuita.

Al termine delle attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la sezione Pulizia.

Prima di iniziare

Assicurati di avere uno dei seguenti vettori di database di terze parti di LangChain:

Abilita la fatturazione e le API richieste

  1. Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un Google Cloud progetto.

    Vai al selettore dei progetti

  2. Assicurati che la fatturazione sia attivata per il tuo Google Cloud progetto.

  3. Abilita le API Cloud necessarie per creare e connetterti ad AlloyDB per PostgreSQL.

    Abilita le API

    1. Nel passaggio Conferma progetto, fai clic su Avanti per confermare il nome del progetto a cui apporterai modifiche.
    2. Nel passaggio Abilita API, fai clic su Abilita per attivare quanto segue:

      • API AlloyDB
      • API Compute Engine
      • API Service Networking

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per completare le attività di questo tutorial, devi disporre dei seguenti ruoli IAM (Identity and Access Management) che consentono la tabella creazione e inserimento dei dati:

Se vuoi autenticarti al database utilizzando l'autenticazione IAM anziché l'autenticazione integrata in questo tutorial, utilizza il notebook che mostra come utilizzare AlloyDB per PostgreSQL per archiviare gli incorporamenti vettoriali con la classe AlloyDBVectorStore.

Crea un cluster e un utente AlloyDB

  1. Crea un cluster AlloyDB e un'istanza.
    • Abilita l'IP pubblico per eseguire questo tutorial da qualsiasi luogo. Se utilizzi un IP privato, devi eseguire questo tutorial all'interno della tua VPC.
  2. Crea o seleziona un utente del database AlloyDB.
    • Quando crei l'istanza, viene creato un utente postgres con una password. Questo utente dispone delle autorizzazioni super user.
    • Questo tutorial utilizza l'autenticazione integrata per ridurre eventuali problemi di autenticazione. L'autenticazione IAM è possibile utilizzando AlloyDBEngine.

Recuperare il esempio di codice

  1. Copia l'esempio di codice da GitHub clonando il repository:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Vai alla directory migrations:

    cd langchain-google-alloydb-pg-python/samples/migrations

Estrarre i dati da un database di vettori esistente

  1. Crea un client.

    Pigna

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Recupera tutti i dati dal database.

    Pigna

    Recupera gli ID vettore dall'indice Pinecone:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    Quindi recupera i record per ID dall'indice Pinecone:

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids = []
    content = []
    embeddings = []
    metadatas = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Inizializza la tabella AlloyDB

  1. Definisci il servizio di embedding.

    L'interfaccia VectorStore richiede un servizio di embedding. Questo flusso di lavoro non genera nuovi embedding, pertanto viene utilizzata la classe FakeEmbeddings per evitare costi.

    Pigna

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepara la tabella AlloyDB.

    1. Connettiti ad AlloyDB utilizzando una connessione IP pubblico. Per ulteriori informazioni, consulta Specificare il tipo di indirizzo IP.

      Pigna

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )
    2. Crea una tabella in cui copiare i dati, se non esiste già.

      Pigna

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

Inizializzare un oggetto del negozio di vettori

Questo codice aggiunge ulteriori metadati di embedding di vettori alla colonna langchain_metadata in formato JSON. Per rendere il filtro più efficiente, organizza questi metadati in colonne separate. Per ulteriori informazioni, vedi Creare un negozio di vektori personalizzato.

  1. Per inizializzare un oggetto dello spazio vettoriale, esegui il seguente comando:

    Pigna

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Inserisci i dati nella tabella AlloyDB:

    Pigna

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Esegui lo script di migrazione

  1. Configura l'ambiente Python.

  2. Installa le dipendenze di esempio:

    pip install -r requirements.txt
  3. Esegui la migrazione di esempio.

    Pigna

    python migrate_pinecone_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • PINECONE_API_KEY: la chiave API Pinecone.
    • PINECONE_NAMESPACE: lo spazio dei nomi Pinecone.
    • PINECONE_INDEX_NAME: il nome dell'indice Pinecone.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • WEAVIATE_API_KEY: la chiave API Weaviate.
    • WEAVIATE_CLUSTER_URL: l'URL del cluster Weaviate.
    • WEAVIATE_COLLECTION_NAME: il nome della raccolta Weaviate.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • CHROMADB_PATH: il percorso del database Chroma.
    • CHROMADB_COLLECTION_NAME: il nome della raccolta del database Chroma.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • QDRANT_PATH: il percorso del database Qdrant.
    • QDRANT_COLLECTION_NAME: il nome della raccolta Qdrant.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Apporta le seguenti sostituzioni prima di eseguire l'esempio:

    • MILVUS_URI: l'URI di Milvus.
    • MILVUS_COLLECTION_NAME: il nome della raccolta Milvus.
    • PROJECT_ID: l'ID progetto.
    • REGION: la regione in cui è dipiegato il cluster AlloyDB.
    • CLUSTER: il nome del cluster.
    • INSTANCE: il nome dell'istanza.
    • DB_NAME: il nome del database.
    • DB_USER: il nome dell'utente del database.
    • DB_PWD: la password del secret del database.

    Una migrazione riuscita stampa log simili al seguente senza errori:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Apri AlloyDB Studio per visualizzare i dati di cui è stata eseguita la migrazione. Per ulteriori informazioni, consulta Gestire i dati utilizzando AlloyDB Studio.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

  1. Nella console Google Cloud, vai alla pagina Cluster.

    Vai a Cluster

  2. Nella colonna Nome risorsa, fai clic sul nome del cluster che hai creato.

  3. Fai clic su Elimina cluster.

  4. In Elimina cluster, inserisci il nome del cluster per confermare che vuoi eliminarlo.

  5. Fai clic su Elimina.

    Se hai creato una connessione privata quando hai creato un cluster, eliminala:

  6. Vai alla pagina Networking della console Google Cloud e fai clic su Elimina rete VPC.

Passaggi successivi