Halaman ini menjelaskan atribut aliran perubahan berikut secara mendetail:
- Model partisi berbasis terpisah
- Format dan konten rekaman aliran data perubahan
- Sintaks tingkat rendah yang digunakan untuk melakukan kueri terhadap kumpulan data tersebut
- Contoh alur kerja kueri
Informasi di halaman ini paling relevan untuk menggunakan Spanner API guna melakukan kueri aliran data perubahan secara langsung. Aplikasi yang menggunakan Dataflow untuk membaca data aliran data perubahan tidak perlu bekerja langsung dengan model data yang dijelaskan di sini.
Untuk panduan pengantar yang lebih luas terkait aliran data perubahan, lihat Ringkasan aliran perubahan.
Mengubah partisi stream
Ketika perubahan terjadi pada tabel yang dipantau oleh aliran perubahan, Spanner akan menulis catatan aliran perubahan yang terkait dalam database, secara sinkron dalam transaksi yang sama dengan perubahan data. Hal ini menjamin bahwa jika transaksi berhasil, Spanner juga telah berhasil menangkap dan mempertahankan perubahan. Secara internal, Spanner bersama-sama menemukan rekaman aliran perubahan dan perubahan data sehingga diproses oleh server yang sama untuk meminimalkan overhead operasi tulis.
Sebagai bagian dari DML ke pemisahan tertentu, Spanner menambahkan operasi tulis ke bagian data aliran perubahan yang sesuai dalam transaksi yang sama. Karena kolokasi ini, aliran data perubahan tidak menambahkan koordinasi tambahan di seluruh resource inferensi, yang meminimalkan overhead commit transaksi.
Spanner melakukan penskalaan dengan memisahkan dan menggabungkan data secara dinamis berdasarkan ukuran dan muatan database, serta mendistribusikan pemisahan di seluruh resource penyaluran.
Untuk mengaktifkan penulisan dan pembacaan aliran perubahan agar dapat diskalakan, Spanner membagi dan menggabungkan penyimpanan aliran perubahan internal bersama dengan data database, sehingga otomatis menghindari hotspot. Untuk mendukung pembacaan data aliran data perubahan hampir secara real-time seiring dengan skala penulisan database, Spanner API dirancang agar aliran perubahan dikueri secara serentak menggunakan partisi aliran perubahan. Ubah peta partisi aliran data untuk mengubah pemisahan data aliran data yang berisi catatan aliran data perubahan. Partisi aliran perubahan berubah secara dinamis dari waktu ke waktu dan berkorelasi dengan cara Spanner secara dinamis membagi dan menggabungkan data database.
Partisi aliran perubahan berisi kumpulan data untuk rentang kunci yang tidak dapat diubah selama rentang waktu tertentu. Setiap partisi stream perubahan dapat dibagi menjadi satu atau beberapa partisi stream perubahan, atau digabungkan dengan partisi stream perubahan lainnya. Saat peristiwa pemisahan atau penggabungan ini terjadi, partisi turunan akan dibuat untuk merekam perubahan masing-masing rentang kuncinya yang tidak dapat diubah untuk rentang waktu berikutnya. Selain catatan perubahan data, kueri aliran data perubahan menampilkan kumpulan data turunan untuk memberi tahu pembaca tentang partisi aliran perubahan baru yang perlu dikueri, serta catatan detak jantung untuk menunjukkan progres maju saat tidak ada operasi tulis yang terjadi baru-baru ini.
Saat membuat kueri untuk partisi aliran perubahan tertentu, data perubahan ditampilkan dalam urutan stempel waktu commit. Setiap pencatatan perubahan ditampilkan tepat satu kali. Di seluruh partisi aliran perubahan, tidak ada urutan kumpulan data perubahan yang dijamin. Kumpulan data perubahan untuk kunci utama tertentu hanya ditampilkan pada satu partisi selama rentang waktu tertentu.
Karena garis turunan partisi induk-turunan, untuk memproses perubahan untuk kunci tertentu dalam urutan stempel waktu commit, kumpulan data yang ditampilkan dari partisi turunan hanya boleh diproses setelah data dari semua partisi induk telah diproses.
Mengubah fungsi pembacaan aliran dan sintaksis kueri
GoogleSQL
Anda membuat kueri aliran data perubahan menggunakan
ExecuteStreamingSql
API. Spanner secara otomatis membuat fungsi baca khusus beserta
aliran perubahan. Fungsi baca menyediakan akses ke data aliran data perubahan. Konvensi penamaan fungsi baca adalah READ_change_stream_name
.
Dengan asumsi SingersNameStream
aliran perubahan data ada dalam database, sintaksis kueri untuk GoogleSQL adalah sebagai berikut:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
Fungsi baca menerima argumen berikut:
Nama argumen | Jenis | Wajib? | Deskripsi |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Diperlukan | Menentukan bahwa data dengan commit_timestamp yang lebih besar dari atau sama dengan start_timestamp
harus ditampilkan. Nilainya harus dalam periode retensi data aliran perubahan, dan harus kurang dari atau sama dengan waktu saat ini, dan lebih besar dari atau sama dengan stempel waktu pembuatan aliran perubahan. |
end_timestamp |
TIMESTAMP |
Opsional (Default: NULL ) |
Menentukan bahwa data dengan commit_timestamp kurang dari atau sama dengan end_timestamp harus ditampilkan. Nilai harus dalam periode retensi data perubahan
dan lebih besar atau sama dari start_timestamp . Kueri
selesai setelah menampilkan semua ChangeRecords hingga end_timestamp
atau pengguna menghentikan koneksi. Jika NULL atau tidak
ditentukan, kueri akan dijalankan hingga semua ChangeRecords ditampilkan atau
pengguna menghentikan koneksi. |
partition_token |
STRING |
Opsional (Default: NULL ) |
Menentukan partisi aliran data yang akan dikueri, berdasarkan konten data partisi turunan. Jika NULL atau tidak ditentukan, ini berarti pembaca membuat kueri aliran data perubahan untuk pertama kalinya, dan belum memperoleh token partisi tertentu untuk membuat kueri. |
heartbeat_milliseconds |
INT64 |
Diperlukan | Menentukan seberapa sering ChangeRecord detak jantung ditampilkan
jika tidak ada transaksi yang dilakukan dalam partisi ini.
Nilainya harus antara 1,000 (satu detik) dan 30,0000 (lima
menit). |
read_options |
ARRAY |
Opsional (Default: NULL ) |
Opsi baca tambahan yang dicadangkan untuk penggunaan pada masa mendatang. Saat ini, satu-satunya nilai yang diizinkan adalah NULL . |
Sebaiknya buat metode yang praktis untuk membuat teks kueri fungsi baca dan parameter binding ke kueri tersebut, seperti yang ditunjukkan dalam contoh berikut.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Anda membuat kueri aliran data perubahan menggunakan
ExecuteStreamingSql
API.
Spanner secara otomatis membuat fungsi baca khusus beserta
aliran perubahan. Fungsi baca menyediakan akses ke data aliran data perubahan. Konvensi penamaan fungsi baca adalah spanner.read_json_change_stream_name
.
Dengan asumsi SingersNameStream
aliran perubahan data ada dalam database, sintaksis kueri untuk PostgreSQL adalah sebagai berikut:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
Fungsi baca menerima argumen berikut:
Nama argumen | Jenis | Wajib? | Deskripsi |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Diperlukan | Menentukan bahwa data perubahan dengan commit_timestamp yang lebih besar dari atau sama dengan start_timestamp
harus ditampilkan. Nilainya harus dalam periode retensi data aliran perubahan, dan harus kurang dari atau sama dengan waktu saat ini, dan lebih besar dari atau sama dengan stempel waktu pembuatan aliran perubahan. |
end_timestamp |
timestamp with timezone |
Opsional (Default: NULL ) |
Menentukan bahwa data perubahan dengan commit_timestamp
kurang dari atau sama dengan end_timestamp harus
ditampilkan. Nilai harus dalam periode retensi data perubahan
dan lebih besar atau sama dari start_timestamp .
Kueri selesai setelah menampilkan semua data perubahan hingga
end_timestamp atau pengguna menghentikan koneksi.
Jika NULL kueri dieksekusi hingga semua data perubahan ditampilkan
atau pengguna menghentikan koneksi. |
partition_token |
text |
Opsional (Default: NULL ) |
Menentukan partisi aliran data yang akan dikueri, berdasarkan konten data partisi turunan. Jika NULL atau tidak ditentukan, ini berarti pembaca membuat kueri aliran data perubahan untuk pertama kalinya, dan belum memperoleh token partisi tertentu untuk membuat kueri. |
heartbeat_milliseconds |
bigint |
Diperlukan | Menentukan seberapa sering ChangeRecord detak jantung akan ditampilkan jika tidak ada transaksi yang dilakukan dalam partisi ini.
Nilainya harus antara 1,000 (satu detik) dan 300,000 (lima
menit). |
null |
null |
Diperlukan | Disediakan untuk digunakan pada masa mendatang |
Sebaiknya buat metode yang praktis untuk membuat teks fungsi baca dan mengikat parameter ke teks tersebut, seperti ditunjukkan dalam contoh berikut.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Mengubah format catatan streaming
GoogleSQL
Fungsi baca aliran data perubahan menampilkan satu kolom ChangeRecord
dengan jenis
ARRAY<STRUCT<...>>
. Di setiap baris, array ini selalu berisi satu elemen.
Elemen array memiliki jenis berikut:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
Ada tiga kolom dalam struct ini: data_change_record
,
heartbeat_record
, dan child_partitions_record
, masing-masing dari jenis
ARRAY<STRUCT<...>>
. Di setiap baris yang ditampilkan oleh fungsi pembacaan aliran perubahan, hanya satu dari tiga kolom ini yang berisi nilai; dua kolom lainnya
kosong atau NULL
. Kolom array ini berisi maksimal satu elemen.
Bagian berikut memeriksa ketiga jenis data ini.
PostgreSQL
Fungsi baca aliran perubahan menampilkan satu kolom ChangeRecord
dengan
jenis JSON
dengan struktur berikut:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Ada tiga kemungkinan kunci dalam objek ini: data_change_record
, heartbeat_record
, dan child_partitions_record
, jenis nilai yang terkait adalah JSON
.
Di setiap baris yang ditampilkan oleh fungsi baca aliran data perubahan, hanya
ada satu dari tiga kunci ini.
Bagian berikut memeriksa ketiga jenis data ini.
Catatan perubahan data
Kumpulan data perubahan data berisi kumpulan perubahan pada tabel dengan jenis modifikasi yang sama (menyisipkan, memperbarui, atau menghapus) yang dilakukan pada stempel waktu commit yang sama dalam satu partisi aliran perubahan untuk transaksi yang sama. Beberapa catatan perubahan data dapat ditampilkan untuk transaksi yang sama di beberapa partisi aliran perubahan.
Semua kumpulan data perubahan data memiliki kolom commit_timestamp
, server_transaction_id
,
dan record_sequence
, yang bersama-sama menentukan urutan dalam aliran
perubahan untuk catatan aliran. Ketiga kolom ini cukup untuk memperoleh
urutan perubahan dan memberikan konsistensi eksternal.
Perhatikan bahwa beberapa transaksi dapat memiliki stempel waktu commit yang sama jika
menangani data yang tidak tumpang-tindih. Kolom server_transaction_id
menawarkan kemampuan untuk membedakan kumpulan perubahan mana (kemungkinan
di seluruh partisi aliran perubahan) yang dikeluarkan dalam transaksi
yang sama. Dengan memasangkannya ke kolom record_sequence
dan
number_of_records_in_transaction
, Anda juga dapat melakukan buffering dan mengurutkan
semua data dari transaksi tertentu.
Kolom catatan perubahan data mencakup hal-hal berikut:
GoogleSQL
Kolom | Jenis | Deskripsi |
---|---|---|
commit_timestamp |
TIMESTAMP |
Stempel waktu saat perubahan di-commit. |
record_sequence |
STRING |
Nomor urut untuk data dalam transaksi. Nomor urut dijamin
unik dan meningkat secara monoton (tetapi tidak harus berdekatan) dalam transaksi. Urutkan data untuk
server_transaction_id yang sama menurut record_sequence untuk
merekonstruksi urutan perubahan dalam transaksi. |
server_transaction_id |
STRING |
String unik global yang mewakili transaksi tempat perubahan di-commit. Nilai ini hanya boleh digunakan dalam konteks pemrosesan data aliran data perubahan dan tidak berkorelasi dengan ID transaksi di API Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Menunjukkan apakah ini adalah data terakhir untuk transaksi di partisi saat ini. |
table_name |
STRING |
Nama tabel yang terpengaruh oleh perubahan tersebut. |
value_capture_type |
STRING |
Menjelaskan jenis pengambilan nilai yang ditentukan dalam konfigurasi aliran data perubahan saat perubahan ini ditangkap. Jenis pengambilan nilai dapat berupa |
column_types |
ARRAY<STRUCT< |
Nama kolom, jenis kolom, apakah itu kunci utama, dan posisi kolom seperti yang ditentukan dalam skema (`ordinal_position`). Kolom pertama tabel dalam skema akan memiliki posisi ordinal `1`. Jenis kolom dapat disarangkan untuk kolom array. Formatnya cocok dengan struktur jenis yang dijelaskan dalam referensi Spanner API. |
mods |
ARRAY<STRUCT< |
Menjelaskan perubahan yang dibuat, termasuk nilai kunci utama, nilai lama, dan nilai baru dari kolom yang diubah atau dilacak.
Ketersediaan dan konten nilai lama dan baru akan bergantung pada value_capture_type yang dikonfigurasi. Kolom new_values dan old_values hanya berisi kolom non-kunci. |
mod_type |
STRING |
Menjelaskan jenis perubahan. Salah satu dari INSERT , UPDATE , atau
DELETE . |
number_of_records_in_transaction |
INT64 |
Jumlah catatan perubahan data yang merupakan bagian dari transaksi ini di semua partisi aliran perubahan. |
number_of_partitions_in_transaction |
INT64 |
Jumlah partisi yang akan menampilkan catatan perubahan data untuk transaksi ini. |
transaction_tag |
STRING |
Tag transaksi yang terkait dengan transaksi ini. |
is_system_transaction |
BOOL |
Menunjukkan apakah transaksi merupakan transaksi sistem. |
PostgreSQL
Kolom | Jenis | Deskripsi |
---|---|---|
commit_timestamp |
STRING |
Stempel waktu saat perubahan di-commit. |
record_sequence |
STRING |
Nomor urut untuk data dalam transaksi. Nomor urut dijamin unik dan meningkat secara monoton (tetapi tidak harus berdekatan) dalam transaksi. Urutkan data untuk `server_transaction_id` yang sama berdasarkan `record_sequence` untuk merekonstruksi urutan perubahan dalam transaksi. |
server_transaction_id |
STRING |
String unik global yang mewakili transaksi tempat perubahan di-commit. Nilai ini hanya boleh digunakan dalam konteks pemrosesan data aliran data perubahan dan tidak berkorelasi dengan ID transaksi di API Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Menunjukkan apakah ini adalah data terakhir untuk transaksi di partisi saat ini. |
table_name |
STRING |
Nama tabel yang terpengaruh oleh perubahan tersebut. |
value_capture_type |
STRING |
Menjelaskan jenis pengambilan nilai yang ditentukan dalam konfigurasi aliran data perubahan saat perubahan ini ditangkap. Jenis pengambilan nilai dapat berupa |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
Nama kolom, jenis kolom, apakah itu kunci utama, dan posisi kolom seperti yang ditentukan dalam skema (`ordinal_position`). Kolom pertama tabel dalam skema akan memiliki posisi ordinal `1`. Jenis kolom dapat disarangkan untuk kolom array. Formatnya cocok dengan struktur jenis yang dijelaskan dalam referensi Spanner API. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
Menjelaskan perubahan yang dilakukan, termasuk nilai kunci utama, nilai lama, dan nilai baru dari kolom yang diubah atau dilacak. Ketersediaan dan konten nilai lama dan baru akan bergantung pada value_capture_type yang dikonfigurasi. Kolom new_values dan
old_values hanya berisi kolom non-kunci.
|
mod_type |
STRING |
Menjelaskan jenis perubahan. Salah satu dari INSERT , UPDATE , atau
DELETE . |
number_of_records_in_transaction |
INT64 |
Jumlah catatan perubahan data yang merupakan bagian dari transaksi ini di semua partisi aliran perubahan. |
number_of_partitions_in_transaction |
NUMBER |
Jumlah partisi yang akan menampilkan catatan perubahan data untuk transaksi ini. |
transaction_tag |
STRING |
Tag transaksi yang terkait dengan transaksi ini. |
is_system_transaction |
BOOLEAN |
Menunjukkan apakah transaksi merupakan transaksi sistem. |
Sepasang contoh catatan perubahan data mengikuti. Contoh tersebut menjelaskan satu transaksi di mana ada transfer antara dua akun. Perhatikan bahwa kedua akun tersebut berada dalam partisi aliran perubahan yang terpisah.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Kumpulan data perubahan data berikut adalah contoh data dengan jenis pengambilan nilai "NEW_VALUES"
. Perhatikan bahwa hanya nilai baru yang diisi.
Hanya kolom "LastUpdate"
yang diubah, sehingga hanya kolom tersebut yang ditampilkan.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Kumpulan data perubahan data berikut adalah contoh data dengan jenis pengambilan nilai "NEW_ROW"
. Hanya kolom "LastUpdate"
yang diubah, tetapi semua kolom yang dilacak ditampilkan.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Kumpulan data perubahan data berikut adalah contoh data dengan jenis pengambilan nilai "NEW_ROW_AND_OLD_VALUES"
. Hanya kolom "LastUpdate"
yang diubah, tetapi semua kolom yang dilacak ditampilkan. Jenis pengambilan nilai ini
menangkap nilai baru dan nilai lama LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Catatan Heartbeat
Saat kumpulan data detak ditampilkan, hal ini menunjukkan bahwa semua perubahan dengan
commit_timestamp
kurang dari atau sama dengan timestamp
catatan detak jantung telah ditampilkan, dan kumpulan data mendatang dalam
partisi ini harus memiliki stempel waktu commit yang lebih tinggi daripada yang ditampilkan oleh
catatan detak jantung. Catatan detak jantung ditampilkan jika tidak ada perubahan
data yang ditulis pada partisi. Jika ada perubahan data yang ditulis ke
partisi, data_change_record.commit_timestamp
dapat digunakan sebagai pengganti
heartbeat_record.timestamp
untuk memberi tahu bahwa pembaca membuat progres
dalam membaca partisi.
Anda dapat menggunakan kumpulan data detak yang ditampilkan di partisi untuk menyinkronkan
pembaca di semua partisi. Setelah semua pembaca menerima
detak jantung yang lebih besar dari atau sama dengan beberapa stempel waktu A
atau telah menerima kumpulan data partisi atau data
yang lebih besar dari atau sama dengan stempel waktu A
, pembaca akan tahu bahwa mereka telah menerima
semua kumpulan data yang di-commit pada atau sebelum stempel waktu tersebut A
dan dapat mulai
memproses kumpulan data yang di-buffer—misalnya, mengurutkan kumpulan data partisi lintas
berdasarkan stempel waktu dan mengelompokkannya menurut server_transaction_id
.
Catatan detak jantung hanya berisi satu bidang:
GoogleSQL
Kolom | Jenis | Deskripsi |
---|---|---|
timestamp |
TIMESTAMP |
Stempel waktu catatan detak jantung. |
PostgreSQL
Kolom | Jenis | Deskripsi |
---|---|---|
timestamp |
STRING |
Stempel waktu catatan detak jantung. |
Contoh data heartbeat yang menyampaikan bahwa semua data dengan stempel waktu yang kurang atau sama dari stempel waktu data ini telah ditampilkan:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Data partisi turunan
Kumpulan data partisi turunan menampilkan informasi tentang partisi turunan: token partisinya, token partisi induknya, dan
start_timestamp
yang mewakili stempel waktu paling awal dari partisi
turunan yang memuat data perubahan. Kumpulan data yang stempel waktu commitnya
segera ditampilkan sebelum child_partitions_record.start_timestamp
ditampilkan di partisi saat ini. Setelah menampilkan semua
data partisi turunan untuk partisi ini, kueri ini akan ditampilkan dengan
status berhasil, yang menunjukkan semua kumpulan data telah ditampilkan untuk
partisi ini.
Kolom data partisi turunan meliputi:
GoogleSQL
Kolom | Jenis | Deskripsi |
---|---|---|
start_timestamp |
TIMESTAMP |
Kumpulan data perubahan data yang ditampilkan dari partisi turunan dalam kumpulan data partisi turunan ini memiliki stempel waktu commit lebih besar atau sama dengan start_timestamp . Saat membuat kueri untuk partisi turunan, kueri tersebut harus menentukan token partisi turunan dan start_timestamp yang lebih besar dari atau sama dengan child_partitions_token.start_timestamp . Semua kumpulan data partisi turunan yang ditampilkan oleh sebuah partisi memiliki start_timestamp yang sama, dan stempel waktu selalu berada di antara start_timestamp dan end_timestamp yang ditentukan oleh kueri. |
record_sequence |
STRING |
Nomor urut yang meningkat secara monoton, yang dapat digunakan untuk menentukan pengurutan kumpulan data partisi turunan, jika ada beberapa kumpulan data partisi turunan yang ditampilkan dengan start_timestamp yang sama dalam partisi tertentu. Token partisi, start_timestamp dan record_sequence secara unik mengidentifikasi kumpulan data partisi turunan. |
child_partitions |
ARRAY<STRUCT< |
Menampilkan kumpulan partisi turunan dan informasinya yang terkait. Ini termasuk string token partisi yang digunakan untuk mengidentifikasi partisi turunan dalam kueri, serta token dari partisi induknya. |
PostgreSQL
Kolom | Jenis | Deskripsi |
---|---|---|
start_timestamp |
STRING |
Kumpulan data perubahan data yang ditampilkan dari partisi
turunan dalam kumpulan data partisi turunan ini memiliki stempel waktu commit
yang lebih besar dari atau sama dengan start_timestamp . Saat membuat kueri partisi
turunan, kueri tersebut harus menentukan token partisi turunan dan
start_timestamp yang lebih besar dari atau sama dengan
child_partitions_token.start_timestamp . Semua kumpulan data partisi turunan yang ditampilkan oleh partisi memiliki start_timestamp yang sama dan stempel waktu selalu berada di antara start_timestamp dan end_timestamp yang ditentukan oleh kueri.
|
record_sequence |
STRING |
Nomor urut yang meningkat secara monoton, yang dapat digunakan untuk menentukan pengurutan kumpulan data partisi turunan, jika ada beberapa kumpulan data partisi turunan yang ditampilkan dengan start_timestamp yang sama dalam partisi tertentu. Token partisi, start_timestamp dan record_sequence secara unik mengidentifikasi kumpulan data partisi turunan. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
Menampilkan array partisi turunan dan informasi yang terkait. Ini termasuk string token partisi yang digunakan untuk mengidentifikasi partisi turunan dalam kueri, serta token dari partisi induknya. |
Berikut adalah contoh data partisi turunan:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Mengubah alur kerja kueri streaming
Jalankan kueri aliran data perubahan menggunakan ExecuteStreamingSql
API, dengan transaksi hanya baca sekali pakai dan terikat stempel waktu yang kuat. Fungsi
baca aliran perubahan memungkinkan Anda menentukan start_timestamp
dan
end_timestamp
untuk rentang waktu yang diinginkan. Semua kumpulan data perubahan
dalam periode retensi dapat diakses menggunakan batas stempel waktu
hanya baca yang kuat.
Semua
TransactionOptions
lainnya tidak valid untuk kueri aliran data perubahan. Selain itu, jika TransactionOptions.read_only.return_read_timestamp
ditetapkan ke true, nilai khusus kint64max - 1
akan ditampilkan dalam pesan Transaction
yang menjelaskan transaksi, bukan stempel waktu baca yang valid. Nilai khusus ini harus dihapus dan tidak digunakan untuk
kueri berikutnya.
Setiap kueri aliran data perubahan dapat menampilkan sejumlah baris, yang masing-masing berisi catatan perubahan data, catatan detak jantung, atau catatan partisi turunan. Anda tidak perlu menetapkan batas waktu untuk permintaan.
Contoh:
Alur kerja kueri streaming dimulai dengan mengeluarkan kueri aliran data perubahan pertama dengan menentukan partition_token
ke NULL
. Kueri perlu menentukan
fungsi baca untuk aliran data perubahan, stempel waktu awal dan akhir yang diinginkan, serta
interval detak jantung. Jika end_timestamp
adalah NULL
, kueri akan terus
menampilkan perubahan data sampai partisi berakhir.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Memproses kumpulan data dari kueri ini hingga data partisi turunan ditampilkan. Pada contoh di bawah, dua data partisi turunan dan tiga token partisi ditampilkan, lalu kueri dihentikan. Data partisi turunan dari kueri tertentu selalu memiliki start_timestamp
yang sama.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Untuk memproses perubahan di masa mendatang setelah 2022-05-01T09:00:01Z
, buat tiga kueri baru dan jalankan secara paralel. Ketiga kueri tersebut bersama-sama menampilkan perubahan data
di masa mendatang untuk rentang kunci yang sama yang dicakup oleh induknya. Selalu tetapkan
start_timestamp
ke start_timestamp
dalam data partisi turunan yang sama, dan
gunakan end_timestamp
serta interval detak jantung yang sama untuk memproses data
secara konsisten di semua kueri.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Setelah beberapa saat, kueri pada child_token_2
selesai setelah menampilkan
data partisi turunan lainnya. Data ini menunjukkan bahwa partisi baru akan
mencakup perubahan mendatang untuk child_token_2
dan child_token_3
mulai
2022-05-01T09:30:15Z
. Data yang sama persis akan ditampilkan oleh kueri pada child_token_3
, karena keduanya merupakan partisi induk dari child_token_4
yang baru.
Untuk menjamin pemrosesan data yang berurutan secara ketat untuk kunci tertentu, kueri pada child_token_4
hanya boleh dimulai setelah semua induk selesai, yang dalam hal ini adalah child_token_2
dan child_token_3
. Hanya buat satu kueri
untuk setiap token partisi turunan, desain alur kerja kueri harus menunjuk satu
induk untuk menunggu dan menjadwalkan kueri di child_token_4
.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Temukan contoh penanganan dan penguraian data aliran data perubahan di konektor Dataflow Apache Beam SpannerIO di GitHub.