Migrar datos de una base de datos de vectores a AlloyDB


En este tutorial se describe cómo migrar datos de una base de datos de vectores de terceros a AlloyDB para PostgreSQL mediante un VectorStore de LangChain. En este tutorial se da por hecho que los datos de las bases de datos de vectores de terceros se han creado mediante una integración de VectorStore de LangChain. Si introduces información en una de las siguientes bases de datos sin usar LangChain, es posible que tengas que editar las secuencias de comandos que se proporcionan a continuación para que coincidan con el esquema de tus datos. Se admiten las siguientes bases de datos vectoriales:

En este tutorial se da por supuesto que tienes conocimientos sobre Google CloudAlloyDB y programación asíncrona en Python.

Objetivos

En este tutorial te explicamos cómo hacer lo siguiente:

  • Extraer datos de una base de datos vectorial.
  • Conéctate a AlloyDB.
  • Inicializa la tabla de AlloyDB.
  • Inicializa un objeto de almacén vectorial.
  • Ejecuta la secuencia de comandos de migración para insertar los datos.

Costes

En este documento, se utilizan los siguientes componentes facturables de Google Cloud:

You might be eligible for a free trial cluster. For more information, see AlloyDB free trial clusters overview.

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos Google Cloud pueden disfrutar de una prueba gratuita.

Cuando termines las tareas que se describen en este documento, puedes evitar que se te siga facturando eliminando los recursos que has creado. Para obtener más información, consulta la sección Limpiar.

Antes de empezar

Asegúrate de que tienes uno de los siguientes almacenes de vectores de bases de datos de terceros de LangChain:

Habilitar la facturación y las APIs necesarias

  1. En la Google Cloud consolaGoogle Cloud , en la página del selector de proyectos, selecciona o crea un proyecto.

    Ir al selector de proyectos

  2. Asegúrate de que la facturación esté habilitada en tu Google Cloud proyecto.

  3. Habilita las APIs de Cloud necesarias para crear una instancia de AlloyDB para PostgreSQL y conectarte a ella.

    Habilitar las APIs

    1. En el paso Confirmar proyecto, haz clic en Siguiente para confirmar el nombre del proyecto que vas a modificar.
    2. En el paso Habilitar APIs, haz clic en Habilitar para habilitar lo siguiente:

      • API de AlloyDB
      • API de Compute Engine
      • API Service Networking

Roles obligatorios

Para obtener los permisos que necesitas para completar las tareas de este tutorial, debes tener los siguientes roles de gestión de identidades y accesos (IAM), que permiten crear tablas e insertar datos:

  • Propietario (roles/owner) o editor (roles/editor)
  • Si el usuario no es propietario ni editor, debe tener los siguientes roles de gestión de identidades y accesos y privilegios de PostgreSQL:

Si quieres autenticarte en tu base de datos mediante la autenticación de IAM en lugar de usar la autenticación integrada de este tutorial, utiliza el cuaderno que muestra cómo usar AlloyDB para PostgreSQL para almacenar inserciones de vectores con la clase AlloyDBVectorStore.

Crear un clúster y un usuario de AlloyDB

  1. Crea un clúster y una instancia de AlloyDB.
    • Habilita la IP pública para ejecutar este tutorial desde cualquier lugar. Si usas una IP privada, debes seguir este tutorial desde tu VPC.
  2. Crea o selecciona un usuario de base de datos de AlloyDB.
    • Cuando creas la instancia, se crea un usuario postgres con una contraseña. Este usuario tiene permisos de superusuario.
    • En este tutorial se usa la autenticación integrada para reducir la fricción de autenticación. La autenticación de IAM es posible con AlloyDBEngine.

Obtener el código de ejemplo

  1. Copia el código de ejemplo de GitHub clonando el repositorio:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Ve al directorio migrations:

    cd langchain-google-alloydb-pg-python/samples/migrations

