Streaming perubahan Bigtable ke template Vector Search

Template ini membuat pipeline streaming untuk melakukan streaming data Bigtable yang mengubah rekaman dan menuliskannya ke Vertex AI Vector Search menggunakan Dataflow Runner V2.

Persyaratan pipeline

  • Instance sumber Bigtable harus ada.
  • Tabel sumber Bigtable harus ada, dan tabel harus mengaktifkan aliran perubahan.
  • Profil aplikasi Bigtable harus ada.
  • Path indeks Vector Search harus ada.

Parameter template

Parameter yang diperlukan

  • embeddingColumn: Nama kolom yang sepenuhnya memenuhi syarat tempat embedding disimpan. Dalam format cf:col.
  • embeddingByteSize: Ukuran byte setiap entri dalam array embedding. Gunakan 4 untuk Float, dan 8 untuk Double. Default: 4.
  • vectorSearchIndex: Indeks Vector Search tempat perubahan akan di-streaming, dalam format 'projects/{projectID}/locations/{region}/indexes/{indexID}' (tanpa spasi di awal atau akhir) Misalnya, projects/123/locations/us-east1/indexes/456.
  • bigtableChangeStreamAppProfile: ID profil aplikasi Bigtable. Profil aplikasi harus menggunakan perutean cluster tunggal dan mengizinkan transaksi baris tunggal.
  • bigtableReadInstanceId: ID instance Bigtable sumber.
  • bigtableReadTableId: ID tabel Bigtable sumber.

