ベクトル データベースから AlloyDB にデータを移行する


このチュートリアルでは、LangChain VectorStore を使用して、サードパーティのベクトル データベースから AlloyDB for PostgreSQL にデータを移行する方法について説明します。このチュートリアルでは、サードパーティのベクトル データベースのデータが LangChain VectorStore インテグレーションを使用して作成されていることを前提としています。LangChain を使用せずに次のいずれかのデータベースに情報を格納する場合は、データのスキーマに合わせて以下のスクリプトを編集する必要があります。サポートされているベクトル データベースは次のとおりです。

このチュートリアルは、 Google Cloud、AlloyDB、非同期 Python プログラミングに精通していることを前提としています。

目標

このチュートリアルでは、次の方法を説明します。

  • 既存のベクトル データベースからデータを抽出します。
  • AlloyDB に接続します。
  • AlloyDB テーブルを初期化します。
  • ベクトルストア オブジェクトを初期化します。
  • 移行スクリプトを実行してデータを挿入します。

費用

このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。

You might be eligible for a free trial cluster. For more information, see AlloyDB free trial clusters overview.

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

次のいずれかの LangChain サードパーティ データベース ベクトルストアがあることを確認します。

課金と必要な API を有効にする

  1. Google Cloud コンソールのプロジェクト セレクタページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  2. Google Cloud プロジェクトに対して課金が有効になっていることを確認します

  3. AlloyDB for PostgreSQL の作成と接続に必要な Cloud APIs を有効にします。

    API を有効にする

    1. [プロジェクトを確認] の手順で、[次へ] をクリックして、変更するプロジェクトの名前を確認します。
    2. [API を有効にする] の手順で、[有効にする] をクリックして、次の機能を有効にします。

      • AlloyDB API
      • Compute Engine API
      • Service Networking API

必要なロール

このチュートリアルのタスクを完了するために必要な権限を取得するには、テーブルの作成とデータ挿入を許可する次の Identity and Access Management(IAM)ロールが必要です。

  • オーナー(roles/owner)または編集者(roles/editor
  • ユーザーがオーナーまたは編集者でない場合は、次の IAM ロールと PostgreSQL 権限が必要です。

このチュートリアルの組み込み認証ではなく、IAM 認証を使用してデータベースを認証する場合は、AlloyDB for PostgreSQL を使用して AlloyDBVectorStore クラスでベクトル エンベディングを保存する方法を示すノートブックを使用します。

AlloyDB クラスタとユーザーを作成する

  1. AlloyDB クラスタとインスタンスを作成します
    • パブリック IP を有効にして、どこからでもこのチュートリアルを実行できるようにします。プライベート IP を使用している場合は、VPC 内からこのチュートリアルを実行する必要があります。
  2. AlloyDB データベース ユーザーを作成または選択します
    • インスタンスを作成すると、パスワード付きの postgres ユーザーが作成されます。このユーザーにはスーパーユーザー権限があります。
    • このチュートリアルでは、組み込みの認証を使用して、認証の煩わしさを軽減します。IAM 認証は AlloyDBEngine を使用して行うことができます。

コードサンプルを取得する

  1. リポジトリのクローンを作成して、GitHub からコードサンプルをコピーします。

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. migrations ディレクトリに移動します。

    cd langchain-google-alloydb-pg-python/samples/migrations

既存のベクトル データベースからデータを抽出する

  1. クライアントを作成します。

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. データベースからすべてのデータを取得します。

    Pinecone

    Pinecone インデックスからベクトル ID を取得します。

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    if ids:  # Prevents yielding an empty list.
        yield ids
    
    # Check BOTH pagination and pagination.next
    while results.pagination is not None and results.pagination.get("next") is not None:
        pagination_token = results.pagination.get("next")
        results = pinecone_index.list_paginated(
            prefix="",
            pagination_token=pagination_token,
            namespace=pinecone_namespace,
            limit=pinecone_batch_size,
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        if ids:  # Prevents yielding an empty list.
            yield ids

    次に、Pinecone インデックスから ID でレコードを取得します。

    import uuid
    
    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids, namespace=pinecone_namespace)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            # You might need to update this data translation logic according to one or more of your field names
            if pinecone_id_column_name in doc:
                # pinecone_id_column_name stores the unqiue identifier for the content
                ids.append(doc[pinecone_id_column_name])
            else:
                # Generate a uuid if pinecone_id_column_name is missing in source
                ids.append(str(uuid.uuid4()))
            # values is the vector embedding of the content
            embeddings.append(doc["values"])
            # Check if pinecone_content_column_name exists in metadata before accessing
            if pinecone_content_column_name in doc.metadata:
                # pinecone_content_column_name stores the content which was encoded
                contents.append(str(doc.metadata[pinecone_content_column_name]))
                # Remove pinecone_content_column_name after processing
                del doc.metadata[pinecone_content_column_name]
            else:
                # Handle the missing pinecone_content_column_name field appropriately
                contents.append("")
            # metadata is the additional context
            metadatas.append(doc["metadata"])
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids: list[str] = []
    content: list[Any] = []
    embeddings: list[list[float]] = []
    metadatas: list[Any] = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        # You might need to update this data translation logic according to one or more of your field names
        # uuid is the unqiue identifier for the content
        ids.append(str(item.uuid))
        # weaviate_text_key is the content which was encoded
        content.append(item.properties[weaviate_text_key])
        # vector is the vector embedding of the content
        embeddings.append(item.vector["default"])  # type: ignore
        del item.properties[weaviate_text_key]  # type: ignore
        # properties is the additional context
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        # You might need to update this data translation logic according to one or more of your field names
        # documents is the content which was encoded
        # embeddings is the vector embedding of the content
        # metadatas is the additional context
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        # ids is the unqiue identifier for the content
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                # You might need to update this data translation logic according to one or more of your field names
                # id is the unqiue identifier for the content
                ids.append(str(doc.id))
                # page_content is the content which was encoded
                contents.append(doc.payload["page_content"])
                # vector is the vector embedding of the content
                embeddings.append(doc.vector)  # type: ignore
                # metatdata is the additional context
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            # You might need to update this data translation logic according to one or more of your field names
            doc = page[i]
            # pk is the unqiue identifier for the content
            ids.append(doc["pk"])
            # text is the content which was encoded
            content.append(doc["text"])
            # vector is the vector embedding of the content
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            # doc is the additional context
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

AlloyDB テーブルを初期化する

  1. エンベディング サービスを定義します。

    VectorStore インターフェースにはエンベディング サービスが必要です。このワークフローでは新しいエンベディングを生成しないため、FakeEmbeddings クラスを使用して費用を回避します。

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. AlloyDB テーブルを準備します。

    1. パブリック IP 接続を使用して AlloyDB に接続します。詳細については、IP アドレスタイプの指定をご覧ください。

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,  # Optionally use IPTypes.PRIVATE
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )
    2. データのコピー先となるテーブルが存在しない場合は、作成します。

      Pinecone

      from langchain_google_alloydb_pg import Column
      
      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types if not using the UUID data type
          # id_column=Column("langchain_id", "TEXT"),  # Default is Column("langchain_id", "UUID")
          # overwrite_existing=True,  # Drop the old table and Create a new vector store table
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
          # Customize the ID column types with `id_column` if not using the UUID data type
      )