Extraer datos de una base de datos de vectores

  1. Crea un cliente.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Croma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Obtiene todos los datos de la base de datos.

    Pinecone

    Recupera los IDs de los vectores del índice de Pinecone:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    if ids:  # Prevents yielding an empty list.
        yield ids
    
    # Check BOTH pagination and pagination.next
    while results.pagination is not None and results.pagination.get("next") is not None:
        pagination_token = results.pagination.get("next")
        results = pinecone_index.list_paginated(
            prefix="",
            pagination_token=pagination_token,
            namespace=pinecone_namespace,
            limit=pinecone_batch_size,
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        if ids:  # Prevents yielding an empty list.
            yield ids

    A continuación, obtén los registros por ID del índice de Pinecone:

    import uuid
    
    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids, namespace=pinecone_namespace)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            # You might need to update this data translation logic according to one or more of your field names
            if pinecone_id_column_name in doc:
                # pinecone_id_column_name stores the unqiue identifier for the content
                ids.append(doc[pinecone_id_column_name])
            else:
                # Generate a uuid if pinecone_id_column_name is missing in source
                ids.append(str(uuid.uuid4()))
            # values is the vector embedding of the content
            embeddings.append(doc["values"])
            # Check if pinecone_content_column_name exists in metadata before accessing
            if pinecone_content_column_name in doc.metadata:
                # pinecone_content_column_name stores the content which was encoded
                contents.append(str(doc.metadata[pinecone_content_column_name]))
                # Remove pinecone_content_column_name after processing
                del doc.metadata[pinecone_content_column_name]
            else:
                # Handle the missing pinecone_content_column_name field appropriately
                contents.append("")
            # metadata is the additional context
            metadatas.append(doc["metadata"])
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids: list[str] = []
    content: list[Any] = []
    embeddings: list[list[float]] = []
    metadatas: list[Any] = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        # You might need to update this data translation logic according to one or more of your field names
        # uuid is the unqiue identifier for the content
        ids.append(str(item.uuid))
        # weaviate_text_key is the content which was encoded
        content.append(item.properties[weaviate_text_key])
        # vector is the vector embedding of the content
        embeddings.append(item.vector["default"])  # type: ignore
        del item.properties[weaviate_text_key]  # type: ignore
        # properties is the additional context
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Croma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        # You might need to update this data translation logic according to one or more of your field names
        # documents is the content which was encoded
        # embeddings is the vector embedding of the content
        # metadatas is the additional context
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        # ids is the unqiue identifier for the content
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                # You might need to update this data translation logic according to one or more of your field names
                # id is the unqiue identifier for the content
                ids.append(str(doc.id))
                # page_content is the content which was encoded
                contents.append(doc.payload["page_content"])
                # vector is the vector embedding of the content
                embeddings.append(doc.vector)  # type: ignore
                # metatdata is the additional context
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            # You might need to update this data translation logic according to one or more of your field names
            doc = page[i]
            # pk is the unqiue identifier for the content
            ids.append(doc["pk"])
            # text is the content which was encoded
            content.append(doc["text"])
            # vector is the vector embedding of the content
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            # doc is the additional context
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Inicializar la tabla de AlloyDB

  1. Define el servicio de inserción.

    La interfaz VectorStore requiere un servicio de inserción. Este flujo de trabajo no genera nuevas inserciones, por lo que se usa la clase FakeEmbeddings para evitar costes.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Croma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepara la tabla de AlloyDB.

    1. Conéctate a AlloyDB mediante una conexión de IP pública. Para obtener más información, consulta Especificar el tipo de dirección IP.

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,  # Optionally use IPTypes.PRIVATE
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Croma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )
    2. Crea una tabla en la que copiar los datos, si aún no existe.

      Pinecone

      from langchain_google_alloydb_pg import Column
      
      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types if not using the UUID data type
          # id_column=Column("langchain_id", "TEXT"),  # Default is Column("langchain_id", "UUID")
          # overwrite_existing=True,  # Drop the old table and Create a new vector store table
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )
      

      Croma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

Inicializar un objeto de almacén vectorial

