Mengubah partisi, kumpulan data, dan kueri streaming

Halaman ini menjelaskan atribut aliran perubahan berikut secara mendetail:

  • Model partisi berbasis pemisahan
  • Format dan konten kumpulan data aliran perubahan
  • Sintaksis tingkat rendah yang digunakan untuk membuat kueri kumpulan data tersebut
  • Contoh alur kerja kueri

Informasi di halaman ini paling relevan untuk menggunakan Spanner API guna mengkueri aliran perubahan secara langsung. Aplikasi yang menggunakan Dataflow untuk membaca data stream perubahan tidak perlu bekerja langsung dengan model data yang dijelaskan di sini.

Untuk panduan pengantar yang lebih luas tentang aliran perubahan, lihat Ringkasan aliran perubahan.

Mengubah partisi aliran data

Saat perubahan terjadi pada tabel yang dipantau oleh aliran perubahan, Spanner akan menulis data aliran perubahan yang sesuai di database, secara sinkron dalam transaksi yang sama dengan perubahan data. Hal ini memastikan bahwa jika transaksi berhasil, Spanner juga berhasil merekam dan mempertahankan perubahan. Secara internal, Spanner menempatkan bersama data aliran perubahan dan perubahan data sehingga diproses oleh server yang sama untuk meminimalkan overhead operasi tulis.

Sebagai bagian dari DML ke bagian tertentu, Spanner menambahkan operasi tulis ke bagian data streaming perubahan yang sesuai dalam transaksi yang sama. Karena colocation ini, aliran perubahan tidak menambahkan koordinasi tambahan di seluruh resource penayangan, yang meminimalkan overhead commit transaksi.

gambar

Spanner diskalakan dengan membagi dan menggabungkan data secara dinamis berdasarkan beban dan ukuran database, serta mendistribusikan bagian di seluruh resource penayangan.

Untuk memungkinkan penulisan dan pembacaan aliran perubahan diskalakan, Spanner membagi dan menggabungkan penyimpanan aliran perubahan internal bersama dengan data database, sehingga secara otomatis menghindari hotspot. Untuk mendukung pembacaan data aliran data perubahan dalam hampir real time saat penulisan database diskalakan, Spanner API dirancang agar aliran data perubahan dikueri secara serentak menggunakan partisi aliran data perubahan. Partisi aliran perubahan dipetakan untuk mengubah pemisahan data aliran perubahan yang berisi kumpulan data aliran perubahan. Partisi aliran perubahan berubah secara dinamis dari waktu ke waktu dan berkorelasi dengan cara Spanner membagi dan menggabungkan data database secara dinamis.

Partisi aliran perubahan berisi kumpulan data untuk rentang kunci yang tidak dapat diubah selama rentang waktu tertentu. Setiap partisi aliran perubahan dapat dibagi menjadi satu atau beberapa partisi aliran perubahan, atau digabungkan dengan partisi aliran perubahan lainnya. Saat peristiwa pemisahan atau penggabungan ini terjadi, partisi turunan dibuat untuk merekam perubahan untuk rentang kunci masing-masing yang tidak dapat diubah untuk rentang waktu berikutnya. Selain data perubahan, kueri aliran perubahan menampilkan data partisi turunan untuk memberi tahu pembaca tentang partisi aliran perubahan baru yang perlu dikueri, serta data heartbeat untuk menunjukkan progres maju jika tidak ada operasi tulis yang terjadi baru-baru ini.

Saat membuat kueri partisi aliran perubahan tertentu, kumpulan data perubahan akan ditampilkan dalam urutan stempel waktu commit. Setiap data perubahan ditampilkan tepat sekali. Di seluruh partisi aliran perubahan, tidak ada jaminan urutan data perubahan. Data perubahan untuk kunci utama tertentu hanya ditampilkan di satu partisi untuk rentang waktu tertentu.

Karena garis keturunan partisi induk-turunan, untuk memproses perubahan pada kunci tertentu dalam urutan stempel waktu commit, data yang ditampilkan dari partisi turunan hanya boleh diproses setelah data dari semua partisi induk diproses.

Fungsi baca streaming perubahan dan sintaksis kueri

GoogleSQL

Anda membuat kueri aliran perubahan menggunakan API ExecuteStreamingSql. Spanner otomatis membuat fungsi baca khusus bersama dengan streaming perubahan. Fungsi baca memberikan akses ke data stream perubahan. Konvensi penamaan fungsi baca adalah READ_change_stream_name.

