벡터 데이터베이스에서 AlloyDB로 데이터 마이그레이션


이 튜토리얼에서는 LangChain 벡터 저장소를 사용하여 서드 파티 벡터 데이터베이스에서 PostgreSQL용 AlloyDB로 데이터를 마이그레이션하는 방법을 설명합니다. 다음 벡터 데이터베이스가 지원됩니다.

이 튜토리얼에서는 사용자가 Google Cloud, AlloyDB, 비동기식 Python 프로그래밍에 익숙하다고 가정합니다.

목표

이 튜토리얼에서는 다음 작업을 처리하는 방법을 보여줍니다.

  • 기존 벡터 데이터베이스에서 데이터를 추출합니다.
  • AlloyDB에 연결합니다.
  • AlloyDB 테이블을 초기화합니다.
  • 벡터 저장소 객체를 초기화합니다.
  • 이전 스크립트를 실행하여 데이터를 삽입합니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

다음 LangChain 서드 파티 데이터베이스 벡터 저장소 중 하나가 있는지 확인합니다.

결제 및 필수 API 사용 설정

  1. Google Cloud 콘솔의 프로젝트 선택기 페이지에서Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  2. 프로젝트에 결제가 사용 설정되어 있는지 확인합니다 Google Cloud .

  3. PostgreSQL용 AlloyDB를 만들고 연결하는 데 필요한 Cloud API를 사용 설정합니다.

    API 사용 설정

    1. 프로젝트 확인 단계에서 다음을 클릭하여 변경할 프로젝트의 이름을 확인합니다.
    2. API 사용 설정 단계에서 사용 설정을 클릭하여 다음을 사용 설정합니다.

      • AlloyDB API
      • Compute Engine API
      • Service Networking API

필요한 역할

이 튜토리얼의 태스크를 완료하는 데 필요한 권한을 얻으려면 테이블 생성 및 데이터 삽입을 허용하는 다음 Identity and Access Management (IAM) 역할을 보유해야 합니다.

  • 소유자 (roles/owner) 또는 편집자 (roles/editor)
  • 사용자가 소유자 또는 편집자가 아닌 경우 다음 IAM 역할 및 PostgreSQL 권한이 필요합니다.

이 튜토리얼의 기본 제공 인증을 사용하는 대신 IAM 인증을 사용하여 데이터베이스에 인증하려면 PostgreSQL용 AlloyDB를 사용하여 AlloyDBVectorStore 클래스로 벡터 임베딩을 저장하는 방법을 보여주는 노트북을 사용하세요.

AlloyDB 클러스터 및 사용자 만들기

  1. AlloyDB 클러스터 및 인스턴스 만들기
    • 어디서나 이 튜토리얼을 실행하려면 공개 IP를 사용 설정하세요. 비공개 IP를 사용하는 경우 VPC 내에서 이 튜토리얼을 실행해야 합니다.
  2. AlloyDB 데이터베이스 사용자를 만들거나 선택합니다.
    • 인스턴스를 만들면 비밀번호가 있는 postgres 사용자가 생성됩니다. 이 사용자에게 최고 관리자 권한이 있습니다.
    • 이 튜토리얼에서는 기본 제공 인증을 사용하여 인증 관련 불편을 줄입니다. AlloyDBEngine을 사용하여 IAM 인증을 실행할 수 있습니다.

코드 샘플 검색

  1. 저장소를 클론하여 GitHub에서 코드 샘플을 복사합니다.

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. migrations 디렉터리로 이동합니다.

    cd langchain-google-alloydb-pg-python/samples/migrations

기존 벡터 데이터베이스에서 데이터 추출

  1. 클라이언트를 만듭니다.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Qdrant

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. 데이터베이스에서 모든 데이터를 가져옵니다.

    Pinecone

    Pinecone 색인에서 벡터 ID를 가져옵니다.

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    그런 다음 Pinecone 색인에서 ID별로 레코드를 가져옵니다.

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids, namespace=pinecone_namespace)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids: list[str] = []
    content: list[Any] = []
    embeddings: list[list[float]] = []
    metadatas: list[Any] = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])  # type: ignore
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Qdrant

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

AlloyDB 테이블 초기화

  1. 임베딩 서비스를 정의합니다.

    VectorStore 인터페이스에는 삽입 서비스가 필요합니다. 이 워크플로는 새 임베딩을 생성하지 않으므로 비용을 방지하기 위해 FakeEmbeddings 클래스가 사용됩니다.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Qdrant

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. AlloyDB 테이블을 준비합니다.

    1. 공개 IP 연결을 사용하여 AlloyDB에 연결합니다. 자세한 내용은 IP 주소 유형 지정을 참고하세요.

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Qdrant

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
          ip_type=IPTypes.PUBLIC,
      )
    2. 데이터를 복사할 테이블이 아직 없으면 테이블을 만듭니다.

      Pinecone

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Qdrant

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

벡터 저장소 객체 초기화

이 코드는 langchain_metadata 열에 JSON 형식의 벡터 임베딩 메타데이터를 추가합니다. 필터링을 더 효율적으로 하려면 이 메타데이터를 별도의 열로 구성하세요. 자세한 내용은 커스텀 벡터 스토어 만들기를 참고하세요.

  1. 벡터 저장소 객체를 초기화하려면 다음 명령어를 실행합니다.

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Qdrant

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. AlloyDB 테이블에 데이터를 삽입합니다.

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Qdrant

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

