Migrate data from a vector database to AlloyDB


This tutorial describes how to migrate data from a third-party vector database to AlloyDB for PostgreSQL using LangChain vector stores. The following vector databases are supported:

This tutorial assumes that you're familiar with Google Cloud, AlloyDB, and asynchronous Python programming.

Objectives

This tutorial shows you how to do the following:

  • Extract data from an existing vector database.
  • Connect to AlloyDB.
  • Initialize the AlloyDB table.
  • Initialize a vector store object.
  • Run the migration script to insert the data.

Costs

In this document, you use the following billable components of Google Cloud:

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

Make sure that you have one of the following LangChain third-party database vector stores:

Enable billing and required APIs

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud APIs necessary to create and connect to AlloyDB for PostgreSQL.

    Enable the APIs

    1. In the Confirm project step, click Next to confirm the name of the project you are going to make changes to.
    2. In the Enable APIs step, click Enable to enable the following:

      • AlloyDB API
      • Compute Engine API
      • Service Networking API

Required roles

To get the permissions that you need to complete the tasks in this tutorial, have the following Identity and Access Management (IAM) roles which allow for table creation and data insertion:

If you want to authenticate to your database using IAM authentication instead of using the built-in authentication in this tutorial, use the notebook that shows how to use AlloyDB for PostgreSQL to store vector embeddings with the AlloyDBVectorStore class.

Create an AlloyDB cluster and user

  1. Create an AlloyDB cluster and an instance.
    • Enable Public IP to run this tutorial from anywhere. If you're using Private IP, you must run this tutorial from within your VPC.
  2. Create or select an AlloyDB database user.
    • When you create the instance, a postgres user is created with a password. This user has superuser permissions.
    • This tutorial uses built-in authentication to reduce any authentication friction. IAM authentication is possible using the AlloyDBEngine.

Retrieve the code sample

  1. Copy the code sample from GitHub by cloning the repository:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Navigate to the migrations directory:

    cd langchain-google-alloydb-pg-python/samples/migrations

Extract data from an existing vector database

  1. Create a client.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Get all the data from the database.

    Pinecone

    Retrieve vector IDs from the Pinecone index:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    And then fetch records by ID from the Pinecone index:

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids = []
    content = []
    embeddings = []
    metadatas = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Initialize the AlloyDB table

  1. Define the embedding service.

    The VectorStore interface requires an embedding service. This workflow doesn't generate new embeddings, so the FakeEmbeddings class is used to avoid any costs.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Prepare the AlloyDB table.

    1. Connect to AlloyDB using a public IP connection. For more information, see Specifying IP Address Type.

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )
    2. Create a table to copy data into, if it doesn't already exist.

      Pinecone

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

Initialize a vector store object

This code adds additional vector embedding metadata to the langchain_metadata column in a JSON format. To make filtering more efficient, organize this metadata into separate columns. For more information, see Create a custom Vector Store.

  1. To initialize a vector store object, run the following command:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Insert data into the AlloyDB table:

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Run the migration script

  1. Set up the Python environment.

  2. Install the sample dependencies:

    pip install -r requirements.txt
  3. Run the sample migration.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Make the following replacements before you run the sample:

    • PINECONE_API_KEY: the Pinecone API key.
    • PINECONE_NAMESPACE: the Pinecone namespace.
    • PINECONE_INDEX_NAME: the name of the Pinecone index.
    • PROJECT_ID: the project ID.
    • REGION: the region in which the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database secret password.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Make the following replacements before you run the sample:

    • WEAVIATE_API_KEY: the Weaviate API key.
    • WEAVIATE_CLUSTER_URL: the Weaviate cluster URL.
    • WEAVIATE_COLLECTION_NAME: the Weaviate collection name.
    • PROJECT_ID: the project ID.
    • REGION: the region in which the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database secret password.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Make the following replacements before you run the sample:

    • CHROMADB_PATH: the Chroma database path.
    • CHROMADB_COLLECTION_NAME: the name of the Chroma database collection.
    • PROJECT_ID: the project ID.
    • REGION: the region in which the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database secret password.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Make the following replacements before you run the sample:

    • QDRANT_PATH: the Qdrant database path.
    • QDRANT_COLLECTION_NAME: the name of the Qdrant collection name.
    • PROJECT_ID: the project ID.
    • REGION: the region in which the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database secret password.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Make the following replacements before you run the sample:

    • MILVUS_URI: the Milvus URI.
    • MILVUS_COLLECTION_NAME: the name of the Milvus collection.
    • PROJECT_ID: the project ID.
    • REGION: the region in which the AlloyDB cluster is deployed.
    • CLUSTER: the name of the cluster.
    • INSTANCE: the name of the instance.
    • DB_NAME: the name of the database.
    • DB_USER: the name of the database user.
    • DB_PWD: the database secret password.

    A successful migration prints logs similar to the following without any errors:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Open AlloyDB Studio to view your migrated data. For more information, see Manage your data using AlloyDB Studio.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

  2. In the Resource name column, click the name of the cluster that you created.

  3. Click Delete cluster.

  4. In Delete cluster, enter the name of your cluster to confirm that you want to delete your cluster.

  5. Click Delete.

    If you created a private connection when you created a cluster, delete the private connection:

  6. Go to the Google Cloud console Networking page and click Delete VPC network.

What's next