Daten aus einer Vektordatenbank zu AlloyDB migrieren


In dieser Anleitung wird beschrieben, wie Sie Daten mithilfe von LangChain-Vektorspeichern aus einer Drittanbieter-Vektordatenbank zu AlloyDB for PostgreSQL migrieren. Folgende Vektordatenbanken werden unterstützt:

In dieser Anleitung wird davon ausgegangen, dass Sie mit Google Cloud, AlloyDB und der asynchronen Python-Programmierung vertraut sind.

Lernziele

In dieser Anleitung wird Folgendes beschrieben:

  • Daten aus einer vorhandenen Vektordatenbank extrahieren
  • Stellen Sie eine Verbindung zu AlloyDB her.
  • Initialisieren Sie die AlloyDB-Tabelle.
  • Initialisieren Sie ein Vector Store-Objekt.
  • Führen Sie das Migrationsskript aus, um die Daten einzufügen.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweis

Sie benötigen einen der folgenden LangChain-Vektorspeicher von Drittanbietern:

Abrechnung und erforderliche APIs aktivieren

  1. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl einGoogle Cloud -Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  2. Die Abrechnung für Ihr Google Cloud Projekt muss aktiviert sein.

  3. Aktivieren Sie die Cloud APIs, die zum Erstellen von AlloyDB for PostgreSQL und zum Herstellen einer Verbindung zu dieser Instanz erforderlich sind.

    APIs aktivieren

    1. Klicken Sie im Schritt Projekt bestätigen auf Weiter, um den Namen des Projekts zu bestätigen, an dem Sie Änderungen vornehmen möchten.
    2. Klicken Sie im Schritt APIs aktivieren auf Aktivieren, um Folgendes zu aktivieren:

      • AlloyDB API
      • Compute Engine API
      • Service Networking API

Erforderliche Rollen

Sie benötigen die folgenden IAM-Rollen (Identity and Access Management), um die Aufgaben in dieser Anleitung ausführen zu können. Sie ermöglichen das Erstellen von Tabellen und das Einfügen von Daten:

  • Inhaber (roles/owner) oder Mitbearbeiter (roles/editor)
  • Wenn der Nutzer kein Inhaber oder Bearbeiter ist, sind die folgenden IAM-Rollen und PostgreSQL-Berechtigungen erforderlich:

Wenn Sie sich mit der IAM-Authentifizierung an Ihrer Datenbank authentifizieren möchten, anstatt die integrierte Authentifizierung in dieser Anleitung zu verwenden, verwenden Sie das Notebook, in dem gezeigt wird, wie Sie mit AlloyDB for PostgreSQL Vektor-Embeddings mit der Klasse AlloyDBVectorStore speichern.

AlloyDB-Cluster und ‑Nutzer erstellen

  1. Erstellen Sie einen AlloyDB-Cluster und eine Instanz.
    • Aktivieren Sie die öffentliche IP-Adresse, um diese Anleitung von überall aus ausführen zu können. Wenn Sie private IP-Adressen verwenden, müssen Sie diese Anleitung in Ihrer VPC ausführen.
  2. Erstellen oder wählen Sie einen AlloyDB-Datenbanknutzer aus.
    • Wenn Sie die Instanz erstellen, wird ein postgres-Nutzer mit einem Passwort erstellt. Dieser Nutzer hat Superuser-Berechtigungen.
    • In dieser Anleitung wird die integrierte Authentifizierung verwendet, um den Authentifizierungsaufwand zu reduzieren. Die IAM-Authentifizierung ist mit der AlloyDBEngine möglich.

Codebeispiel abrufen

  1. Kopieren Sie das Codebeispiel aus GitHub, indem Sie das Repository klonen:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Rufen Sie das Verzeichnis migrations auf:

    cd langchain-google-alloydb-pg-python/samples/migrations

Daten aus einer vorhandenen Vektordatenbank extrahieren

  1. Client erstellen.

    Kiefernzapfen

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Alle Daten aus der Datenbank abrufen

    Kiefernzapfen

    Vektor-IDs aus dem Pinecone-Index abrufen:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    Anschließend können Sie Einträge nach ID aus dem Pinecone-Index abrufen:

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids = []
    content = []
    embeddings = []
    metadatas = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

AlloyDB-Tabelle initialisieren

  1. Definiere den Einbettungsservice.

    Für die VectorStore-Benutzeroberfläche ist ein Einbettungsservice erforderlich. Bei diesem Workflow werden keine neuen Einbettungen generiert. Daher wird die Klasse FakeEmbeddings verwendet, um Kosten zu vermeiden.

    Kiefernzapfen

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Bereiten Sie die AlloyDB-Tabelle vor.

    1. Stellen Sie eine Verbindung zu AlloyDB über eine öffentliche IP-Adresse her. Weitere Informationen finden Sie unter IP-Adresstyp angeben.

      Kiefernzapfen

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )
    2. Erstellen Sie eine Tabelle, in die Daten kopiert werden sollen, falls sie noch nicht vorhanden ist.

      Kiefernzapfen

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

Vektorspeicherobjekt initialisieren

