Migrer des données d'une base de données vectorielle vers AlloyDB


Ce tutoriel explique comment migrer des données d'une base de données vectorielle tierce vers AlloyDB pour PostgreSQL à l'aide de magasins de vecteurs LangChain. Les bases de données vectorielles suivantes sont compatibles:

Dans ce tutoriel, nous partons du principe que vous connaissez Google Cloud, AlloyDB et la programmation Python asynchrone.

Objectifs

Ce tutoriel vous explique comment effectuer les tâches suivantes :

  • Extrayez des données à partir d'une base de données vectorielle existante.
  • Connectez-vous à AlloyDB.
  • Initialisez la table AlloyDB.
  • Initialisez un objet de magasin de vecteurs.
  • Exécutez le script de migration pour insérer les données.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

Assurez-vous de disposer de l'un des magasins de vecteurs de base de données tiers LangChain suivants:

Activer la facturation et les API requises

  1. Dans la console Google Cloud, sur la page de sélection du projet, sélectionnez ou créez un projetGoogle Cloud .

    Accéder au sélecteur de projet

  2. Assurez-vous que la facturation est activée pour votre Google Cloud projet.

  3. Activez les APIs Cloud nécessaires pour créer et vous connecter à AlloyDB pour PostgreSQL.

    Activer les API

    1. À l'étape Confirmer le projet, cliquez sur Suivant pour confirmer le nom du projet que vous allez modifier.
    2. À l'étape Activer les API, cliquez sur Activer pour activer les éléments suivants:

      • API AlloyDB
      • API Compute Engine
      • API Service Networking

Rôles requis

Pour obtenir les autorisations nécessaires pour effectuer les tâches de ce tutoriel, vous devez disposer des rôles IAM (Identity and Access Management) suivants, qui vous permettent de créer des tables et d'insérer des données:

  • Propriétaire (roles/owner) ou éditeur (roles/editor)
  • Si l'utilisateur n'est pas propriétaire ou éditeur, les rôles IAM et les droits PostgreSQL suivants sont requis:

Si vous souhaitez vous authentifier auprès de votre base de données à l'aide de l'authentification IAM au lieu d'utiliser l'authentification intégrée dans ce tutoriel, utilisez le notebook qui explique comment utiliser AlloyDB pour PostgreSQL pour stocker des représentations vectorielles continues avec la classe AlloyDBVectorStore.

Créer un cluster et un utilisateur AlloyDB

  1. Créez un cluster AlloyDB et une instance.
    • Activez les adresses IP publiques pour exécuter ce tutoriel partout. Si vous utilisez une adresse IP privée, vous devez exécuter ce tutoriel à partir de votre VPC.
  2. Créez ou sélectionnez un utilisateur de base de données AlloyDB.
    • Lorsque vous créez l'instance, un utilisateur postgres est créé avec un mot de passe. Cet utilisateur dispose des autorisations de super-utilisateur.
    • Ce tutoriel utilise l'authentification intégrée pour réduire les frictions d'authentification. L'authentification IAM est possible à l'aide du moteur AlloyDB.

Récupérer l'exemple de code

  1. Copiez l'exemple de code depuis GitHub en clonant le dépôt:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Accédez au répertoire migrations :

    cd langchain-google-alloydb-pg-python/samples/migrations

Extraire des données d'une base de données vectorielle existante

  1. Créer un client

    Pomme de pin

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Récupérez toutes les données de la base de données.

    Pomme de pin

    Récupérez les ID de vecteurs à partir de l'index Pinecone:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    Puis récupérez les enregistrements par ID à partir de l'index Pinecone:

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids = []
    content = []
    embeddings = []
    metadatas = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Initialiser la table AlloyDB

  1. Définissez le service d'embedding.

    L'interface VectorStore nécessite un service d'intégration. Ce workflow ne génère pas de nouveaux embeddings. La classe FakeEmbeddings est donc utilisée pour éviter tout coût.

    Pomme de pin

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Préparer la table AlloyDB

    1. Connectez-vous à AlloyDB à l'aide d'une connexion par adresse IP publique. Pour en savoir plus, consultez la section Spécifier le type d'adresse IP.

      Pomme de pin

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )
    2. Créez une table dans laquelle copier les données, si elle n'existe pas déjà.

      Pomme de pin

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

Initialiser un objet de magasin de vecteurs

