Migra datos de una base de datos vectorial a AlloyDB


En este instructivo, se describe cómo migrar datos de una base de datos vectorial de terceros a AlloyDB para PostgreSQL con los almacenes de vectores de LangChain. Se admiten las siguientes bases de datos vectoriales:

En este instructivo, se da por sentado que conoces Google Cloud, AlloyDB y la programación asíncrona de Python.

Objetivos

En este instructivo, se muestra cómo realizar lo siguiente:

  • Extraer datos de una base de datos de vectores existente
  • Conéctate a AlloyDB.
  • Inicializa la tabla de AlloyDB.
  • Inicializa un objeto de tienda de vectores.
  • Ejecuta la secuencia de comandos de migración para insertar los datos.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.

Antes de comenzar

Asegúrate de tener uno de los siguientes almacenes de vectores de bases de datos de terceros de LangChain:

Habilita la facturación y las APIs necesarias

  1. En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto deGoogle Cloud .

    Ir al selector de proyectos

  2. Asegúrate de tener habilitada la facturación para tu Google Cloud proyecto.

  3. Habilita las API de Cloud necesarias para crear AlloyDB para PostgreSQL y conectarte a él.

    Habilita las APIs

    1. En el paso Confirmar proyecto, haz clic en Siguiente para confirmar el nombre del proyecto en el que realizarás cambios.
    2. En el paso Habilitar APIs, haz clic en Habilitar para habilitar lo siguiente:

      • API de AlloyDB
      • API de Compute Engine
      • API de Service Networking

Roles obligatorios

Para obtener los permisos que necesitas para completar las tareas de este instructivo, debes tener los siguientes roles de Identity and Access Management (IAM) que permiten la creación de tablas y la inserción de datos:

  • Propietario (roles/owner) o editor (roles/editor)
  • Si el usuario no es propietario ni editor, se requieren los siguientes roles de IAM y privilegios de PostgreSQL:

Si deseas autenticarte en tu base de datos con la autenticación de IAM en lugar de usar la autenticación integrada en este instructivo, usa el notebook que muestra cómo usar AlloyDB para PostgreSQL para almacenar incorporaciones vectoriales con la clase AlloyDBVectorStore.

Crea un clúster y un usuario de AlloyDB

  1. Crea un clúster y una instancia de AlloyDB.
    • Habilita la IP pública para ejecutar este instructivo desde cualquier lugar. Si usas IP privada, debes ejecutar este instructivo desde tu VPC.
  2. Crea o selecciona un usuario de la base de datos de AlloyDB.
    • Cuando creas la instancia, se crea un usuario postgres con una contraseña. Este usuario tiene permisos de superusuario.
    • En este instructivo, se usa la autenticación integrada para reducir cualquier inconveniente de autenticación. La autenticación de IAM es posible con AlloyDBEngine.

Recupera la muestra de código

  1. Clona el repositorio para copiar la muestra de código de GitHub:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Navega al directorio migrations:

    cd langchain-google-alloydb-pg-python/samples/migrations

Extrae datos de una base de datos de vectores existente

  1. Crea un cliente.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Croma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Obtén todos los datos de la base de datos.

    Pinecone

    Recupera los IDs de vectores del índice de Pinecone:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    Luego, recupera registros por ID del índice de Pinecone:

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids = []
    content = []
    embeddings = []
    metadatas = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Croma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Inicializa la tabla de AlloyDB

  1. Define el servicio de incorporación.

    La interfaz de VectorStore requiere un servicio de incorporación. Este flujo de trabajo no genera incorporaciones nuevas, por lo que se usa la clase FakeEmbeddings para evitar cualquier costo.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Croma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepara la tabla de AlloyDB.

    1. Conéctate a AlloyDB con una conexión de IP pública. Para obtener más información, consulta Especifica el tipo de dirección IP.

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Croma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )
    2. Crea una tabla en la que copiar los datos, si aún no existe.

      Pinecone

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      Croma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

Cómo inicializar un objeto de tienda de vectores

