Membaca aliran perubahan dengan Java
Library klien Cloud Bigtable untuk Java menyediakan metode tingkat rendah untuk memproses data perubahan data. Sebelum membaca dokumen ini, baca Ringkasan aliran perubahan.
Sebelum memulai
Sebelum Anda membaca aliran perubahan dengan Java, selesaikan prasyarat berikut.
Menyiapkan autentikasi
Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Buat kredensial autentikasi lokal untuk Akun Google Anda:
gcloud auth application-default login
Untuk informasi selengkapnya, lihat Set up authentication for a local development environment.
Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Set up Application Default Credentials for code running on Google Cloud.
Mengaktifkan aliran data perubahan
Anda harus mengaktifkan aliran perubahan pada tabel sebelum dapat membacanya. Anda juga dapat membuat tabel baru dengan aliran data perubahan yang diaktifkan.
Peran yang diperlukan
Untuk mendapatkan izin yang diperlukan untuk membaca aliran data perubahan Bigtable, minta administrator untuk memberi Anda peran IAM berikut.
- Administrator Bigtable (
roles/bigtable.admin
) pada instance Bigtable yang berisi tabel yang ingin Anda gunakan untuk mengalirkan perubahan
Menambahkan library klien Java sebagai dependensi
Tambahkan kode yang mirip dengan berikut ini ke file pom.xml
Maven Anda. Ganti
VERSION
dengan versi library klien yang Anda
gunakan. Versi harus 2.21.0 atau yang lebih baru.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Menentukan partisi tabel
Untuk mulai membuat permintaan ReadChangeStream
, Anda perlu mengetahui partisi
tabel Anda. Hal ini dapat ditentukan menggunakan
metode GenerateInitialChangeStreamPartitions
. Contoh berikut menunjukkan cara
menggunakan metode ini untuk mendapatkan aliran
ByteStringRanges
yang mewakili setiap partisi dalam tabel. Setiap ByteStringRange
berisi
kunci awal dan akhir untuk sebuah partisi.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Memproses perubahan untuk setiap partisi
Anda kemudian dapat memproses perubahan untuk setiap partisi menggunakan metode
ReadChangeStream
. Ini adalah contoh cara membuka aliran untuk partisi, mulai
dari waktu saat ini.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
menerima argumen berikut:
- Partisi streaming (Wajib) - partisi untuk streaming perubahan
- Salah satu dari kolom berikut:
- Waktu mulai - Jalankan stempel waktu untuk memulai pemrosesan perubahan
- Token lanjutan - Token yang mewakili posisi untuk melanjutkan streaming dari
- Waktu Berakhir (Opsional) - Jalankan stempel waktu untuk menghentikan pemrosesan perubahan saat tercapai. Jika Anda tidak memberikan nilai, aliran data akan terus berjalan.
- Durasi Heartbeat (Opsional) - Frekuensi pesan heartbeat jika tidak ada perubahan baru (default-nya adalah lima detik)
Mengubah format rekaman streaming
Catatan aliran perubahan yang ditampilkan adalah salah satu dari tiga jenis respons:
ChangeStreamMutation
- Pesan yang mewakili data perubahan data.CloseStream
- Pesan yang menunjukkan bahwa klien harus berhenti membaca dari aliran data.- Status - Menunjukkan alasan menutup streaming. Salah satu dari:
OK
- waktu berakhir untuk partisi yang ditentukan telah tercapaiOUT_OF_RANGE
- partisi yang ditentukan sudah tidak ada, artinya pemisahan atau penggabungan telah terjadi pada partisi ini. PermintaanReadChangeStream
baru harus dibuat untuk setiap partisi baru.
NewPartitions
- Memberikan informasi partisi yang telah diperbarui pada responsOUT_OF_RANGE
.ChangeStreamContinuationTokens
- Daftar token yang digunakan untuk melanjutkan permintaanReadChangeStream
baru dari posisi yang sama. Satu perNewPartition
.
- Status - Menunjukkan alasan menutup streaming. Salah satu dari:
Heartbeat
- Pesan berkala dengan informasi yang dapat digunakan untuk memeriksa status aliran data.EstimatedLowWatermark
- Estimasi watermark rendah untuk partisi yang diberikanContinuationToken
- Token untuk melanjutkan streaming partisi yang diberikan dari posisi saat ini.
Konten catatan perubahan data
Setiap record perubahan data berisi hal-hal berikut:
- Entri - perubahan yang dilakukan pada baris, termasuk satu atau beberapa hal berikut:
- Tulis
- Kelompok kolom
- Penentu kolom
- Stempel waktu
- Nilai
- Penghapusan sel
- Kelompok kolom
- Penentu kolom
- Rentang stempel waktu
- Penghapusan kelompok kolom
- Kelompok kolom
- Penghapusan dari baris - Penghapusan dari baris dikonversi menjadi daftar penghapusan dari grup kolom untuk setiap grup kolom tempat baris berisi data.
- Tulis
- Kunci baris - ID untuk baris yang diubah
- Jenis perubahan - baik yang dimulai oleh pengguna atau pembersihan sampah memori
- ID cluster yang menerima perubahan
- Stempel waktu commit - waktu sisi server saat perubahan di-commit ke tabel
- Tie breaker - nilai yang memungkinkan aplikasi yang membaca aliran data menggunakan kebijakan resolusi konflik bawaan Bigtable
- Token - digunakan oleh aplikasi pengguna untuk melanjutkan streaming jika terganggu
- Estimasi watermark rendah - perkiraan waktu sejak partisi data menemukan replikasi di semua cluster. Untuk mengetahui detailnya, lihat Partisi dan Watermark.
Untuk detail tambahan tentang kolom dalam record perubahan data, lihat referensi API untuk ReadChangeStream
.
Menangani perubahan partisi
Jika partisi tabel berubah, permintaan ReadChangeStream
menampilkan pesan CloseStream
dengan informasi yang diperlukan untuk melanjutkan streaming
dari partisi baru.
Untuk pemisahan, partisi ini akan berisi beberapa partisi baru dan ContinuationToken
yang sesuai untuk setiap partisi. Untuk melanjutkan streaming partisi baru dari posisi yang sama, buat permintaan ReadChangeStream
baru untuk setiap partisi baru dengan token yang sesuai.
Misalnya, jika Anda menstreaming partisi [A,C)
dan partisi tersebut dibagi menjadi dua
partisi, [A,B)
dan [B,C)
, Anda dapat mengharapkan urutan
peristiwa berikut:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Untuk melanjutkan streaming setiap partisi dari posisi yang sama, Anda harus mengirim
permintaan ReadChangeStreamQuery
berikut:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Untuk penggabungan, agar dapat melanjutkan dari partisi yang sama, Anda harus mengirimkan permintaan
ReadChangeStream
baru yang berisi setiap token dari partisi gabungan.
Misalnya, jika Anda melakukan streaming dua partisi, [A,B)
dan [B,C)
, lalu partisi tersebut
digabungkan ke dalam partisi [A,C)
, urutan peristiwa berikut akan terjadi:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Untuk melanjutkan partisi streaming [A, C)
dari posisi yang sama, kirim
ReadChangeStreamQuery
seperti berikut:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));