Streaming perubahan Bigtable ke template Pub/Sub adalah pipeline streaming yang mengalirkan kumpulan data perubahan data Bigtable dan memublikasikannya ke topik Pub/Sub menggunakan Dataflow.
Aliran data perubahan Bigtable memungkinkan Anda berlangganan mutasi data per tabel. Jika Anda berlangganan aliran data perubahan tabel, batasan berikut akan berlaku:
- Hanya sel yang dimodifikasi dan deskriptor dari operasi penghapusan yang ditampilkan.
- Hanya nilai baru dari sel yang dimodifikasi yang dikembalikan.
Saat kumpulan data perubahan data dipublikasikan ke topik Pub/Sub, pesan mungkin disisipkan secara tidak berurutan dibandingkan dengan pengurutan stempel waktu commit Bigtable asli.
Catatan perubahan data Bigtable yang tidak dapat dipublikasikan ke topik Pub/Sub untuk sementara ditempatkan di direktori yang dihentikan pengirimannya (antrean pesan yang belum diproses) di Cloud Storage. Setelah jumlah maksimum percobaan ulang yang gagal, data ini akan ditempatkan tanpa batas waktu di direktori antrean yang dihentikan pengirimannya untuk peninjauan manual atau diproses lebih lanjut oleh pengguna.
Pipeline mengharuskan topik Pub/Sub tujuan ada. Topik tujuan dapat dikonfigurasi untuk memvalidasi pesan menggunakan skema. Saat topik Pub/Sub menentukan skema, pipeline hanya akan dimulai jika skemanya valid. Bergantung pada jenis skema, gunakan salah satu definisi skema berikut untuk topik tujuan:
Buffering protokol
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Gunakan skema Protobuf berikut dengan encoding pesan JSON
:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Setiap pesan Pub/Sub baru menyertakan satu catatan perubahan data yang ditampilkan oleh aliran perubahan dari baris yang sesuai dalam tabel Bigtable Anda.
Deskripsi pesan output Pub/Sub
Nama kolom | Deskripsi |
---|---|
rowKey |
Tombol baris untuk baris yang diubah. Tiba dalam bentuk array byte. Jika encoding pesan JSON dikonfigurasi, kunci baris akan ditampilkan sebagai string. Saat useBase64Rowkeys ditentukan, kunci baris berenkode Base64. Jika tidak, charset yang ditentukan oleh bigtableChangeStreamCharset akan digunakan untuk mendekode byte kunci baris menjadi string. |
modType |
Jenis mutasi baris. Gunakan salah satu nilai berikut: SET_CELL , DELETE_CELLS , atau DELETE_FAMILY . |
columnFamily |
Grup kolom yang terpengaruh oleh mutasi baris. |
column |
Penentu kolom yang terpengaruh oleh mutasi baris. Untuk jenis mutasi DELETE_FAMILY , kolom kolom belum ditetapkan. Tiba dalam bentuk array byte. Jika encoding pesan JSON dikonfigurasi, kolom akan ditampilkan sebagai string. Jika useBase64ColumnQualifier ditentukan, kolom kolom berenkode Base64. Jika tidak, charset yang ditentukan oleh bigtableChangeStreamCharset akan digunakan untuk mendekode byte kunci baris menjadi string. |
commitTimestamp |
Waktu ketika Bigtable menerapkan mutasi. Waktu diukur dalam mikrodetik sejak Unix epoch (1 Januari 1970 pada UTC). |
timestamp |
Nilai stempel waktu sel yang terpengaruh oleh mutasi. Untuk jenis mutasi DELETE_CELLS dan DELETE_FAMILY , stempel waktu tidak ditetapkan. Waktu diukur dalam mikrodetik sejak Unix epoch (1 Januari 1970 pada UTC). |
timestampFrom |
Menjelaskan awal inklusif interval stempel waktu untuk semua sel yang dihapus oleh mutasi DELETE_CELLS . Untuk jenis mutasi lainnya, timestampFrom tidak ditetapkan. Waktu diukur dalam mikrodetik sejak Unix epoch (1 Januari 1970 pada UTC). |
timestampTo |
Menjelaskan akhir eksklusif interval stempel waktu untuk semua sel yang dihapus oleh mutasi DELETE_CELLS . Untuk jenis mutasi lainnya, timestampTo tidak ditetapkan. |
isGC |
Nilai boolean yang menunjukkan apakah mutasi dihasilkan oleh mekanisme pembersihan sampah memori Bigtable. |
tieBreaker |
Jika dua mutasi didaftarkan bersamaan oleh cluster Bigtable berbeda, mutasi dengan nilai tiebreaker tertinggi akan diterapkan ke tabel sumber. Mutasi dengan nilai tiebreaker yang lebih rendah akan dihapus. |
value |
Nilai baru yang ditetapkan oleh mutasi. Kecuali opsi pipeline stripValues ditetapkan, nilai ditetapkan untuk mutasi SET_CELL . Untuk jenis mutasi lainnya, nilai belum ditetapkan. Tiba dalam bentuk array byte. Jika encoding pesan JSON dikonfigurasi, nilai akan ditampilkan sebagai string.
Jika useBase64Values ditentukan, nilainya berenkode Base64. Jika tidak, charset yang ditentukan oleh bigtableChangeStreamCharset akan digunakan untuk mendekode byte nilai ke dalam string. |
sourceInstance |
Nama instance Bigtable yang mendaftarkan mutasi. Mungkin terjadi saat beberapa pipeline mengalirkan perubahan dari instance yang berbeda ke topik Pub/Sub yang sama. |
sourceCluster |
Nama cluster Bigtable yang mendaftarkan mutasi. Dapat digunakan saat beberapa pipeline mengalirkan perubahan dari instance yang berbeda ke topik Pub/Sub yang sama. |
sourceTable |
Nama tabel Bigtable yang menerima mutasi. Dapat digunakan jika beberapa pipeline mengalirkan perubahan dari tabel yang berbeda ke topik Pub/Sub yang sama. |
Persyaratan pipeline
- Instance sumber Bigtable yang ditentukan.
- Tabel sumber Bigtable yang ditentukan. Tabel harus mengaktifkan aliran perubahan data.
- Profil aplikasi Bigtable yang ditentukan.
- Topik Pub/Sub yang ditentukan harus ada.
Parameter template
Parameter | Deskripsi |
---|---|
bigtableReadInstanceId |
ID instance Bigtable sumber. |
bigtableReadTableId |
ID tabel Bigtable sumber. |
bigtableChangeStreamAppProfile |
ID profil aplikasi Bigtable. Profil aplikasi harus menggunakan perutean cluster tunggal dan mengizinkan transaksi baris tunggal. |
pubSubTopic |
Nama topik Pub/Sub tujuan. |
messageFormat |
Opsional: Jika topik tujuan memiliki skema yang dikonfigurasi, format pesan akan ditentukan oleh encoding dan skema yang dikonfigurasi. Format pesan yang akan dipublikasikan ke topik Pub/Sub. Nilai yang didukung: AVRO , PROTOCOL_BUFFERS , dan JSON . Setelan defaultnya adalah JSON . Jika format JSON digunakan, kolom rowKey, kolom, dan nilai pesan berupa string, yang isinya ditentukan oleh opsi pipeline useBase64Rowkeys , useBase64ColumnQualifiers , useBase64Values , dan bigtableChangeStreamCharset . |
messageEncoding |
Opsional: Jika topik tujuan memiliki skema yang dikonfigurasi, encoding pesan akan ditentukan oleh setelan topik. Encoding pesan yang akan dipublikasikan ke topik Pub/Sub. Nilai yang didukung: BINARY , JSON , dan JSON . Setelan defaultnya adalah JSON . |
stripValues |
Opsional: Jika ditetapkan ke true , mutasi SET_CELL akan ditampilkan tanpa menetapkan nilai baru. Nilai defaultnya adalah false (salah).
Parameter ini berguna jika nilai baru tidak diperlukan, yang juga dikenal sebagai pembatalan cache, atau jika nilai sangat besar dan melebihi batas ukuran pesan Pub/Sub. |
bigtableReadProjectId |
Opsional: Project ID Bigtable. Defaultnya adalah project tugas Dataflow. |
pubSubProjectId |
Opsional: Project ID Bigtable. Defaultnya adalah project tugas Dataflow. |
bigtableChangeStreamMetadataInstanceId |
Opsional: Perubahan Bigtable melakukan streaming ID instance metadata. |
bigtableChangeStreamMetadataTableTableId |
Opsional: Perubahan Bigtable melakukan streaming ID tabel metadata. |
bigtableChangeStreamCharset |
Opsional: Perubahan Bigtable mengalirkan nama charset saat membaca rowkey, nilai, dan penentu kolom. Opsi ini digunakan saat encoding pesan adalah JSON. |
bigtableChangeStreamStartTimestamp |
Opsional: Stempel waktu awal, inklusif, yang akan digunakan untuk membaca aliran perubahan. Contoh, 2022-05-05T07:59:59Z . Setelan defaultnya adalah stempel waktu waktu mulai pipeline. |
bigtableChangeStreamIgnoreColumnFamilies |
Opsional: Daftar yang dipisahkan koma untuk nama grup kolom yang akan diabaikan akan diabaikan. |
bigtableChangeStreamIgnoreColumns |
Opsional: Daftar yang dipisahkan koma untuk perubahan nama kolom yang akan diabaikan. |
bigtableChangeStreamName |
Opsional: Nama unik untuk pipeline klien. Memungkinkan Anda melanjutkan pemrosesan dari titik penghentian pipeline yang berjalan sebelumnya. Setelan defaultnya adalah nama yang dibuat secara otomatis. Untuk menemukan nilainya, lihat log tugas Dataflow. |
bigtableChangeStreamResume |
Opsional: Jika ditetapkan ke true , pipeline baru akan melanjutkan pemrosesan dari titik saat pipeline yang sebelumnya berjalan dengan nilai bigtableChangeStreamName yang sama dihentikan.
Jika pipeline dengan nilai bigtableChangeStreamName yang diberikan belum pernah berjalan, pipeline baru tidak akan dimulai.
Jika ditetapkan ke false , pipeline baru akan dimulai. Jika pipeline dengan nilai bigtableChangeStreamName yang sama
telah berjalan untuk sumber tertentu, pipeline baru tidak akan dimulai. Nilai default-nya adalah false . |
useBase64Rowkeys |
Opsional: Digunakan dengan encoding pesan JSON. Jika ditetapkan ke true , kolom rowKey adalah string berenkode Base64.
Jika tidak, rowKey akan dihasilkan dengan menggunakan bigtableChangeStreamCharset untuk mendekode byte ke dalam string. Setelan defaultnya adalah false . |
useBase64ColumnQualifiers |
Opsional: Digunakan dengan encoding pesan JSON. Jika ditetapkan ke true , kolom column adalah string berenkode Base64.
Jika tidak, kolom akan dihasilkan dengan menggunakan bigtableChangeStreamCharset untuk mendekode byte ke dalam string. Setelan defaultnya adalah false . |
useBase64Values |
Opsional: Digunakan dengan encoding pesan JSON. Jika ditetapkan ke true , kolom value adalah string berenkode Base64.
Jika tidak, nilai akan dihasilkan menggunakan bigtableChangeStreamCharset untuk mendekode byte ke dalam string. Setelan defaultnya adalah false . |
dlqMaxRetries |
Opsional: Percobaan ulang maksimum huruf yang mati. Nilai defaultnya adalah 5 . |
dlqRetryMinutes |
Opsional: Jumlah menit antara percobaan ulang antrean yang dihentikan pengirimannya. Nilai defaultnya adalah 10 . |
dlqDirectory |
Opsional: Direktori untuk antrean yang dihentikan pengirimannya. Kumpulan data yang gagal diproses akan disimpan di direktori ini. Defaultnya adalah direktori di bawah lokasi sementara tugas Dataflow. Pada umumnya, Anda dapat menggunakan jalur default. |
Menjalankan template
Konsol
- Buka halaman Create job from template Dataflow. Buka Buat tugas dari template
- Di kolom Job name, masukkan nama pekerjaan yang unik.
- Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region
default-nya adalah
us-central1
.Untuk daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.
- Dari menu drop-down Dataflow template, pilih the Bigtable change streams to Pub/Sub template.
- Di kolom parameter yang disediakan, masukkan parameter value Anda.
- Klik Run job.
gcloud
Di shell atau terminal Anda, jalankan template:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Ganti kode berikut:
PROJECT_ID
: ID project Google Cloud tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama pekerjaan unik pilihan AndaVERSION
: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latest
untuk menggunakan versi terbaru template, yang tersedia di folder induk tidak bertanggal di bucket— gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi template tertentu, yang dapat ditemukan bertingkat di folder induk bertanggal masing-masing dalam bucket— gs://dataflow-templates-REGION_NAME/
REGION_NAME
: region tempat Anda ingin men-deploy tugas Dataflow, misalnyaus-central1
BIGTABLE_INSTANCE_ID
: ID instance Bigtable Anda.BIGTABLE_TABLE_ID
: ID tabel Bigtable Anda.BIGTABLE_APPLICATION_PROFILE_ID
: ID profil aplikasi Bigtable Anda.PUBSUB_TOPIC
: nama topik tujuan Pub/Sub
API
Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Ganti kode berikut:
PROJECT_ID
: ID project Google Cloud tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama pekerjaan unik pilihan AndaVERSION
: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latest
untuk menggunakan versi terbaru template, yang tersedia di folder induk tidak bertanggal di bucket— gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi template tertentu, yang dapat ditemukan bertingkat di folder induk bertanggal masing-masing dalam bucket— gs://dataflow-templates-REGION_NAME/
LOCATION
: region tempat Anda ingin men-deploy tugas Dataflow, misalnyaus-central1
BIGTABLE_INSTANCE_ID
: ID instance Bigtable Anda.BIGTABLE_TABLE_ID
: ID tabel Bigtable Anda.BIGTABLE_APPLICATION_PROFILE_ID
: ID profil aplikasi Bigtable Anda.PUBSUB_TOPIC
: nama topik tujuan Pub/Sub
Langkah selanjutnya
- Pelajari Template Dataflow.
- Lihat daftar template yang disediakan Google.