Halaman ini menjelaskan aliran perubahan di Spanner untuk database dialek GoogleSQL dan database dialek PostgreSQL, termasuk:
- Model partisi berbasis pemisahan
- Format dan konten kumpulan data aliran perubahan
- Sintaksis tingkat rendah yang digunakan untuk membuat kueri kumpulan data tersebut
- Contoh alur kerja kueri
Anda menggunakan Spanner API untuk mengkueri aliran data perubahan secara langsung. Aplikasi yang menggunakan Dataflow untuk membaca data stream perubahan tidak perlu bekerja langsung dengan model data yang dijelaskan di sini.
Untuk panduan pengantar yang lebih luas tentang aliran perubahan, lihat Ringkasan aliran perubahan.
Mengubah partisi aliran data
Saat perubahan terjadi pada tabel yang dipantau oleh aliran perubahan, Spanner akan menulis data aliran perubahan yang sesuai di database, secara sinkron dalam transaksi yang sama dengan perubahan data. Hal ini berarti bahwa jika transaksi berhasil, Spanner juga telah berhasil merekam dan mempertahankan perubahan. Secara internal, Spanner menempatkan bersama data aliran perubahan dan perubahan data sehingga diproses oleh server yang sama untuk meminimalkan overhead operasi tulis.
Sebagai bagian dari DML ke bagian tertentu, Spanner menambahkan operasi tulis ke bagian data streaming perubahan yang sesuai dalam transaksi yang sama. Karena colocation ini, aliran perubahan tidak menambahkan koordinasi tambahan di seluruh resource penayangan, yang meminimalkan overhead commit transaksi.
Spanner diskalakan dengan membagi dan menggabungkan data secara dinamis berdasarkan beban dan ukuran database, serta mendistribusikan bagian di seluruh resource penayangan.
Untuk mengaktifkan operasi tulis dan baca aliran data perubahan agar dapat diskalakan, Spanner membagi dan menggabungkan penyimpanan aliran data perubahan internal bersama dengan data database, sehingga secara otomatis menghindari hotspot. Untuk mendukung pembacaan data aliran data perubahan secara mendekati real-time saat penulisan database diskalakan, Spanner API dirancang agar aliran data perubahan dikueri secara serentak menggunakan partisi aliran data perubahan. Partisi aliran perubahan dipetakan ke pembagian data aliran perubahan yang berisi kumpulan data aliran perubahan. Partisi aliran data perubahan berubah secara dinamis dari waktu ke waktu dan berkorelasi dengan cara Spanner memisahkan dan menggabungkan data database secara dinamis.
Partisi aliran perubahan berisi data untuk rentang kunci yang tidak dapat diubah selama rentang waktu tertentu. Setiap partisi aliran perubahan dapat dibagi menjadi satu atau beberapa partisi aliran perubahan, atau digabungkan dengan partisi aliran perubahan lainnya. Saat peristiwa pemisahan atau penggabungan ini terjadi, partisi turunan dibuat untuk merekam perubahan untuk rentang kunci masing-masing yang tidak dapat diubah untuk rentang waktu berikutnya. Selain data perubahan, kueri aliran perubahan menampilkan kumpulan data turunan untuk memberi tahu pembaca tentang partisi aliran perubahan baru yang perlu di-kueri, serta kumpulan data heartbeat untuk menunjukkan progres maju jika tidak ada penulisan yang terjadi baru-baru ini.
Saat membuat kueri partisi aliran perubahan tertentu, kumpulan data perubahan akan ditampilkan dalam urutan stempel waktu commit. Setiap data perubahan ditampilkan tepat sekali. Di seluruh partisi aliran perubahan, pengurutan kumpulan data perubahan tidak dijamin. Data perubahan untuk kunci utama tertentu hanya ditampilkan di satu partisi untuk rentang waktu tertentu.
Karena garis keturunan partisi induk-turunan, untuk memproses perubahan pada kunci tertentu dalam urutan stempel waktu commit, data yang ditampilkan dari partisi turunan hanya boleh diproses setelah data dari semua partisi induk diproses.
Fungsi baca streaming perubahan dan sintaksis kueri
GoogleSQL
Untuk membuat kueri aliran perubahan, gunakan
ExecuteStreamingSql
API. Spanner otomatis membuat fungsi baca khusus
bersama dengan aliran perubahan. Fungsi baca memberikan akses ke
data aliran perubahan. Konvensi penamaan fungsi baca adalah READ_change_stream_name
.
Dengan asumsi bahwa aliran perubahan SingersNameStream
ada di database, sintaksis kueri untuk GoogleSQL adalah sebagai berikut:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
Fungsi baca menerima argumen berikut:
Nama argumen | Jenis | Wajib? | Deskripsi |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Wajib | Menentukan bahwa data dengan commit_timestamp lebih besar
dari atau sama dengan start_timestamp harus ditampilkan. Nilai
harus berada dalam periode retensi aliran perubahan, dan harus
kurang dari atau sama dengan waktu saat ini, dan lebih besar dari atau sama dengan
stempel waktu pembuatan aliran perubahan. |
end_timestamp |
TIMESTAMP |
Opsional (Default: NULL ) |
Menentukan bahwa data dengan commit_timestamp kurang dari atau sama dengan end_timestamp harus ditampilkan. Nilai
harus berada dalam periode retensi stream perubahan
dan lebih besar atau sama dengan start_timestamp . Kueri
selesai setelah menampilkan semua ChangeRecords
hingga end_timestamp atau saat Anda menghentikan
koneksi. Jika end_timestamp disetel ke NULL
atau tidak ditentukan, kueri akan melanjutkan eksekusi hingga semua
ChangeRecords ditampilkan atau hingga Anda menghentikan
koneksi. |
partition_token |
STRING |
Opsional (Default: NULL ) |
Menentukan partisi aliran perubahan yang akan dikueri, berdasarkan
konten kumpulan data
partisi turunan. Jika NULL atau tidak ditentukan, artinya pembaca membuat kueri aliran perubahan untuk pertama kalinya, dan belum mendapatkan token partisi tertentu untuk membuat kueri. |
heartbeat_milliseconds |
INT64 |
Wajib | Menentukan frekuensi ChangeRecord heartbeat
ditampilkan jika tidak ada transaksi yang di-commit di
partisi ini.
Nilai harus antara 1,000 (satu detik) dan
300,000 (lima menit). |
read_options |
ARRAY |
Opsional (Default: NULL ) |
Menambahkan opsi baca yang dicadangkan untuk penggunaan di masa mendatang. Satu-satunya
nilai yang diizinkan adalah NULL . |
Sebaiknya buat metode bantuan untuk membuat teks kueri fungsi baca dan parameter binding ke dalamnya, seperti yang ditunjukkan dalam contoh berikut.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Untuk membuat kueri aliran perubahan, gunakan
ExecuteStreamingSql
API. Spanner otomatis membuat fungsi baca khusus
bersama dengan aliran perubahan. Fungsi baca memberikan akses ke
data aliran perubahan. Konvensi penamaan fungsi baca adalah spanner.read_json_change_stream_name
.
Dengan asumsi bahwa aliran perubahan SingersNameStream
ada di database, sintaksis kueri untuk PostgreSQL adalah sebagai berikut:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
Fungsi baca menerima argumen berikut:
Nama argumen | Jenis | Wajib? | Deskripsi |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Wajib | Menentukan bahwa data perubahan dengan commit_timestamp
lebih besar dari atau sama dengan start_timestamp
harus ditampilkan. Nilai harus berada dalam periode retensi streaming perubahan, dan harus kurang dari atau sama dengan waktu saat ini, dan lebih besar dari atau sama dengan stempel waktu pembuatan streaming perubahan. |
end_timestamp |
timestamp with timezone |
Opsional (Default: NULL ) |
Menentukan bahwa data perubahan dengan commit_timestamp
kurang dari atau sama dengan end_timestamp harus
ditampilkan. Nilai harus berada dalam periode retensi stream perubahan dan lebih besar atau sama dengan start_timestamp .
Kueri selesai setelah menampilkan semua data perubahan hingga
end_timestamp atau hingga Anda menghentikan koneksi.
Jika NULL , kueri akan melanjutkan eksekusi hingga semua data perubahan ditampilkan atau hingga Anda menghentikan koneksi. |
partition_token |
text |
Opsional (Default: NULL ) |
Menentukan partisi aliran perubahan yang akan dikueri, berdasarkan
konten kumpulan data
partisi turunan. Jika NULL atau tidak ditentukan, artinya pembaca membuat kueri aliran perubahan untuk pertama kalinya, dan belum mendapatkan token partisi tertentu untuk membuat kueri. |
heartbeat_milliseconds |
bigint |
Wajib | Menentukan seberapa sering ChangeRecord heartbeat
ditampilkan saat tidak ada transaksi yang di-commit di
partisi ini.
Nilainya harus antara 1,000 (satu detik) dan
300,000 (lima menit). |
null |
null |
Wajib | Disimpan untuk penggunaan di masa mendatang |
Sebaiknya buat metode bantuan untuk membuat teks fungsi baca dan parameter binding ke fungsi tersebut, seperti yang ditunjukkan dalam contoh berikut.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Mengubah format kumpulan data aliran data
GoogleSQL
Fungsi baca aliran perubahan menampilkan satu kolom ChangeRecord
dari
jenis ARRAY<STRUCT<...>>
. Di setiap baris, array ini selalu berisi satu elemen.
Elemen array memiliki jenis berikut:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Ada tiga kolom dalam STRUCT
ini: data_change_record
,
heartbeat_record
, dan child_partitions_record
, masing-masing berjenis
ARRAY<STRUCT<...>>
. Di baris mana pun yang ditampilkan oleh fungsi baca aliran perubahan, hanya satu dari tiga kolom ini yang berisi nilai; dua kolom lainnya kosong atau NULL
. Kolom array ini berisi maksimal satu elemen.
Bagian berikut akan memeriksa setiap jenis data tersebut.
PostgreSQL
Fungsi baca aliran perubahan menampilkan satu kolom ChangeRecord
dari
jenis JSON
dengan struktur berikut:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Ada tiga kemungkinan kunci dalam objek ini: data_change_record
,
heartbeat_record
, dan child_partitions_record
, jenis nilai
yang sesuai adalah JSON
. Di baris mana pun yang ditampilkan oleh fungsi baca aliran perubahan,
hanya ada satu dari tiga kunci ini.
Bagian berikut akan memeriksa setiap jenis data tersebut.
Data mengubah catatan
Data perubahan data berisi kumpulan perubahan pada tabel dengan jenis modifikasi yang sama (menyisipkan, memperbarui, atau menghapus) yang di-commit pada stempel waktu commit yang sama dalam satu partisi aliran perubahan untuk transaksi yang sama. Beberapa data perubahan data dapat ditampilkan untuk transaksi yang sama di beberapa partisi aliran perubahan.
Semua kumpulan data perubahan memiliki kolom commit_timestamp
, server_transaction_id
, dan record_sequence
, yang bersama-sama menentukan urutan dalam aliran perubahan untuk kumpulan data aliran. Tiga kolom ini sudah cukup untuk mendapatkan
urutan perubahan dan memberikan konsistensi eksternal.
Perhatikan bahwa beberapa transaksi dapat memiliki stempel waktu commit yang sama jika
data yang disentuh tidak tumpang-tindih. Kolom server_transaction_id
menawarkan kemampuan untuk membedakan kumpulan perubahan mana (yang mungkin di seluruh partisi aliran perubahan) yang dikeluarkan dalam transaksi yang sama. Dengan menyambungkan kolom record_sequence
dan
number_of_records_in_transaction
, Anda juga dapat melakukan buffering dan mengurutkan
semua data dari transaksi tertentu.
Kolom data perubahan data mencakup hal berikut:
GoogleSQL
Kolom | Jenis | Deskripsi |
---|---|---|
commit_timestamp |
TIMESTAMP |
Menunjukkan stempel waktu saat perubahan dilakukan. |
record_sequence |
STRING |
Menunjukkan nomor urutan untuk kumpulan data dalam transaksi.
Nomor urutan bersifat unik dan meningkat secara monoton (tetapi tidak
harus berurutan) dalam transaksi. Urutkan data untuk server_transaction_id
yang sama menurut record_sequence untuk
merekonstruksi pengurutan perubahan dalam transaksi.
Spanner dapat mengoptimalkan pengurutan ini untuk performa yang lebih baik
dan mungkin tidak selalu cocok dengan pengurutan asli yang Anda berikan. |
server_transaction_id |
STRING |
Memberikan string unik secara global yang mewakili transaksi tempat perubahan dilakukan. Nilai ini hanya boleh digunakan dalam konteks pemrosesan data aliran perubahan dan tidak dikorelasi dengan ID transaksi di API Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Menunjukkan apakah ini adalah data terakhir untuk transaksi dalam partisi saat ini. |
table_name |
STRING |
Nama tabel yang terpengaruh oleh perubahan tersebut. |
value_capture_type |
STRING |
Menjelaskan jenis pengambilan nilai yang ditentukan dalam konfigurasi aliran perubahan saat perubahan ini diambil. Jenis pengambilan nilai dapat berupa salah satu dari berikut:
Secara default, nilainya adalah |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Menunjukkan nama kolom, jenis kolom,
apakah itu kunci utama, dan posisi kolom seperti
yang ditentukan dalam skema (ordinal_position ). Kolom pertama
tabel dalam skema akan memiliki posisi ordinal 1 . Jenis
kolom dapat disusun bertingkat untuk kolom array. Formatnya cocok dengan struktur
jenis yang dijelaskan dalam referensi Spanner API.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Menjelaskan perubahan yang dilakukan, termasuk nilai kunci utama, nilai lama, dan nilai baru dari kolom yang diubah atau dilacak.
Ketersediaan dan konten nilai lama dan baru bergantung pada
value_capture_type yang dikonfigurasi. Kolom new_values dan
old_values hanya berisi kolom non-kunci. |
mod_type |
STRING |
Menjelaskan jenis perubahan. Salah satu dari INSERT ,
UPDATE , atau DELETE . |
number_of_records_in_transaction |
INT64 |
Menunjukkan jumlah data perubahan yang merupakan bagian dari transaksi ini di semua partisi aliran perubahan. |
number_of_partitions_in_transaction |
INT64 |
Menunjukkan jumlah partisi yang menampilkan data perubahan data untuk transaksi ini. |
transaction_tag |
STRING |
Menunjukkan Tag transaksi yang terkait dengan transaksi ini. |
is_system_transaction |
BOOL |
Menunjukkan apakah transaksi merupakan transaksi sistem. |
PostgreSQL
Kolom | Jenis | Deskripsi |
---|---|---|
commit_timestamp |
STRING |
Menunjukkan stempel waktu saat perubahan dilakukan. |
record_sequence |
STRING |
Menunjukkan nomor urutan untuk kumpulan data dalam transaksi.
Nomor urutan bersifat unik dan meningkat secara monoton (tetapi tidak
harus berurutan) dalam transaksi. Urutkan data untuk server_transaction_id
yang sama menurut record_sequence untuk
merekonstruksi pengurutan perubahan dalam transaksi. |
server_transaction_id |
STRING |
Memberikan string unik secara global yang mewakili transaksi tempat perubahan dilakukan. Nilai hanya boleh digunakan dalam konteks pemrosesan data aliran perubahan dan tidak dikorelasi dengan ID transaksi di API Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Menunjukkan apakah ini adalah data terakhir untuk transaksi dalam partisi saat ini. |
table_name |
STRING |
Menunjukkan nama tabel yang terpengaruh oleh perubahan tersebut. |
value_capture_type |
STRING |
Menjelaskan jenis pengambilan nilai yang ditentukan dalam konfigurasi aliran perubahan saat perubahan ini diambil. Jenis pengambilan nilai dapat berupa salah satu dari berikut:
Secara default, nilainya adalah |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Menunjukkan nama kolom, jenis kolom,
apakah itu kunci utama, dan posisi kolom seperti
yang ditentukan dalam skema (ordinal_position ). Kolom pertama
tabel dalam skema akan memiliki posisi ordinal 1 . Jenis
kolom dapat disusun bertingkat untuk kolom array. Formatnya cocok dengan struktur
jenis yang dijelaskan dalam
referensi Spanner API.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Menjelaskan perubahan yang dilakukan, termasuk nilai kunci utama, nilai lama, dan nilai baru dari kolom yang diubah atau dilacak. Ketersediaan dan konten nilai lama dan baru bergantung
pada value_capture_type yang dikonfigurasi. Kolom
new_values dan old_values hanya berisi
kolom non-kunci.
|
mod_type |
STRING |
Menjelaskan jenis perubahan. Salah satu dari INSERT ,
UPDATE , atau DELETE . |
number_of_records_in_transaction |
INT64 |
Menunjukkan jumlah data perubahan yang merupakan bagian dari transaksi ini di semua partisi aliran perubahan. |
number_of_partitions_in_transaction |
NUMBER |
Menunjukkan jumlah partisi yang menampilkan data perubahan data untuk transaksi ini. |
transaction_tag |
STRING |
Menunjukkan Tag transaksi yang terkait dengan transaksi ini. |
is_system_transaction |
BOOLEAN |
Menunjukkan apakah transaksi merupakan transaksi sistem. |
Contoh data perubahan data
Berikut adalah contoh sepasang data perubahan data. Kedua tabel tersebut menjelaskan satu transaksi dengan transfer antara dua akun. Kedua akun berada dalam partisi aliran perubahan terpisah.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Kumpulan data perubahan data berikut adalah contoh kumpulan data dengan jenis pengambilan nilai NEW_VALUES
. Perhatikan bahwa hanya nilai baru yang diisi.
Hanya kolom LastUpdate
yang diubah, sehingga hanya kolom tersebut
yang ditampilkan.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Kumpulan data perubahan data berikut adalah contoh kumpulan data dengan jenis pengambilan nilai NEW_ROW
. Hanya kolom LastUpdate
yang diubah, tetapi semua kolom yang dilacak ditampilkan.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Kumpulan data perubahan data berikut adalah contoh kumpulan data dengan jenis pengambilan nilai NEW_ROW_AND_OLD_VALUES
. Hanya kolom LastUpdate
yang
diubah, tetapi semua kolom yang dilacak akan ditampilkan. Jenis pengambilan nilai ini menangkap
nilai baru dan nilai lama LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Data heartbeat
Jika kumpulan data heartbeat ditampilkan, hal ini menunjukkan bahwa semua perubahan dengan
commit_timestamp
kurang dari atau sama dengan timestamp
kumpulan data heartbeat telah ditampilkan, dan kumpulan data mendatang di
partisi ini harus memiliki stempel waktu commit yang lebih tinggi daripada yang ditampilkan oleh
kumpulan data heartbeat. Data heartbeat ditampilkan saat tidak ada perubahan data yang ditulis ke partisi. Jika ada perubahan data yang ditulis ke
partisi, data_change_record.commit_timestamp
dapat digunakan sebagai pengganti
heartbeat_record.timestamp
untuk memberi tahu bahwa pembaca membuat progres
maju dalam membaca partisi.
Anda dapat menggunakan data heartbeat yang ditampilkan di partisi untuk menyinkronkan pembaca di semua partisi. Setelah semua pembaca menerima
heartbeat yang lebih besar dari atau sama dengan beberapa stempel waktu A
atau telah menerima data atau
data partisi turunan yang lebih besar dari atau sama dengan stempel waktu A
, pembaca tahu
bahwa mereka telah menerima semua data yang di-commit pada atau sebelum stempel waktu A
tersebut dan dapat
mulai memproses data yang di-buffer—misalnya, mengurutkan data lintas-partisi
berdasarkan stempel waktu dan mengelompokkan data tersebut berdasarkan server_transaction_id
.
Data heartbeat hanya berisi satu kolom:
GoogleSQL
Kolom | Jenis | Deskripsi |
---|---|---|
timestamp |
TIMESTAMP |
Menunjukkan stempel waktu data heartbeat. |
PostgreSQL
Kolom | Jenis | Deskripsi |
---|---|---|
timestamp |
STRING |
Menunjukkan stempel waktu data heartbeat. |
Contoh data heartbeat
Contoh data heartbeat, yang menyampaikan bahwa semua data dengan stempel waktu yang kurang dari atau sama dengan stempel waktu data ini telah ditampilkan:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Data partisi turunan
Data partisi turunan menampilkan informasi tentang partisi turunan: token partisi, token partisi induk, dan
start_timestamp
yang mewakili stempel waktu paling awal yang berisi data perubahan
partisi turunan. Data yang stempel waktu commit-nya
langsung sebelum child_partitions_record.start_timestamp
ditampilkan di partisi saat ini. Setelah menampilkan semua
data partisi turunan untuk partisi ini, kueri ini akan ditampilkan dengan
status berhasil, yang menunjukkan bahwa semua data telah ditampilkan untuk
partisi ini.
Kolom data partisi turunan mencakup hal berikut:
GoogleSQL
Kolom | Jenis | Deskripsi |
---|---|---|
start_timestamp |
TIMESTAMP |
Menunjukkan bahwa data perubahan data yang ditampilkan dari partisi
turunan dalam data perubahan partisi turunan ini memiliki stempel waktu commit
yang lebih besar dari atau sama dengan start_timestamp . Saat membuat kueri pada partisi turunan, kueri harus menentukan token partisi turunan dan start_timestamp yang lebih besar dari atau sama dengan child_partitions_token.start_timestamp . Semua data partisi turunan
yang ditampilkan oleh partisi memiliki start_timestamp yang sama
dan stempel waktu selalu berada di antara start_timestamp dan end_timestamp
yang ditentukan kueri. |
record_sequence |
STRING |
Menunjukkan nomor urutan yang meningkat secara monoton yang dapat digunakan untuk
menentukan pengurutan catatan partisi turunan jika ada beberapa
catatan partisi turunan yang ditampilkan dengan start_timestamp yang sama
di partisi tertentu. Token partisi, start_timestamp , dan record_sequence secara unik mengidentifikasi data partisi turunan.
|
child_partitions |
[ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] } ] |
Menampilkan kumpulan partisi turunan dan informasi terkaitnya. Ini mencakup string token partisi yang digunakan untuk mengidentifikasi partisi turunan dalam kueri, serta token partisi induknya. |
PostgreSQL
Kolom | Jenis | Deskripsi |
---|---|---|
start_timestamp |
STRING |
Menunjukkan bahwa data perubahan yang ditampilkan dari partisi turunan dalam data perubahan partisi turunan ini memiliki stempel waktu commit yang lebih besar dari atau sama dengan start_timestamp . Saat membuat kueri pada partisi turunan, kueri harus menentukan token partisi turunan dan start_timestamp yang lebih besar dari atau sama dengan child_partitions_token.start_timestamp . Semua data partisi turunan
yang ditampilkan oleh partisi memiliki start_timestamp yang sama
dan stempel waktu selalu berada di antara start_timestamp dan end_timestamp
yang ditentukan kueri.
|
record_sequence |
STRING |
Menunjukkan nomor urutan yang meningkat secara monoton yang dapat digunakan untuk
menentukan pengurutan catatan partisi turunan jika ada beberapa
catatan partisi turunan yang ditampilkan dengan start_timestamp yang sama
di partisi tertentu. Token partisi, start_timestamp , dan record_sequence secara unik mengidentifikasi data partisi turunan.
|
child_partitions |
[ { "token": "STRING", "parent_partition_tokens": ["STRING"], }, [...] ] |
Menampilkan array partisi turunan dan informasi terkaitnya. Hal ini mencakup string token partisi yang digunakan untuk mengidentifikasi partisi turunan dalam kueri, serta token partisi induknya. |
Contoh data partisi turunan
Berikut adalah contoh data partisi turunan:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Alur kerja kueri aliran data perubahan
Jalankan kueri aliran perubahan menggunakan
ExecuteStreamingSql
API, dengan transaksi
hanya baca sekali pakai dan
batas stempel waktu yang kuat. Fungsi baca
aliran perubahan memungkinkan Anda menentukan start_timestamp
dan
end_timestamp
untuk rentang waktu yang diinginkan. Semua data perubahan
dalam periode retensi dapat diakses menggunakan batas stempel waktu
hanya baca yang kuat.
Semua TransactionOptions
lainnya tidak valid untuk kueri aliran perubahan. Selain itu,
jika TransactionOptions.read_only.return_read_timestamp
ditetapkan ke true
,
nilai khusus kint64max - 1
akan ditampilkan dalam pesan Transaction
yang menjelaskan transaksi, bukan stempel waktu
baca yang valid. Nilai khusus ini harus dihapus dan tidak digunakan untuk kueri berikutnya.
Setiap kueri aliran perubahan dapat menampilkan sejumlah baris, yang masing-masing berisi data perubahan data, data heartbeat, atau data partisi turunan. Anda tidak perlu menetapkan batas waktu untuk permintaan tersebut.
Contoh alur kerja kueri aliran perubahan
Alur kerja kueri streaming dimulai dengan menerbitkan kueri aliran perubahan pertama
dengan menentukan partition_token
ke NULL
. Kueri harus menentukan fungsi baca untuk aliran perubahan, stempel waktu awal dan akhir yang diinginkan, serta interval heartbeat. Jika end_timestamp
adalah NULL
, kueri akan terus
menampilkan perubahan data hingga partisi berakhir.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Proses data dari kueri ini hingga semua data partisi turunan ditampilkan. Dalam contoh berikut, dua kumpulan data partisi turunan dan tiga token partisi ditampilkan, lalu kueri dihentikan. Data partisi turunan dari kueri tertentu selalu memiliki start_timestamp
yang sama.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Untuk memproses perubahan setelah 2022-05-01T09:00:01Z
, buat tiga kueri baru dan
jalankan secara paralel. Jika digunakan bersama, ketiga kueri akan menampilkan perubahan data untuk
rentang kunci yang sama dengan yang dicakup induknya. Selalu tetapkan start_timestamp
ke
start_timestamp
dalam data partisi turunan yang sama dan gunakan end_timestamp
dan interval heartbeat yang sama untuk memproses data secara konsisten
di semua kueri.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Kueri di child_token_2
selesai setelah menampilkan data
partisi turunan lainnya. Data ini menunjukkan bahwa partisi baru mencakup perubahan untuk
child_token_2
dan child_token_3
mulai dari 2022-05-01T09:30:15Z
. Data
yang sama persis ditampilkan oleh kueri di child_token_3
, karena keduanya
adalah partisi induk dari child_token_4
baru. Untuk memastikan pemrosesan data yang terurut
ketat untuk kunci tertentu, kueri di child_token_4
harus dimulai setelah semua induk selesai. Dalam hal ini, induknya adalah
child_token_2
dan child_token_3
. Hanya buat satu kueri untuk setiap token
partisi turunan. Desain alur kerja kueri harus menunjuk satu induk untuk menunggu dan
menjadwalkan kueri di child_token_4
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Temukan contoh penanganan dan penguraian data aliran perubahan di konektor Dataflow SpannerIO Apache Beam di GitHub.