Memigrasikan data dari database vektor ke AlloyDB


Tutorial ini menjelaskan cara memigrasikan data dari database vektor pihak ketiga ke AlloyDB untuk PostgreSQL menggunakan penyimpanan vektor LangChain. Database vektor berikut didukung:

Tutorial ini mengasumsikan bahwa Anda sudah memahami Google Cloud, AlloyDB, dan pemrograman Python asinkron.

Tujuan

Tutorial ini menunjukkan cara melakukan hal berikut:

  • Mengekstrak data dari database vektor yang ada.
  • Hubungkan ke AlloyDB.
  • Lakukan inisialisasi tabel AlloyDB.
  • Lakukan inisialisasi objek penyimpanan vektor.
  • Jalankan skrip migrasi untuk menyisipkan data.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

Pastikan Anda memiliki salah satu penyimpanan vektor database pihak ketiga LangChain berikut:

Mengaktifkan penagihan dan API yang diperlukan

  1. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat projectGoogle Cloud .

    Buka pemilih project

  2. Pastikan penagihan diaktifkan untuk Google Cloud project Anda.

  3. Aktifkan Cloud API yang diperlukan untuk membuat dan terhubung ke AlloyDB untuk PostgreSQL.

    Mengaktifkan API

    1. Pada langkah Konfirmasi project, klik Berikutnya untuk mengonfirmasi nama project yang akan Anda ubah.
    2. Pada langkah Aktifkan API, klik Aktifkan untuk mengaktifkan hal berikut:

      • AlloyDB API
      • Compute Engine API
      • Service Networking API

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk menyelesaikan tugas dalam tutorial ini, miliki peran Identity and Access Management (IAM) berikut yang memungkinkan pembuatan tabel dan penyisipan data:

Jika Anda ingin mengautentikasi ke database menggunakan autentikasi IAM, bukan menggunakan autentikasi bawaan dalam tutorial ini, gunakan notebook yang menunjukkan cara menggunakan AlloyDB untuk PostgreSQL guna menyimpan penyematan vektor dengan class AlloyDBVectorStore.

Membuat cluster dan pengguna AlloyDB

  1. Buat cluster dan instance AlloyDB.
    • Aktifkan IP Publik untuk menjalankan tutorial ini dari mana saja. Jika menggunakan IP Pribadi, Anda harus menjalankan tutorial ini dari dalam VPC.
  2. Buat atau pilih pengguna database AlloyDB.
    • Saat Anda membuat instance, pengguna postgres akan dibuat dengan sandi. Pengguna ini memiliki izin superuser.
    • Tutorial ini menggunakan autentikasi bawaan untuk mengurangi hambatan autentikasi. Autentikasi IAM dapat dilakukan menggunakan AlloyDBEngine.

Mengambil contoh kode

  1. Salin contoh kode dari GitHub dengan meng-clone repositori:

    git clone https://github.com/googleapis/langchain-google-alloydb-pg-python.git
  2. Buka direktori migrations:

    cd langchain-google-alloydb-pg-python/samples/migrations