ベクトルストア オブジェクトを初期化する

このコードは、JSON 形式でベクトル エンベディング メタデータを langchain_metadata 列に追加します。フィルタリングを効率化するには、このメタデータを個別の列に整理します。詳細については、カスタム ベクトル ストアを作成するをご覧ください。

  1. ベクトル ストア オブジェクトを初期化するには、次のコマンドを実行します。

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. AlloyDB テーブルにデータを挿入します。

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

移行スクリプトを実行する

  1. Python 環境を設定します

  2. サンプルの依存関係をインストールします。

    pip install -r requirements.txt
  3. サンプル移行を実行します。

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • PINECONE_API_KEY: Pinecone API キー。
    • PINECONE_NAMESPACE: Pinecone 名前空間。
    • PINECONE_INDEX_NAME: Pinecone インデックスの名前。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースの Secret のパスワード。

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • WEAVIATE_API_KEY: Weaviate API キー。
    • WEAVIATE_CLUSTER_URL: Weaviate クラスタの URL。
    • WEAVIATE_COLLECTION_NAME: Weaviate コレクション名。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースの Secret のパスワード。

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • CHROMADB_PATH: Chroma データベースのパス。
    • CHROMADB_COLLECTION_NAME: Chroma データベース コレクションの名前。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースの Secret のパスワード。

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • QDRANT_PATH: Qdrant データベースのパス。
    • QDRANT_COLLECTION_NAME: Qdrant コレクションの名前。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースの Secret のパスワード。

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    サンプルを実行する前に、次のように置き換えます。

    • MILVUS_URI: Milvus の URI。
    • MILVUS_COLLECTION_NAME: Milvus コレクションの名前。
    • PROJECT_ID: プロジェクト ID。
    • REGION: AlloyDB クラスタがデプロイされているリージョン。
    • CLUSTER: クラスタの名前。
    • INSTANCE: インスタンスの名前。
    • DB_NAME: データベースの名前。
    • DB_USER: データベース ユーザーの名前。
    • DB_PWD: データベースの Secret のパスワード。

    移行が成功すると、エラーなしで次のようなログが出力されます。
    Migration completed, inserted all the batches of data to AlloyDB

  4. AlloyDB Studio を開いて、移行されたデータを表示します。詳細については、AlloyDB Studio を使用してデータを管理するをご覧ください。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

  1. Google Cloud コンソールで、[クラスタ] ページに移動します。

    [クラスタ] に移動

  2. [リソース名] 列で、作成したクラスタの名前をクリックします。

  3. [クラスタを削除] をクリックします。

  4. [クラスタを削除] でクラスタの名前を入力して、クラスタの削除を確認します。

  5. [削除] をクリックします。

    クラスタを作成するときにプライベート接続を作成した場合は、プライベート接続を削除します。

  6. Google Cloud コンソールの [ネットワーキング] ページに移動し、[VPC ネットワークの削除] をクリックします。

次のステップ