이전 스크립트 실행

  1. Python 환경을 설정합니다.

  2. 샘플 종속 항목을 설치합니다.

    pip install -r requirements.txt
  3. 샘플 이전을 실행합니다.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    샘플을 실행하기 전에 다음을 바꿉니다.

    • PINECONE_API_KEY: Pinecone API 키입니다.
    • PINECONE_NAMESPACE: Pinecone 네임스페이스입니다.
    • PINECONE_INDEX_NAME: Pinecone 색인의 이름입니다.
    • PROJECT_ID: 프로젝트 ID입니다.
    • REGION: AlloyDB 클러스터가 배포되는 리전입니다.
    • CLUSTER: 클러스터의 이름입니다.
    • INSTANCE: 인스턴스 이름입니다.
    • DB_NAME: 데이터베이스의 이름
    • DB_USER: 데이터베이스 사용자의 이름입니다.
    • DB_PWD: 데이터베이스 보안 비밀 비밀번호입니다.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    샘플을 실행하기 전에 다음을 바꿉니다.

    • WEAVIATE_API_KEY: Weaviate API 키입니다.
    • WEAVIATE_CLUSTER_URL: Weaviate 클러스터 URL입니다.
    • WEAVIATE_COLLECTION_NAME: Weaviate 컬렉션 이름입니다.
    • PROJECT_ID: 프로젝트 ID입니다.
    • REGION: AlloyDB 클러스터가 배포되는 리전입니다.
    • CLUSTER: 클러스터의 이름입니다.
    • INSTANCE: 인스턴스 이름입니다.
    • DB_NAME: 데이터베이스의 이름
    • DB_USER: 데이터베이스 사용자의 이름입니다.
    • DB_PWD: 데이터베이스 보안 비밀 비밀번호입니다.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    샘플을 실행하기 전에 다음을 바꿉니다.

    • CHROMADB_PATH: Chroma 데이터베이스 경로입니다.
    • CHROMADB_COLLECTION_NAME: Chroma 데이터베이스 모음의 이름입니다.
    • PROJECT_ID: 프로젝트 ID입니다.
    • REGION: AlloyDB 클러스터가 배포되는 리전입니다.
    • CLUSTER: 클러스터의 이름입니다.
    • INSTANCE: 인스턴스 이름입니다.
    • DB_NAME: 데이터베이스의 이름
    • DB_USER: 데이터베이스 사용자의 이름입니다.
    • DB_PWD: 데이터베이스 보안 비밀 비밀번호입니다.

    Qdrant

    python migrate_qdrant_vectorstore_to_alloydb.py

    샘플을 실행하기 전에 다음을 바꿉니다.

    • QDRANT_PATH: Qdrant 데이터베이스 경로입니다.
    • QDRANT_COLLECTION_NAME: Qdrant 컬렉션 이름입니다.
    • PROJECT_ID: 프로젝트 ID입니다.
    • REGION: AlloyDB 클러스터가 배포되는 리전입니다.
    • CLUSTER: 클러스터의 이름입니다.
    • INSTANCE: 인스턴스 이름입니다.
    • DB_NAME: 데이터베이스의 이름
    • DB_USER: 데이터베이스 사용자의 이름입니다.
    • DB_PWD: 데이터베이스 보안 비밀 비밀번호입니다.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    샘플을 실행하기 전에 다음을 바꿉니다.

    • MILVUS_URI: Milvus URI입니다.
    • MILVUS_COLLECTION_NAME: Milvus 컬렉션의 이름입니다.
    • PROJECT_ID: 프로젝트 ID입니다.
    • REGION: AlloyDB 클러스터가 배포되는 리전입니다.
    • CLUSTER: 클러스터의 이름입니다.
    • INSTANCE: 인스턴스 이름입니다.
    • DB_NAME: 데이터베이스의 이름
    • DB_USER: 데이터베이스 사용자의 이름입니다.
    • DB_PWD: 데이터베이스 보안 비밀 비밀번호입니다.

    마이그레이션에 성공하면 오류 없이 다음과 유사한 로그가 출력됩니다.
    Migration completed, inserted all the batches of data to AlloyDB

  4. AlloyDB 스튜디오를 열어 이전된 데이터를 확인합니다. 자세한 내용은 AlloyDB 스튜디오를 사용하여 데이터 관리를 참고하세요.

삭제

이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.

  1. Google Cloud 콘솔에서 클러스터 페이지로 이동합니다.

    클러스터로 이동

  2. 리소스 이름 열에서 내가 만든 클러스터의 이름을 클릭합니다.

  3. 클러스터 삭제를 클릭합니다.

  4. 클러스터 삭제에서 클러스터 이름을 입력하여 클러스터 삭제를 확인합니다.

  5. 삭제를 클릭합니다.

    클러스터를 만들 때 비공개 연결을 만든 경우 비공개 연결을 삭제합니다.

  6. Google Cloud 콘솔 네트워킹 페이지로 이동하여 VPC 네트워크 삭제를 클릭합니다.

다음 단계