Dengan asumsi bahwa aliran perubahan SingersNameStream ada di database, sintaksis kueri untuk GoogleSQL adalah sebagai berikut:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

Fungsi baca menerima argumen berikut:

Nama argumen Jenis Wajib? Deskripsi
start_timestamp TIMESTAMP Wajib Menentukan bahwa data dengan commit_timestamp lebih besar dari atau sama dengan start_timestamp harus ditampilkan. Nilai harus berada dalam periode retensi streaming perubahan, dan harus kurang dari atau sama dengan waktu saat ini, serta lebih besar dari atau sama dengan stempel waktu pembuatan streaming perubahan.
end_timestamp TIMESTAMP Opsional (Default: NULL) Menentukan bahwa data dengan commit_timestamp kurang dari atau sama dengan end_timestamp harus ditampilkan. Nilai harus berada dalam periode retensi stream perubahan dan lebih besar atau sama dengan start_timestamp. Kueri selesai setelah menampilkan semua ChangeRecords hingga end_timestamp atau pengguna menghentikan koneksi. Jika NULL atau tidak ditentukan, kueri akan dieksekusi hingga semua ChangeRecords ditampilkan atau pengguna memutuskan koneksi.
partition_token STRING Opsional (Default: NULL) Menentukan partisi aliran perubahan yang akan dikueri, berdasarkan konten kumpulan data partisi turunan. Jika NULL atau tidak ditentukan, artinya pembaca membuat kueri aliran perubahan untuk pertama kalinya, dan belum mendapatkan token partisi tertentu untuk membuat kueri.
heartbeat_milliseconds INT64 Wajib Menentukan seberapa sering ChangeRecord heartbeat ditampilkan jika tidak ada transaksi yang dilakukan di partisi ini. Nilai harus antara 1,000 (satu detik) dan 300,000 (lima menit).
read_options ARRAY Opsional (Default: NULL) Opsi baca tambahan yang dicadangkan untuk penggunaan di masa mendatang. Saat ini, satu-satunya nilai yang diizinkan adalah NULL.

Sebaiknya buat metode praktis untuk membuat teks kueri fungsi baca dan parameter binding ke dalamnya, seperti yang ditunjukkan dalam contoh berikut.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Anda membuat kueri aliran perubahan menggunakan ExecuteStreamingSql API. Spanner otomatis membuat fungsi baca khusus bersama dengan streaming perubahan. Fungsi baca memberikan akses ke data stream perubahan. Konvensi penamaan fungsi baca adalah spanner.read_json_change_stream_name.

Dengan asumsi bahwa aliran perubahan SingersNameStream ada di database, sintaksis kueri untuk PostgreSQL adalah sebagai berikut:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

Fungsi baca menerima argumen berikut:

Nama argumen Jenis Wajib? Deskripsi
start_timestamp timestamp with time zone Wajib Menentukan bahwa data perubahan dengan commit_timestamp lebih besar dari atau sama dengan start_timestamp harus ditampilkan. Nilai harus berada dalam periode retensi streaming perubahan, dan harus kurang dari atau sama dengan waktu saat ini, serta lebih besar dari atau sama dengan stempel waktu pembuatan streaming perubahan.
end_timestamp timestamp with timezone Opsional (Default: NULL) Menentukan bahwa data perubahan dengan commit_timestamp kurang dari atau sama dengan end_timestamp harus ditampilkan. Nilai harus berada dalam periode retensi stream perubahan dan lebih besar atau sama dengan start_timestamp. Kueri selesai setelah menampilkan semua data perubahan hingga end_timestamp atau pengguna menghentikan koneksi. Jika NULL, kueri akan dieksekusi hingga semua data perubahan ditampilkan atau pengguna menghentikan koneksi.
partition_token text Opsional (Default: NULL) Menentukan partisi aliran perubahan yang akan dikueri, berdasarkan konten kumpulan data partisi turunan. Jika NULL atau tidak ditentukan, artinya pembaca membuat kueri aliran perubahan untuk pertama kalinya, dan belum mendapatkan token partisi tertentu untuk membuat kueri.
heartbeat_milliseconds bigint Wajib Menentukan seberapa sering ChangeRecord heartbeat akan ditampilkan jika tidak ada transaksi yang dilakukan di partisi ini. Nilai harus antara 1,000 (satu detik) dan 300,000 (lima menit).
null null Wajib Disimpan untuk penggunaan di masa mendatang

