ベクトル データベースから AlloyDB にデータを移行する


このチュートリアルでは、LangChain ベクトル ストアを使用して、サードパーティのベクトル データベースから AlloyDB for PostgreSQL にデータを移行する方法について説明します。サポートされているベクトル データベースは次のとおりです。

このチュートリアルは、 Google Cloud、AlloyDB、非同期 Python プログラミングに精通していることを前提としています。

目標

このチュートリアルでは、次の方法を説明します。

  • 既存のベクトル データベースからデータを抽出します。
  • AlloyDB に接続します。
  • AlloyDB テーブルを初期化します。
  • ベクトルストア オブジェクトを初期化します。
  • 移行スクリプトを実行してデータを挿入します。

料金

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

次のいずれかの LangChain サードパーティ データベース ベクトルストアがあることを確認します。

課金と必要な API を有効にする

  1. Google Cloud コンソールの [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  2. Google Cloud プロジェクトに対して課金が有効になっていることを確認します

  3. AlloyDB for PostgreSQL の作成と接続に必要な Cloud API を有効にします。

    API を有効にする

    1. [プロジェクトを確認] の手順で、[次へ] をクリックして、変更するプロジェクトの名前を確認します。
    2. [API を有効にする] の手順で、[有効にする] をクリックして、次の機能を有効にします。

      • AlloyDB API
      • Compute Engine API
      • Service Networking API

必要なロール

このチュートリアルのタスクを完了するために必要な権限を取得するには、テーブルの作成とデータ挿入を許可する次の Identity and Access Management(IAM)ロールが必要です。

  • オーナー(roles/owner)または編集者(roles/editor
  • ユーザーがオーナーまたは編集者でない場合は、次の IAM ロールと PostgreSQL 権限が必要です。

このチュートリアルの組み込み認証ではなく、IAM 認証を使用してデータベースを認証する場合は、AlloyDB for PostgreSQL を使用して AlloyDBVectorStore クラスでベクトル エンベディングを保存する方法を示すノートブックを使用します。

AlloyDB クラスタとユーザーを作成する

  1. AlloyDB クラスタとインスタンスを作成する
    • パブリック IP を有効にして、どこからでもこのチュートリアルを実行できるようにします。プライベート IP を使用している場合は、VPC 内からこのチュートリアルを実行する必要があります。
  2. AlloyDB データベース ユーザーを作成または選択します
    • インスタンスを作成すると、パスワード付きの postgres ユーザーが作成されます。このユーザーにはスーパーユーザー権限があります。
    • このチュートリアルでは、組み込みの認証を使用して、認証の煩わしさを軽減します。IAM 認証は AlloyDBEngine を使用して行うことができます。

コードサンプルを取得する

  1. リポジトリのクローンを作成して、GitHub からコードサンプルをコピーします。

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. migrations ディレクトリに移動します。

    cd langchain-google-alloydb-pg-python/samples/migrations

既存のベクトル データベースからデータを抽出する

  1. クライアントを作成します。

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    クロマ

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. データベースからすべてのデータを取得します。

    Pinecone

    Pinecone インデックスからベクトル ID を取得します。

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    次に、Pinecone インデックスから ID でレコードを取得します。

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids, namespace=pinecone_namespace)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids: list[str] = []
    content: list[Any] = []
    embeddings: list[list[float]] = []
    metadatas: list[Any] = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])  # type: ignore
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    クロマ

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

AlloyDB テーブルを初期化する

  1. エンベディング サービスを定義します。

    VectorStore インターフェースにはエンベディング サービスが必要です。このワークフローは新しいエンベディングを生成しないため、FakeEmbeddings クラスを使用して費用を回避します。

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    クロマ

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. AlloyDB テーブルを準備します。

    1. パブリック IP 接続を使用して AlloyDB に接続します。詳細については、IP アドレスタイプの指定をご覧ください。

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      クロマ

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )
    2. データのコピー先となるテーブルが存在しない場合は、作成します。

      Pinecone

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      クロマ

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

ベクトルストア オブジェクトを初期化する

このコードは、JSON 形式でベクトル エンベディング メタデータを langchain_metadata 列に追加します。フィルタリングを効率化するには、このメタデータを個別の列に整理します。詳細については、カスタム ベクトル ストアを作成するをご覧ください。

  1. ベクトル ストア オブジェクトを初期化するには、次のコマンドを実行します。

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    クロマ

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. AlloyDB テーブルにデータを挿入します。

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    クロマ

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

移行スクリプトを実行する

  1. Python 環境を設定する

  2. サンプルの依存関係をインストールします。

    pip install -r requirements.txt
  3. サンプル移行を実行します。

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • PINECONE_API_KEY: Pinecone API キー。
    • PINECONE_NAMESPACE: Pinecone Namespace。
    • PINECONE_INDEX_NAME: Pinecone インデックスの名前。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースのシークレットのパスワード。

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • WEAVIATE_API_KEY: Weaviate API キー。
    • WEAVIATE_CLUSTER_URL: Weaviate クラスタの URL。
    • WEAVIATE_COLLECTION_NAME: Weaviate コレクション名。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースのシークレットのパスワード。

    クロマ

    python migrate_chromadb_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • CHROMADB_PATH: Chroma データベースのパス。
    • CHROMADB_COLLECTION_NAME: Chroma データベース コレクションの名前。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースのシークレットのパスワード。

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • QDRANT_PATH: Qdrant データベースのパス。
    • QDRANT_COLLECTION_NAME: Qdrant コレクションの名前。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースのシークレットのパスワード。

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • MILVUS_URI: Milvus URI。
    • MILVUS_COLLECTION_NAME: Milvus コレクションの名前。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースのシークレットのパスワード。

    移行が成功すると、エラーなしで次のようなログが出力されます。
    Migration completed, inserted all the batches of data to AlloyDB

  4. AlloyDB Studio を開いて、移行されたデータを表示します。詳細については、AlloyDB Studio を使用してデータを管理するをご覧ください。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

  1. Google Cloud コンソール で、[クラスタ] ページに移動します。

    クラスタに移動

  2. [リソース名] 列で、作成したクラスタの名前をクリックします。

  3. [クラスタを削除] をクリックします。

  4. [クラスタを削除] で、クラスタの名前を入力して、クラスタを削除することを確認します。

  5. [削除] をクリックします。

    クラスタを作成するときにプライベート接続を作成した場合は、プライベート接続を削除します。

  6. Google Cloud コンソールの [ネットワーキング] ページに移動し、[VPC ネットワークを削除] をクリックします。

次のステップ