Parameter opsional

  • bigtableMetadataTableTableId: ID Tabel yang digunakan untuk membuat tabel metadata.
  • crowdingTagColumn: Nama kolom yang sepenuhnya memenuhi syarat tempat tag keramaian disimpan. Dalam format cf:col.
  • allowRestrictsMappings: Nama kolom yang memenuhi syarat sepenuhnya yang dipisahkan koma dari kolom yang harus digunakan sebagai pembatasan allow, dengan aliasnya. Dalam format cf:col->alias.
  • denyRestrictsMappings: Nama kolom yang sepenuhnya memenuhi syarat dan dipisahkan koma dari kolom yang harus digunakan sebagai pembatasan deny, dengan aliasnya. Dalam format cf:col->alias.
  • intNumericRestrictsMappings: Nama kolom yang sepenuhnya memenuhi syarat yang dipisahkan koma dari kolom yang harus digunakan sebagai numeric_restricts bilangan bulat, dengan aliasnya. Dalam format cf:col->alias.
  • floatNumericRestrictsMappings: Nama kolom yang sepenuhnya memenuhi syarat yang dipisahkan koma dari kolom yang harus digunakan sebagai numeric_restricts float (4 byte), dengan aliasnya. Dalam format cf:col->alias.
  • doubleNumericRestrictsMappings: Nama kolom yang sepenuhnya memenuhi syarat yang dipisahkan koma dari kolom yang harus digunakan sebagai numeric_restricts ganda (8 byte), dengan aliasnya. Dalam format cf:col->alias.
  • upsertMaxBatchSize: Jumlah maksimum upsert yang akan di-buffer sebelum meng-upsert batch ke Indeks Vector Search. Batch akan dikirim saat ada rekaman upsertBatchSize yang siap, atau waktu tunggu upsertBatchDelay untuk rekaman apa pun telah berlalu. Contoh, 10. Defaultnya adalah: 10.
  • upsertMaxBufferDuration: Penundaan maksimum sebelum batch upsert dikirim ke Vector Search.Batch akan dikirim saat ada upsertBatchSize rekaman yang siap, atau waktu tunggu upsertBatchDelay telah berlalu untuk rekaman apa pun. Format yang diizinkan adalah: Ns (untuk detik, contoh: 5s), Nm (untuk menit, contoh: 12m), Nh (untuk jam, contoh: 2h). Contoh, 10s. Nilai defaultnya adalah: 10 detik.
  • deleteMaxBatchSize: Jumlah maksimum penghapusan yang akan di-buffer sebelum menghapus batch dari Indeks Penelusuran Vektor. Batch akan dikirim saat ada deleteBatchSize data yang siap, atau waktu tunggu deleteBatchDelay untuk data apa pun telah berlalu. Contoh, 10. Defaultnya adalah: 10.
  • deleteMaxBufferDuration: Penundaan maksimum sebelum batch penghapusan dikirim ke Vector Search.Batch akan dikirim saat ada deleteBatchSize rekaman yang siap, atau waktu deleteBatchDelay telah berlalu untuk rekaman apa pun yang menunggu. Format yang diizinkan adalah: Ns (untuk detik, contoh: 5s), Nm (untuk menit, contoh: 12m), Nh (untuk jam, contoh: 2h). Contoh, 10s. Nilai defaultnya adalah: 10 detik.
  • dlqDirectory: Jalur untuk menyimpan semua catatan yang belum diproses beserta alasan kegagalan pemrosesannya. Defaultnya adalah direktori di bawah lokasi sementara tugas Dataflow. Nilai default sudah cukup dalam sebagian besar kondisi.
  • bigtableChangeStreamMetadataInstanceId: ID instance metadata aliran perubahan Bigtable. Nilai defaultnya adalah kosong.
  • bigtableChangeStreamMetadataTableTableId: ID tabel metadata konektor aliran perubahan data Bigtable. Jika tidak disediakan, tabel metadata konektor aliran perubahan Bigtable akan otomatis dibuat selama eksekusi pipeline. Nilai defaultnya adalah kosong.
  • bigtableChangeStreamCharset: Nama charset aliran perubahan Bigtable. Nilai defaultnya adalah: UTF-8.
  • bigtableChangeStreamStartTimestamp: Stempel waktu awal (https://tools.ietf.org/html/rfc3339), inklusif, yang akan digunakan untuk membaca aliran perubahan. Misalnya, 2022-05-05T07:59:59Z. Default-nya adalah stempel waktu waktu mulai pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: Daftar perubahan nama grup kolom yang dipisahkan koma untuk diabaikan. Nilai defaultnya adalah kosong.
  • bigtableChangeStreamIgnoreColumns: Daftar perubahan nama kolom yang dipisahkan koma untuk diabaikan. Contoh: "cf1:col1,cf2:col2". Nilai defaultnya adalah kosong.
  • bigtableChangeStreamName: Nama unik untuk pipeline klien. Memungkinkan Anda melanjutkan pemrosesan dari titik saat pipeline yang sebelumnya berjalan berhenti. Secara default, nama dibuat secara otomatis. Lihat log tugas Dataflow untuk nilai yang digunakan.
  • bigtableChangeStreamResume: Jika disetel ke true, pipeline baru akan melanjutkan pemrosesan dari titik saat pipeline yang sebelumnya berjalan dengan nilai bigtableChangeStreamName yang sama berhenti. Jika pipeline dengan nilai bigtableChangeStreamName yang diberikan belum pernah dijalankan, pipeline baru tidak akan dimulai. Jika disetel ke false, pipeline baru akan dimulai. Jika pipeline dengan nilai bigtableChangeStreamName yang sama telah berjalan untuk sumber tertentu, pipeline baru tidak akan dimulai. Nilai defaultnya adalah false.
  • bigtableReadChangeStreamTimeoutMs: Waktu tunggu untuk permintaan Bigtable ReadChangeStream dalam milidetik.
  • bigtableReadProjectId: Project ID Bigtable. Defaultnya adalah project untuk tugas Dataflow.

Menjalankan template

Konsol

  1. Buka halaman Dataflow Create job from template.
  2. Buka Membuat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Template Dataflow, pilih the Bigtable Change Streams to Vector Search template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

gcloud CLI

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • EMBEDDING_COLUMN: kolom Embedding
  • EMBEDDING_BYTE_SIZE: Ukuran byte array embedding. Dapat berupa 4 atau 8.
  • VECTOR_SEARCH_INDEX: Jalur indeks Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: ID profil aplikasi Bigtable
  • BIGTABLE_READ_INSTANCE_ID: ID Instance Bigtable sumber
  • BIGTABLE_READ_TABLE_ID: ID tabel Bigtable sumber

API

Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Ganti kode berikut:

  • PROJECT_ID: ID Google Cloud project tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • EMBEDDING_COLUMN: kolom Embedding
  • EMBEDDING_BYTE_SIZE: Ukuran byte array embedding. Dapat berupa 4 atau 8.
  • VECTOR_SEARCH_INDEX: Jalur indeks Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: ID profil aplikasi Bigtable
  • BIGTABLE_READ_INSTANCE_ID: ID Instance Bigtable sumber
  • BIGTABLE_READ_TABLE_ID: ID tabel Bigtable sumber