Mengekstrak data dari database vektor yang ada

  1. Buat klien.

    Pinecone

    from pinecone import Pinecone  # type: ignore
    
    pinecone_client = Pinecone(api_key=pinecone_api_key)
    pinecone_index = pinecone_client.Index(pinecone_index_name)

    Weaviate

    import weaviate
    
    # For a locally running weaviate instance, use `weaviate.connect_to_local()`
    weaviate_client = weaviate.connect_to_weaviate_cloud(
        cluster_url=weaviate_cluster_url,
        auth_credentials=weaviate.auth.AuthApiKey(weaviate_api_key),
    )

    Chroma

    from langchain_chroma import Chroma
    
    chromadb_client = Chroma(
        collection_name=chromadb_collection_name,
        embedding_function=embeddings_service,
        persist_directory=chromadb_path,
    )

    Kuadran

    from qdrant_client import QdrantClient
    
    qdrant_client = QdrantClient(path=qdrant_path)
    

    Milvus

    milvus_client = MilvusClient(uri=milvus_uri)
  2. Dapatkan semua data dari database.

    Pinecone

    Ambil ID vektor dari indeks Pinecone:

    results = pinecone_index.list_paginated(
        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
    )
    ids = [v.id for v in results.vectors]
    yield ids
    
    while results.pagination is not None:
        pagination_token = results.pagination.next
        results = pinecone_index.list_paginated(
            prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
        )
    
        # Extract and yield the next batch of IDs
        ids = [v.id for v in results.vectors]
        yield ids

    Kemudian, ambil data berdasarkan ID dari indeks Pinecone:

    # Iterate through the IDs and download their contents
    for ids in id_iterator:
        all_data = pinecone_index.fetch(ids=ids)
        ids = []
        embeddings = []
        contents = []
        metadatas = []
    
        # Process each vector in the current batch
        for doc in all_data["vectors"].values():
            ids.append(doc["id"])
            embeddings.append(doc["values"])
            contents.append(str(doc["metadata"]["text"]))
            del doc["metadata"]["text"]
            metadata = doc["metadata"]
            metadatas.append(metadata)
    
        # Yield the current batch of results
        yield ids, contents, embeddings, metadatas

    Weaviate

    # Iterate through the IDs and download their contents
    weaviate_collection = weaviate_client.collections.get(weaviate_collection_name)
    ids = []
    content = []
    embeddings = []
    metadatas = []
    
    for item in weaviate_collection.iterator(include_vector=True):
        ids.append(str(item.uuid))
        content.append(item.properties[weaviate_text_key])
        embeddings.append(item.vector["default"])
        del item.properties[weaviate_text_key]  # type: ignore
        metadatas.append(item.properties)
    
        if len(ids) >= weaviate_batch_size:
            # Yield the current batch of results
            yield ids, content, embeddings, metadatas
            # Reset lists to start a new batch
            ids = []
            content = []
            embeddings = []
            metadatas = []

    Chroma

    # Iterate through the IDs and download their contents
    offset = 0
    while True:
        docs = chromadb_client.get(
            include=["metadatas", "documents", "embeddings"],
            limit=chromadb_batch_size,
            offset=offset,
        )
    
        if len(docs["documents"]) == 0:
            break
    
        yield docs["ids"], docs["documents"], docs["embeddings"].tolist(), docs[
            "metadatas"
        ]
    
        offset += chromadb_batch_size
    

    Kuadran

    # Iterate through the IDs and download their contents
    offset = None
    while True:
        docs, offset = qdrant_client.scroll(
            collection_name=qdrant_collection_name,
            with_vectors=True,
            limit=qdrant_batch_size,
            offset=offset,
            with_payload=True,
        )
    
        ids: List[str] = []
        contents: List[Any] = []
        embeddings: List[List[float]] = []
        metadatas: List[Any] = []
    
        for doc in docs:
            if doc.payload and doc.vector:
                ids.append(str(doc.id))
                contents.append(doc.payload["page_content"])
                embeddings.append(doc.vector)  # type: ignore
                metadatas.append(doc.payload["metadata"])
    
        yield ids, contents, embeddings, metadatas
    
        if not offset:
            break
    

    Milvus

    # Iterate through the IDs and download their contents
    iterator = milvus_client.query_iterator(
        collection_name=milvus_collection_name,
        filter='pk >= "0"',
        output_fields=["pk", "text", "vector", "idv"],
        batch_size=milvus_batch_size,
    )
    
    while True:
        ids = []
        content = []
        embeddings = []
        metadatas = []
        page = iterator.next()
        if len(page) == 0:
            iterator.close()
            break
        for i in range(len(page)):
            doc = page[i]
            ids.append(doc["pk"])
            content.append(doc["text"])
            embeddings.append(doc["vector"])
            del doc["pk"]
            del doc["text"]
            del doc["vector"]
            metadatas.append(doc)
        yield ids, content, embeddings, metadatas

Melakukan inisialisasi tabel AlloyDB

  1. Tentukan layanan penyematan.

    Antarmuka VectorStore memerlukan layanan penyematan. Alur kerja ini tidak membuat penyematan baru, sehingga class FakeEmbeddings digunakan untuk menghindari biaya apa pun.

    Pinecone

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Weaviate

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Chroma

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Kuadran

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)

    Milvus

    # The VectorStore interface requires an embedding service. This workflow does not
    # generate new embeddings, therefore FakeEmbeddings class is used to avoid any costs.
    from langchain_core.embeddings import FakeEmbeddings
    
    embeddings_service = FakeEmbeddings(size=vector_size)
  2. Siapkan tabel AlloyDB.

    1. Menghubungkan ke AlloyDB menggunakan koneksi IP publik. Untuk mengetahui informasi selengkapnya, lihat Menentukan Jenis Alamat IP.

      Pinecone

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Weaviate

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Chroma

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Kuadran

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )

      Milvus

      from langchain_google_alloydb_pg import AlloyDBEngine
      
      alloydb_engine = await AlloyDBEngine.afrom_instance(
          project_id=project_id,
          region=region,
          cluster=cluster,
          instance=instance,
          database=db_name,
          user=db_user,
          password=db_pwd,
      )
    2. Buat tabel untuk menyalin data, jika belum ada.

      Pinecone

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Weaviate

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )
      

      Chroma

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Kuadran

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

      Milvus

      await alloydb_engine.ainit_vectorstore_table(
          table_name=alloydb_table,
          vector_size=vector_size,
      )

Melakukan inisialisasi objek penyimpanan vektor