Este código agrega metadatos de incorporación de vectores adicionales a la columna langchain_metadata en formato JSON. Para que el filtrado sea más eficiente, organiza estos metadatos en columnas separadas. Para obtener más información, consulta Cómo crear una tienda de vectores personalizada.

  1. Para inicializar un objeto de almacén de vectores, ejecuta el siguiente comando:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Croma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Inserta datos en la tabla de AlloyDB:

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Croma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Ejecuta la secuencia de comandos de migración

  1. Configura el entorno de Python.

  2. Instala las dependencias de muestra:

    pip install -r requirements.txt
  3. Ejecuta la migración de ejemplo.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Realiza los siguientes reemplazos antes de ejecutar la muestra:

    • PINECONE_API_KEY: La clave de API de Pinecone.
    • PINECONE_NAMESPACE: Es el espacio de nombres de Pinecone.
    • PINECONE_INDEX_NAME: Es el nombre del índice Pinecone.
    • PROJECT_ID: El ID del proyecto.
    • REGION: Es la región en la que se implementa el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster
    • INSTANCE: El nombre de la instancia.
    • DB_NAME: Es el nombre de la base de datos.
    • DB_USER: Es el nombre del usuario de la base de datos.
    • DB_PWD: La contraseña secreta de la base de datos.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Realiza los siguientes reemplazos antes de ejecutar la muestra:

    • WEAVIATE_API_KEY: La clave de API de Weaviate.
    • WEAVIATE_CLUSTER_URL: La URL del clúster de Weaviate.
    • WEAVIATE_COLLECTION_NAME: Es el nombre de la colección de Weaviate.
    • PROJECT_ID: El ID del proyecto.
    • REGION: Es la región en la que se implementa el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster
    • INSTANCE: El nombre de la instancia.
    • DB_NAME: Es el nombre de la base de datos.
    • DB_USER: Es el nombre del usuario de la base de datos.
    • DB_PWD: La contraseña secreta de la base de datos.

    Croma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Realiza los siguientes reemplazos antes de ejecutar la muestra:

    • CHROMADB_PATH: Es la ruta de acceso de la base de datos de Chroma.
    • CHROMADB_COLLECTION_NAME: Es el nombre de la colección de la base de datos de Chroma.
    • PROJECT_ID: El ID del proyecto.
    • REGION: Es la región en la que se implementa el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster
    • INSTANCE: El nombre de la instancia.
    • DB_NAME: Es el nombre de la base de datos.
    • DB_USER: Es el nombre del usuario de la base de datos.
    • DB_PWD: La contraseña secreta de la base de datos.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Realiza los siguientes reemplazos antes de ejecutar la muestra:

    • QDRANT_PATH: Es la ruta de acceso de la base de datos de Qdrant.
    • QDRANT_COLLECTION_NAME: Es el nombre de la colección de Qdrant.
    • PROJECT_ID: El ID del proyecto.
    • REGION: Es la región en la que se implementa el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster
    • INSTANCE: El nombre de la instancia.
    • DB_NAME: Es el nombre de la base de datos.
    • DB_USER: Es el nombre del usuario de la base de datos.
    • DB_PWD: La contraseña secreta de la base de datos.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Realiza los siguientes reemplazos antes de ejecutar la muestra:

    • MILVUS_URI: Es el URI de Milvus.
    • MILVUS_COLLECTION_NAME: Es el nombre de la colección de Milvus.
    • PROJECT_ID: El ID del proyecto.
    • REGION: Es la región en la que se implementa el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster
    • INSTANCE: El nombre de la instancia.
    • DB_NAME: Es el nombre de la base de datos.
    • DB_USER: Es el nombre del usuario de la base de datos.
    • DB_PWD: La contraseña secreta de la base de datos.

    Una migración correcta imprime registros similares a los siguientes sin errores:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Abre AlloyDB Studio para ver tus datos migrados. Para obtener más información, consulta Administra tus datos con AlloyDB Studio.

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

  1. En la consola de Google Cloud, ve a la página Clústeres.

    Ir a los clústeres

  2. En la columna Nombre del recurso, haz clic en el nombre del clúster que creaste.

  3. Haz clic en Borrar clúster.

  4. En Borrar clúster, ingresa el nombre de tu clúster para confirmar que quieres borrarlo.

  5. Haz clic en Borrar.

    Si creaste una conexión privada cuando creaste un clúster, bórrala:

  6. Ve a la página Redes de la consola de Google Cloud y haz clic en Borrar red de VPC.

¿Qué sigue?