Mit diesem Code werden der Spalte langchain_metadata zusätzliche Metadaten für die Vektor-Embeddings im JSON-Format hinzugefügt. Für eine effizientere Filterung sollten Sie diese Metadaten in separaten Spalten organisieren. Weitere Informationen finden Sie unter Benutzerdefinierten Vektor-Shop erstellen.

  1. Führen Sie den folgenden Befehl aus, um ein Vector Store-Objekt zu initialisieren:

    Kiefernzapfen

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Fügen Sie Daten in die AlloyDB-Tabelle ein:

    Kiefernzapfen

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Migrationsskript ausführen

  1. Richten Sie die Python-Umgebung ein.

  2. Installieren Sie die Beispielabhängigkeiten:

    pip install -r requirements.txt
  3. Beispielmigration ausführen

    Kiefernzapfen

    python migrate_pinecone_vectorstore_to_alloydb.py

    Ersetzen Sie vor dem Ausführen des Beispiels die folgenden Werte:

    • PINECONE_API_KEY: den Pinecone API-Schlüssel.
    • PINECONE_NAMESPACE: den Pinecone-Namespace.
    • PINECONE_INDEX_NAME: Der Name des Pinecone-Index.
    • PROJECT_ID: Projekt-ID.
    • REGION: die Region, in der der AlloyDB-Cluster bereitgestellt wird.
    • CLUSTER ist der Name des Clusters.
    • INSTANCE: der Name der Instanz.
    • DB_NAME: der Name der Datenbank
    • DB_USER: der Name des Datenbanknutzers
    • DB_PWD: das geheime Passwort für die Datenbank.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Ersetzen Sie vor dem Ausführen des Beispiels die folgenden Werte:

    • WEAVIATE_API_KEY: den Weaviate API-Schlüssel.
    • WEAVIATE_CLUSTER_URL: die Weaviate-Cluster-URL.
    • WEAVIATE_COLLECTION_NAME: der Name der Weaviate-Sammlung.
    • PROJECT_ID: Projekt-ID.
    • REGION: die Region, in der der AlloyDB-Cluster bereitgestellt wird.
    • CLUSTER ist der Name des Clusters.
    • INSTANCE: der Name der Instanz.
    • DB_NAME: der Name der Datenbank
    • DB_USER: der Name des Datenbanknutzers
    • DB_PWD: das geheime Passwort für die Datenbank.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Ersetzen Sie vor dem Ausführen des Beispiels die folgenden Werte:

    • CHROMADB_PATH: Der Pfad zur Chroma-Datenbank.
    • CHROMADB_COLLECTION_NAME: Der Name der Chroma-Datenbanksammlung.
    • PROJECT_ID: Projekt-ID.
    • REGION: die Region, in der der AlloyDB-Cluster bereitgestellt wird.
    • CLUSTER ist der Name des Clusters.
    • INSTANCE: der Name der Instanz.
    • DB_NAME: der Name der Datenbank
    • DB_USER: der Name des Datenbanknutzers
    • DB_PWD: das geheime Passwort für die Datenbank.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    Ersetzen Sie vor dem Ausführen des Beispiels die folgenden Werte:

    • QDRANT_PATH: der Pfad zur Qdrant-Datenbank.
    • QDRANT_COLLECTION_NAME: der Name der Qdrant-Sammlung.
    • PROJECT_ID: Projekt-ID.
    • REGION: die Region, in der der AlloyDB-Cluster bereitgestellt wird.
    • CLUSTER ist der Name des Clusters.
    • INSTANCE: der Name der Instanz.
    • DB_NAME: der Name der Datenbank
    • DB_USER: der Name des Datenbanknutzers
    • DB_PWD: das geheime Passwort für die Datenbank.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Ersetzen Sie vor dem Ausführen des Beispiels die folgenden Werte:

    • MILVUS_URI: den Milvus-URI.
    • MILVUS_COLLECTION_NAME: Der Name der Milvus-Sammlung.
    • PROJECT_ID: Projekt-ID.
    • REGION: die Region, in der der AlloyDB-Cluster bereitgestellt wird.
    • CLUSTER ist der Name des Clusters.
    • INSTANCE: der Name der Instanz.
    • DB_NAME: der Name der Datenbank
    • DB_USER: der Name des Datenbanknutzers
    • DB_PWD: das geheime Passwort für die Datenbank.

    Bei einer erfolgreichen Migration werden Protokolle wie die folgenden ohne Fehler ausgegeben:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Öffnen Sie AlloyDB Studio, um die migrierten Daten aufzurufen. Weitere Informationen finden Sie unter Daten mit AlloyDB Studio verwalten.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Klicken Sie in der Spalte Ressourcenname auf den Namen des Clusters, den Sie erstellt haben.

  3. Klicken Sie auf Cluster löschen.

  4. Geben Sie unter Cluster löschen den Namen Ihres Clusters ein, um zu bestätigen, dass Sie ihn löschen möchten.

  5. Klicken Sie auf Löschen.

    Wenn Sie beim Erstellen eines Clusters eine private Verbindung erstellt haben, löschen Sie sie:

  6. Rufen Sie in der Google Cloud Console die Seite „Networking“ auf und klicken Sie auf VPC-Netzwerk löschen.

Nächste Schritte