Sebaiknya buat metode praktis untuk mem-build teks fungsi baca dan parameter binding ke fungsi tersebut, seperti yang ditunjukkan dalam contoh berikut.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Mengubah format kumpulan data aliran data perubahan

GoogleSQL

Fungsi baca aliran perubahan menampilkan satu kolom ChangeRecord dari jenis ARRAY<STRUCT<...>>. Di setiap baris, array ini selalu berisi satu elemen.

Elemen array memiliki jenis berikut:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Ada tiga kolom dalam struct ini: data_change_record, heartbeat_record, dan child_partitions_record, masing-masing berjenis ARRAY<STRUCT<...>>. Di baris mana pun yang ditampilkan oleh fungsi baca aliran perubahan, hanya satu dari tiga kolom ini yang berisi nilai; dua kolom lainnya kosong atau NULL. Kolom array ini berisi maksimal satu elemen.

Bagian berikut akan memeriksa setiap jenis data tersebut.

PostgreSQL

Fungsi baca aliran perubahan menampilkan satu kolom ChangeRecord dari jenis JSON dengan struktur berikut:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

Ada tiga kemungkinan kunci dalam objek ini: data_change_record, heartbeat_record, dan child_partitions_record, jenis nilai yang sesuai adalah JSON. Di baris mana pun yang ditampilkan oleh fungsi baca aliran perubahan, hanya salah satu dari tiga kunci ini yang ada.

Bagian berikut akan memeriksa setiap jenis data tersebut.

Data mengubah catatan

Data perubahan data berisi kumpulan perubahan pada tabel dengan jenis modifikasi yang sama (menyisipkan, memperbarui, atau menghapus) yang di-commit pada stempel waktu commit yang sama dalam satu partisi aliran perubahan untuk transaksi yang sama. Beberapa data perubahan data dapat ditampilkan untuk transaksi yang sama di beberapa partisi aliran perubahan.

Semua kumpulan data perubahan memiliki kolom commit_timestamp, server_transaction_id, dan record_sequence, yang bersama-sama menentukan urutan dalam aliran perubahan untuk kumpulan data aliran. Tiga kolom ini sudah cukup untuk mendapatkan urutan perubahan dan memberikan konsistensi eksternal.

Perhatikan bahwa beberapa transaksi dapat memiliki stempel waktu commit yang sama jika data yang disentuh tidak tumpang-tindih. Kolom server_transaction_id menawarkan kemampuan untuk membedakan kumpulan perubahan mana (yang mungkin di seluruh partisi aliran perubahan) yang dikeluarkan dalam transaksi yang sama. Dengan menyambungkan kolom record_sequence dan number_of_records_in_transaction, Anda juga dapat melakukan buffering dan mengurutkan semua data dari transaksi tertentu.

Kolom data perubahan data mencakup hal berikut:

GoogleSQL

Kolom Jenis Deskripsi
commit_timestamp TIMESTAMP Stempel waktu saat perubahan dilakukan.
record_sequence STRING Nomor urut untuk data dalam transaksi. Nomor urutan dijamin unik dan meningkat secara monoton (tetapi tidak harus berurutan) dalam transaksi. Urutkan data untuk server_transaction_id yang sama menurut record_sequence untuk merekonstruksi pengurutan perubahan dalam transaksi. Pengurutan ini mungkin dioptimalkan oleh Spanner untuk performa yang lebih baik dan mungkin tidak selalu cocok dengan pengurutan asli yang diberikan pengguna.
server_transaction_id STRING String unik secara global yang mewakili transaksi tempat perubahan dilakukan. Nilai ini hanya boleh digunakan dalam konteks pemrosesan data aliran perubahan dan tidak dikorelasi dengan ID transaksi di API Spanner.
is_last_record_in_transaction_in_partition BOOL Menunjukkan apakah ini adalah data terakhir untuk transaksi di partisi saat ini.
table_name STRING Nama tabel yang terpengaruh oleh perubahan tersebut.
value_capture_type STRING

Menjelaskan jenis pengambilan nilai yang ditentukan dalam konfigurasi aliran perubahan saat perubahan ini diambil.