Kode ini menambahkan metadata penyematan vektor tambahan ke kolom langchain_metadata dalam format JSON. Agar pemfilteran lebih efisien, atur metadata ini ke dalam kolom terpisah. Untuk informasi selengkapnya, lihat Membuat Vector Store kustom.

  1. Untuk melakukan inisialisasi objek penyimpanan vektor, jalankan perintah berikut:

    Pinecone

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Weaviate

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Chroma

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Kuadran

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )

    Milvus

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    
    vs = await AlloyDBVectorStore.create(
        engine=alloydb_engine,
        embedding_service=embeddings_service,
        table_name=alloydb_table,
    )
  2. Sisipkan data ke dalam tabel AlloyDB:

    Pinecone

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Weaviate

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Chroma

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Kuadran

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

    Milvus

    pending: set[Any] = set()
    for ids, contents, embeddings, metadatas in data_iterator:
        pending.add(
            asyncio.ensure_future(
                vs.aadd_embeddings(
                    texts=contents,
                    embeddings=embeddings,
                    metadatas=metadatas,
                    ids=ids,
                )
            )
        )
        if len(pending) >= max_concurrency:
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
    if pending:
        await asyncio.wait(pending)

Menjalankan skrip migrasi

  1. Menyiapkan lingkungan Python.

  2. Instal dependensi contoh:

    pip install -r requirements.txt
  3. Jalankan migrasi contoh.

    Pinecone

    python migrate_pinecone_vectorstore_to_alloydb.py

    Lakukan penggantian berikut sebelum menjalankan contoh:

    • PINECONE_API_KEY: kunci Pinecone API.
    • PINECONE_NAMESPACE: namespace Pinecone.
    • PINECONE_INDEX_NAME: nama indeks Pinecone.
    • PROJECT_ID: project ID.
    • REGION: region tempat cluster AlloyDB di-deploy.
    • CLUSTER: nama cluster.
    • INSTANCE: nama instance.
    • DB_NAME: nama database.
    • DB_USER: nama pengguna database.
    • DB_PWD: sandi secret database.

    Weaviate

    python migrate_weaviate_vectorstore_to_alloydb.py

    Lakukan penggantian berikut sebelum menjalankan contoh:

    • WEAVIATE_API_KEY: kunci Weaviate API.
    • WEAVIATE_CLUSTER_URL: URL cluster Weaviate.
    • WEAVIATE_COLLECTION_NAME: nama koleksi Weaviate.
    • PROJECT_ID: project ID.
    • REGION: region tempat cluster AlloyDB di-deploy.
    • CLUSTER: nama cluster.
    • INSTANCE: nama instance.
    • DB_NAME: nama database.
    • DB_USER: nama pengguna database.
    • DB_PWD: sandi secret database.

    Chroma

    python migrate_chromadb_vectorstore_to_alloydb.py

    Lakukan penggantian berikut sebelum menjalankan contoh:

    • CHROMADB_PATH: jalur database Chroma.
    • CHROMADB_COLLECTION_NAME: nama koleksi database Chroma.
    • PROJECT_ID: project ID.
    • REGION: region tempat cluster AlloyDB di-deploy.
    • CLUSTER: nama cluster.
    • INSTANCE: nama instance.
    • DB_NAME: nama database.
    • DB_USER: nama pengguna database.
    • DB_PWD: sandi secret database.

    Kuadran

    python migrate_qdrant_vectorstore_to_alloydb.py

    Lakukan penggantian berikut sebelum menjalankan contoh:

    • QDRANT_PATH: jalur database Qdrant.
    • QDRANT_COLLECTION_NAME: nama koleksi Qdrant.
    • PROJECT_ID: project ID.
    • REGION: region tempat cluster AlloyDB di-deploy.
    • CLUSTER: nama cluster.
    • INSTANCE: nama instance.
    • DB_NAME: nama database.
    • DB_USER: nama pengguna database.
    • DB_PWD: sandi secret database.

    Milvus

    python migrate_milvus_vectorstore_to_alloydb.py

    Lakukan penggantian berikut sebelum menjalankan contoh:

    • MILVUS_URI: URI Milvus.
    • MILVUS_COLLECTION_NAME: nama koleksi Milvus.
    • PROJECT_ID: project ID.
    • REGION: region tempat cluster AlloyDB di-deploy.
    • CLUSTER: nama cluster.
    • INSTANCE: nama instance.
    • DB_NAME: nama database.
    • DB_USER: nama pengguna database.
    • DB_PWD: sandi secret database.

    Migrasi yang berhasil akan mencetak log yang mirip dengan berikut ini tanpa error:
    Migration completed, inserted all the batches of data to AlloyDB

  4. Buka AlloyDB Studio untuk melihat data yang dimigrasikan. Untuk mengetahui informasi selengkapnya, lihat Mengelola data Anda menggunakan AlloyDB Studio.

Pembersihan

Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

  1. Di konsol Google Cloud, buka halaman Clusters.

    Buka Cluster

  2. Di kolom Resource name, klik nama cluster yang Anda buat.

  3. Klik Hapus cluster.

  4. Di Delete cluster, masukkan nama cluster untuk mengonfirmasi bahwa Anda ingin menghapus cluster.

  5. Klik Delete.

    Jika Anda membuat koneksi pribadi saat membuat cluster, hapus koneksi pribadi:

  6. Buka halaman Networking di konsol Google Cloud, lalu klik Delete VPC network.

Langkah selanjutnya