Este código añade metadatos de inserción de vector adicionales a la columna langchain_metadata en formato JSON. Para que el filtrado sea más eficiente, organiza estos metadatos en columnas independientes. Para obtener más información, consulta Crear un almacén de vectores personalizado.

  1. Para inicializar un objeto de almacén de vectores, ejecuta el siguiente comando:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Croma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Inserta datos en la tabla de AlloyDB:

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Croma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Ejecutar la secuencia de comandos de migración

  1. Configura el entorno de Python.

  2. Instala las dependencias de ejemplo:

    pip install -r requirements.txt
  3. Ejecuta la migración de ejemplo.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Haz las siguientes sustituciones antes de ejecutar el ejemplo:

    • PINECONE_API_KEY: la clave de API de Pinecone.
    • PINECONE_NAMESPACE: el espacio de nombres de Pinecone.
    • PINECONE_INDEX_NAME: el nombre del índice de Pinecone.
    • PROJECT_ID: el ID del proyecto.
    • REGION: la región en la que se ha desplegado el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster.
    • INSTANCE: el nombre de la instancia.
    • DB_NAME: el nombre de la base de datos.
    • DB_USER: el nombre del usuario de la base de datos.
    • DB_PWD: la contraseña secreta de la base de datos.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Haz las siguientes sustituciones antes de ejecutar el ejemplo:

    • WEAVIATE_API_KEY: la clave de API de Weaviate.
    • WEAVIATE_CLUSTER_URL: URL del clúster de Weaviate.
    • WEAVIATE_COLLECTION_NAME: el nombre de la colección de Weaviate.
    • PROJECT_ID: el ID del proyecto.
    • REGION: la región en la que se ha desplegado el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster.
    • INSTANCE: el nombre de la instancia.
    • DB_NAME: el nombre de la base de datos.
    • DB_USER: el nombre del usuario de la base de datos.
    • DB_PWD: la contraseña secreta de la base de datos.

    Croma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Haz las siguientes sustituciones antes de ejecutar el ejemplo:

    • CHROMADB_PATH: la ruta de la base de datos de Chroma.
    • CHROMADB_COLLECTION_NAME: el nombre de la colección de la base de datos de Chroma.
    • PROJECT_ID: el ID del proyecto.
    • REGION: la región en la que se ha desplegado el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster.
    • INSTANCE: el nombre de la instancia.
    • DB_NAME: el nombre de la base de datos.
    • DB_USER: el nombre del usuario de la base de datos.
    • DB_PWD: la contraseña secreta de la base de datos.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Haz las siguientes sustituciones antes de ejecutar el ejemplo:

    • QDRANT_PATH: la ruta de la base de datos de Qdrant.
    • QDRANT_COLLECTION_NAME: el nombre de la colección de Qdrant.
    • PROJECT_ID: el ID del proyecto.
    • REGION: la región en la que se ha desplegado el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster.
    • INSTANCE: el nombre de la instancia.
    • DB_NAME: el nombre de la base de datos.
    • DB_USER: el nombre del usuario de la base de datos.
    • DB_PWD: la contraseña secreta de la base de datos.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Haz las siguientes sustituciones antes de ejecutar el ejemplo:

    • MILVUS_URI: el URI de Milvus.
    • MILVUS_COLLECTION_NAME: el nombre de la colección de Milvus.
    • PROJECT_ID: el ID del proyecto.
    • REGION: la región en la que se ha desplegado el clúster de AlloyDB.
    • CLUSTER: el nombre del clúster.
    • INSTANCE: el nombre de la instancia.
    • DB_NAME: el nombre de la base de datos.
    • DB_USER: el nombre del usuario de la base de datos.
    • DB_PWD: la contraseña secreta de la base de datos.

    Si la migración se realiza correctamente, se mostrarán registros similares a los siguientes sin errores:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Abre AlloyDB Studio para ver los datos migrados. Para obtener más información, consulta Gestionar tus datos con AlloyDB Studio.

Limpieza

Para evitar que los recursos utilizados en este tutorial se cobren en tu cuenta de Google Cloud, elimina el proyecto que contiene los recursos o conserva el proyecto y elimina los recursos.

  1. En la Google Cloud consola, ve a la página Clusters.

    Ir a Clústeres

  2. En la columna Nombre del recurso, haz clic en el nombre del clúster que has creado.

  3. Haz clic en Eliminar clúster.

  4. En Eliminar clúster, introduce el nombre del clúster para confirmar que quieres eliminarlo.

  5. Haz clic en Eliminar.

    Si has creado una conexión privada al crear un clúster, elimina la conexión privada:

  6. Ve a la Google Cloud consola página Redes y haz clic en Eliminar red de VPC.

Siguientes pasos