Jenis pengambilan nilai dapat berupa "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES", atau "NEW_ROW_AND_OLD_VALUES". Secara default, nilainya adalah "OLD_AND_NEW_VALUES". Untuk mengetahui informasi selengkapnya, lihat jenis pengambilan nilai.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Nama kolom, jenis kolom, apakah itu kunci utama, dan posisi kolom seperti yang ditentukan dalam skema (`ordinal_position`). Kolom pertama tabel dalam skema akan memiliki posisi ordinal `1`. Jenis kolom dapat disusun bertingkat untuk kolom array. Formatnya cocok dengan struktur jenis yang dijelaskan dalam referensi Spanner API.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Menjelaskan perubahan yang dilakukan, termasuk nilai kunci utama, nilai lama, dan nilai baru dari kolom yang diubah atau dilacak. Ketersediaan dan konten nilai lama dan baru akan bergantung pada value_capture_type yang dikonfigurasi. Kolom new_values dan old_values hanya berisi kolom non-kunci.
mod_type STRING Menjelaskan jenis perubahan. Salah satu dari INSERT, UPDATE, atau DELETE.
number_of_records_in_transaction INT64 Jumlah data perubahan data yang merupakan bagian dari transaksi ini di semua partisi aliran perubahan.
number_of_partitions_in_transaction INT64 Jumlah partisi yang akan menampilkan data perubahan data untuk transaksi ini.
transaction_tag STRING Tag transaksi yang terkait dengan transaksi ini.
is_system_transaction BOOL Menunjukkan apakah transaksi merupakan transaksi sistem.

PostgreSQL

Kolom Jenis Deskripsi
commit_timestamp STRING Stempel waktu saat perubahan dilakukan.
record_sequence STRING Nomor urut untuk data dalam transaksi. Nomor urutan dijamin unik dan meningkat secara monoton (tetapi tidak harus berurutan) dalam transaksi. Urutkan data untuk `server_transaction_id` yang sama menurut `record_sequence` untuk merekonstruksi pengurutan perubahan dalam transaksi.
server_transaction_id STRING String unik secara global yang mewakili transaksi tempat perubahan dilakukan. Nilai hanya boleh digunakan dalam konteks pemrosesan data aliran perubahan dan tidak dikorelasi dengan ID transaksi di API Spanner
is_last_record_in_transaction_in_partition BOOLEAN Menunjukkan apakah ini adalah data terakhir untuk transaksi di partisi saat ini.
table_name STRING Nama tabel yang terpengaruh oleh perubahan tersebut.
value_capture_type STRING

Menjelaskan jenis pengambilan nilai yang ditentukan dalam konfigurasi aliran perubahan saat perubahan ini diambil.

Jenis pengambilan nilai dapat berupa "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES", atau "NEW_ROW_AND_OLD_VALUES". Secara default, nilainya adalah "OLD_AND_NEW_VALUES". Untuk mengetahui informasi selengkapnya, lihat jenis pengambilan nilai.

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
Nama kolom, jenis kolom, apakah itu kunci utama, dan posisi kolom seperti yang ditentukan dalam skema (`ordinal_position`). Kolom pertama tabel dalam skema akan memiliki posisi ordinal `1`. Jenis kolom dapat disusun bertingkat untuk kolom array. Formatnya cocok dengan struktur jenis yang dijelaskan dalam referensi Spanner API.
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Menjelaskan perubahan yang dilakukan, termasuk nilai kunci utama, nilai lama, dan nilai baru dari kolom yang diubah atau dilacak. Ketersediaan dan konten nilai lama dan baru akan bergantung pada value_capture_type yang dikonfigurasi. Kolom new_values dan old_values hanya berisi kolom non-kunci.
mod_type STRING Menjelaskan jenis perubahan. Salah satu dari INSERT, UPDATE, atau DELETE.
number_of_records_in_transaction INT64 Jumlah data perubahan data yang merupakan bagian dari transaksi ini di semua partisi aliran perubahan.
number_of_partitions_in_transaction NUMBER Jumlah partisi yang akan menampilkan data perubahan data untuk transaksi ini.
transaction_tag STRING Tag transaksi yang terkait dengan transaksi ini.
is_system_transaction BOOLEAN Menunjukkan apakah transaksi merupakan transaksi sistem.