Ce code ajoute des métadonnées d'encapsulation vectorielle supplémentaires à la colonne langchain_metadata au format JSON. Pour optimiser le filtrage, organisez ces métadonnées dans des colonnes distinctes. Pour en savoir plus, consultez Créer un dépôt de vecteurs personnalisé.

  1. Pour initialiser un objet de magasin de vecteurs, exécutez la commande suivante:

    Pomme de pin

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Insérez des données dans la table AlloyDB:

    Pomme de pin

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Exécuter le script de migration

  1. Configurez l'environnement Python.

  2. Installez les dépendances de l'exemple:

    pip install -r requirements.txt
  3. Exécutez l'exemple de migration.

    Pomme de pin

    python migrate_pinecone_vectorstore_to_alloydb.py

    Effectuez les remplacements suivants avant d'exécuter l'exemple:

    • PINECONE_API_KEY: clé API Pinecone.
    • PINECONE_NAMESPACE: espace de noms Pinecone.
    • PINECONE_INDEX_NAME: nom de l'index Pinecone.
    • PROJECT_ID : ID du projet.
    • REGION: région dans laquelle le cluster AlloyDB est déployé.
    • CLUSTER : nom du cluster.
    • INSTANCE : nom de l'instance.
    • DB_NAME: nom de la base de données.
    • DB_USER: nom de l'utilisateur de la base de données.
    • DB_PWD: mot de passe secret de la base de données.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Effectuez les remplacements suivants avant d'exécuter l'exemple:

    • WEAVIATE_API_KEY: clé API Weaviate.
    • WEAVIATE_CLUSTER_URL: URL du cluster Weaviate.
    • WEAVIATE_COLLECTION_NAME: nom de la collection Weaviate.
    • PROJECT_ID : ID du projet.
    • REGION: région dans laquelle le cluster AlloyDB est déployé.
    • CLUSTER : nom du cluster.
    • INSTANCE : nom de l'instance.
    • DB_NAME: nom de la base de données.
    • DB_USER: nom de l'utilisateur de la base de données.
    • DB_PWD: mot de passe secret de la base de données.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Effectuez les remplacements suivants avant d'exécuter l'exemple:

    • CHROMADB_PATH: chemin d'accès à la base de données Chroma.
    • CHROMADB_COLLECTION_NAME: nom de la collection de la base de données Chroma.
    • PROJECT_ID : ID du projet.
    • REGION: région dans laquelle le cluster AlloyDB est déployé.
    • CLUSTER : nom du cluster.
    • INSTANCE : nom de l'instance.
    • DB_NAME: nom de la base de données.
    • DB_USER: nom de l'utilisateur de la base de données.
    • DB_PWD: mot de passe secret de la base de données.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Effectuez les remplacements suivants avant d'exécuter l'exemple:

    • QDRANT_PATH: chemin d'accès à la base de données Qdrant.
    • QDRANT_COLLECTION_NAME: nom de la collection Qdrant.
    • PROJECT_ID : ID du projet.
    • REGION: région dans laquelle le cluster AlloyDB est déployé.
    • CLUSTER : nom du cluster.
    • INSTANCE : nom de l'instance.
    • DB_NAME: nom de la base de données.
    • DB_USER: nom de l'utilisateur de la base de données.
    • DB_PWD: mot de passe secret de la base de données.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Effectuez les remplacements suivants avant d'exécuter l'exemple:

    • MILVUS_URI: URI Milvus.
    • MILVUS_COLLECTION_NAME: nom de la collection Milvus.
    • PROJECT_ID : ID du projet.
    • REGION: région dans laquelle le cluster AlloyDB est déployé.
    • CLUSTER : nom du cluster.
    • INSTANCE : nom de l'instance.
    • DB_NAME: nom de la base de données.
    • DB_USER: nom de l'utilisateur de la base de données.
    • DB_PWD: mot de passe secret de la base de données.

    En cas de migration réussie, des journaux semblables à ceux-ci s'affichent sans erreur:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Ouvrez AlloyDB Studio pour afficher vos données migrées. Pour en savoir plus, consultez la section Gérer vos données à l'aide d'AlloyDB Studio.

Effectuer un nettoyage

Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

  1. Dans la console Google Cloud, accédez à la page Clusters.

    accéder aux clusters

  2. Dans la colonne Nom de la ressource, cliquez sur le nom du cluster que vous avez créé.

  3. Cliquez sur  Supprimer le cluster.

  4. Dans Supprimer le cluster, saisissez le nom de votre cluster pour confirmer que vous souhaitez le supprimer.

  5. Cliquez sur Supprimer.

    Si vous avez créé une connexion privée lorsque vous avez créé un cluster, supprimez-la:

  6. Accédez à la page Networking (Réseaux) de la console Google Cloud, puis cliquez sur Delete VPC network (Supprimer le réseau VPC).

Étape suivante