Berikut adalah contoh sepasang data perubahan data. Kedua transaksi tersebut menjelaskan satu transaksi dengan adanya transfer antara dua akun. Perhatikan bahwa kedua akun tersebut berada di partisi aliran perubahan terpisah.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Kumpulan data perubahan data berikut adalah contoh kumpulan data dengan jenis pengambilan nilai "NEW_VALUES". Perhatikan bahwa hanya nilai baru yang diisi. Hanya kolom "LastUpdate" yang diubah, sehingga hanya kolom tersebut yang ditampilkan.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Kumpulan data perubahan data berikut adalah contoh kumpulan data dengan jenis pengambilan nilai "NEW_ROW". Hanya kolom "LastUpdate" yang diubah, tetapi semua kolom yang dilacak ditampilkan.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Kumpulan data perubahan data berikut adalah contoh kumpulan data dengan jenis pengambilan nilai "NEW_ROW_AND_OLD_VALUES". Hanya kolom "LastUpdate" yang diubah, tetapi semua kolom yang dilacak ditampilkan. Jenis pengambilan nilai ini akan mengambil nilai baru dan nilai lama LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Data heartbeat

Jika kumpulan data heartbeat ditampilkan, hal ini menunjukkan bahwa semua perubahan dengan commit_timestamp kurang dari atau sama dengan timestamp kumpulan data heartbeat telah ditampilkan, dan kumpulan data mendatang di partisi ini harus memiliki stempel waktu commit yang lebih tinggi daripada yang ditampilkan oleh kumpulan data heartbeat. Data heartbeat ditampilkan saat tidak ada perubahan data yang ditulis ke partisi. Jika ada perubahan data yang ditulis ke partisi, data_change_record.commit_timestamp dapat digunakan sebagai pengganti heartbeat_record.timestamp untuk memberi tahu bahwa pembaca membuat progres maju dalam membaca partisi.

Anda dapat menggunakan data heartbeat yang ditampilkan di partisi untuk menyinkronkan pembaca di semua partisi. Setelah semua pembaca menerima heartbeat yang lebih besar dari atau sama dengan beberapa stempel waktu A atau telah menerima data atau data partisi turunan yang lebih besar dari atau sama dengan stempel waktu A, pembaca tahu bahwa mereka telah menerima semua data yang di-commit pada atau sebelum stempel waktu A tersebut dan dapat mulai memproses data yang di-buffer—misalnya, mengurutkan data lintas-partisi menurut stempel waktu dan mengelompokkan data menurut server_transaction_id.

Data heartbeat hanya berisi satu kolom:

GoogleSQL

Kolom Jenis Deskripsi
timestamp TIMESTAMP Stempel waktu data heartbeat.

PostgreSQL

Kolom Jenis Deskripsi
timestamp STRING Stempel waktu data heartbeat.

Contoh data heartbeat, yang menyampaikan bahwa semua data dengan stempel waktu yang kurang dari atau sama dengan stempel waktu data ini telah ditampilkan:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Data partisi turunan

Data partisi turunan menampilkan informasi tentang partisi turunan: token partisi, token partisi induk, dan start_timestamp yang mewakili stempel waktu paling awal yang berisi data perubahan partisi turunan. Data yang stempel waktu commit-nya langsung sebelum child_partitions_record.start_timestamp ditampilkan di partisi saat ini. Setelah menampilkan semua data partisi turunan untuk partisi ini, kueri ini akan ditampilkan dengan status berhasil, yang menunjukkan bahwa semua data telah ditampilkan untuk partisi ini.

Kolom data partisi turunan mencakup hal berikut:

GoogleSQL

Kolom Jenis Deskripsi
start_timestamp TIMESTAMP Data perubahan yang ditampilkan dari partisi turunan dalam data perubahan partisi turunan ini memiliki stempel waktu commit yang lebih besar dari atau sama dengan start_timestamp. Saat membuat kueri partisi turunan, kueri harus menentukan token partisi turunan dan start_timestamp yang lebih besar dari atau sama dengan child_partitions_token.start_timestamp. Semua data partisi turunan yang ditampilkan oleh partisi memiliki start_timestamp yang sama dan stempel waktu selalu berada di antara start_timestamp dan end_timestamp kueri yang ditentukan.
record_sequence STRING Nomor urutan yang meningkat secara monoton yang dapat digunakan untuk menentukan pengurutan data partisi turunan jika ada beberapa data partisi turunan yang ditampilkan dengan start_timestamp yang sama di partisi tertentu. Token partisi, start_timestamp, dan record_sequence secara unik mengidentifikasi data partisi turunan.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Menampilkan kumpulan partisi turunan dan informasi terkaitnya. Ini mencakup string token partisi yang digunakan untuk mengidentifikasi partisi anak dalam kueri, serta token partisi induknya.

PostgreSQL

Kolom Jenis Deskripsi
start_timestamp STRING Data perubahan yang ditampilkan dari partisi turunan dalam data perubahan partisi turunan ini memiliki stempel waktu commit yang lebih besar dari atau sama dengan start_timestamp. Saat membuat kueri pada partisi turunan, kueri harus menentukan token partisi turunan dan start_timestamp yang lebih besar dari atau sama dengan child_partitions_token.start_timestamp. Semua data partisi turunan yang ditampilkan oleh partisi memiliki start_timestamp yang sama dan stempel waktu selalu berada di antara start_timestamp dan end_timestamp yang ditentukan kueri.
record_sequence STRING Nomor urutan yang meningkat secara monoton yang dapat digunakan untuk menentukan pengurutan data partisi turunan jika ada beberapa data partisi turunan yang ditampilkan dengan start_timestamp yang sama di partisi tertentu. Token partisi, start_timestamp, dan record_sequence secara unik mengidentifikasi data partisi turunan.
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Menampilkan array partisi turunan dan informasi terkaitnya. Ini mencakup string token partisi yang digunakan untuk mengidentifikasi partisi anak dalam kueri, serta token partisi induknya.

Berikut adalah contoh data partisi turunan:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Alur kerja kueri aliran data perubahan

Jalankan kueri aliran perubahan menggunakan ExecuteStreamingSql API, dengan transaksi hanya baca sekali pakai dan batas stempel waktu yang kuat. Fungsi baca aliran perubahan memungkinkan Anda menentukan start_timestamp dan end_timestamp untuk rentang waktu yang diinginkan. Semua data perubahan dalam periode retensi dapat diakses menggunakan batas stempel waktu hanya baca yang kuat.

Semua TransactionOptions lainnya tidak valid untuk kueri aliran perubahan. Selain itu, jika TransactionOptions.read_only.return_read_timestamp ditetapkan ke true, nilai khusus kint64max - 1 akan ditampilkan dalam pesan Transaction yang menjelaskan transaksi, bukan stempel waktu baca yang valid. Nilai khusus ini harus dihapus dan tidak digunakan untuk kueri berikutnya.

Setiap kueri aliran perubahan dapat menampilkan sejumlah baris, yang masing-masing berisi data perubahan data, data heartbeat, atau data partisi anak. Anda tidak perlu menetapkan batas waktu untuk permintaan tersebut.

Contoh:

Alur kerja kueri streaming dimulai dengan menerbitkan kueri aliran perubahan pertama dengan menentukan partition_token ke NULL. Kueri harus menentukan fungsi baca untuk aliran perubahan, stempel waktu awal dan akhir yang diinginkan, dan interval heartbeat. Jika end_timestamp adalah NULL, kueri akan terus menampilkan perubahan data hingga partisi berakhir.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Proses data dari kueri ini hingga data partisi turunan ditampilkan. Pada contoh di bawah, dua data partisi turunan dan tiga token partisi ditampilkan, lalu kueri dihentikan. Data partisi turunan dari kueri tertentu selalu memiliki start_timestamp yang sama.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Untuk memproses perubahan mendatang setelah 2022-05-01T09:00:01Z, buat tiga kueri baru dan jalankan secara paralel. Ketiga kueri tersebut bersama-sama menampilkan perubahan data mendatang untuk rentang kunci yang sama yang dicakup induknya. Selalu tetapkan start_timestamp ke start_timestamp dalam data partisi turunan yang sama dan gunakan end_timestamp dan interval heartbeat yang sama untuk memproses data secara konsisten di semua kueri.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Setelah beberapa saat, kueri di child_token_2 akan selesai setelah menampilkan data partisi turunan lainnya, data ini menunjukkan bahwa partisi baru akan mencakup perubahan mendatang untuk child_token_2 dan child_token_3 mulai dari 2022-05-01T09:30:15Z. Kumpulan data yang sama persis akan ditampilkan oleh kueri di child_token_3, karena keduanya adalah partisi induk dari child_token_4 baru. Untuk menjamin pemrosesan data yang teratur dan ketat untuk kunci tertentu, kueri di child_token_4 hanya boleh dimulai setelah semua induk selesai, yang dalam hal ini adalah child_token_2 dan child_token_3. Hanya buat satu kueri untuk setiap token partisi turunan, desain alur kerja kueri harus menunjuk satu induk untuk menunggu dan menjadwalkan kueri di child_token_4.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Temukan contoh penanganan dan penguraian data aliran perubahan di konektor Dataflow SpannerIO Apache Beam di GitHub.