Membaca stream perubahan dengan Java
Library klien Cloud Bigtable untuk Java menyediakan metode tingkat rendah untuk memproses rekaman perubahan data. Namun, dalam sebagian besar kasus, sebaiknya Anda mengalirkan perubahan dengan Dataflow, bukan menggunakan metode yang dijelaskan di halaman ini, karena Dataflow menangani pemisahan dan penggabungan partisi untuk Anda.
Sebelum memulai
Sebelum membaca aliran perubahan dengan Java, pastikan Anda memahami Ringkasan aliran perubahan. Kemudian, selesaikan prasyarat berikut.
Menyiapkan autentikasi
Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.
Menginstal Google Cloud CLI.
Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Untuk informasi selengkapnya, lihat Set up authentication for a local development environment.
Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Set up Application Default Credentials for code running on Google Cloud.
Mengaktifkan aliran perubahan
Anda harus mengaktifkan aliran perubahan pada tabel sebelum dapat membacanya. Anda juga dapat membuat tabel baru dengan aliran perubahan yang diaktifkan.
Peran yang diperlukan
Untuk mendapatkan izin yang Anda perlukan untuk membaca aliran perubahan Bigtable, minta administrator Anda untuk memberi Anda peran IAM berikut.
- Bigtable Administrator
(
roles/bigtable.admin
) di instance Bigtable yang berisi tabel yang ingin Anda streaming perubahannya
Menambahkan library klien Java sebagai dependensi
Tambahkan kode yang mirip dengan kode berikut ke file pom.xml
Maven Anda. Ganti
VERSION
dengan versi library klien yang Anda gunakan. Versinya harus 2.21.0 atau yang lebih baru.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Menentukan partisi tabel
Untuk mulai membuat permintaan ReadChangeStream
, Anda perlu mengetahui partisi tabel Anda. Hal ini dapat ditentukan menggunakan metode GenerateInitialChangeStreamPartitions
. Contoh berikut menunjukkan cara menggunakan metode ini untuk mendapatkan aliran ByteStringRanges
yang merepresentasikan setiap partisi dalam tabel. Setiap ByteStringRange
berisi
kunci awal dan akhir untuk partisi.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Perubahan proses untuk setiap partisi
Kemudian, Anda dapat memproses perubahan untuk setiap partisi menggunakan metode ReadChangeStream
. Berikut contoh cara membuka stream untuk partisi, dimulai dari waktu saat ini.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
menerima argumen berikut:
- Partisi streaming (Wajib) - partisi untuk melakukan streaming perubahan dari
- Salah satu dari berikut ini:
- Waktu mulai - Stempel waktu penerapan untuk mulai memproses perubahan dari
- Token lanjutan - Token yang merepresentasikan posisi untuk melanjutkan streaming
- Waktu Berakhir (Opsional) - Stempel waktu penerapan untuk menghentikan pemrosesan perubahan saat tercapai. Jika Anda tidak memberikan nilai, aliran akan terus membaca.
- Durasi detak jantung (Opsional) - Frekuensi pesan detak jantung saat tidak ada perubahan baru (default lima detik)
Mengubah format rekaman streaming
Data aliran perubahan yang ditampilkan adalah salah satu dari tiga jenis respons:
ChangeStreamMutation
- Pesan yang merepresentasikan catatan perubahan data.CloseStream
- Pesan yang menunjukkan bahwa klien harus berhenti membaca dari stream.- Status - Menunjukkan alasan penutupan aliran data. Salah satu dari:
OK
- waktu berakhir telah tercapai untuk partisi tertentuOUT_OF_RANGE
- partisi yang diberikan tidak ada lagi, artinya pemisahan atau penggabungan telah terjadi pada partisi ini. PermintaanReadChangeStream
baru harus dibuat untuk setiap partisi baru.
NewPartitions
- Memberikan informasi partisi yang diperbarui pada responsOUT_OF_RANGE
.ChangeStreamContinuationTokens
- Daftar token yang digunakan untuk melanjutkan permintaanReadChangeStream
baru dari posisi yang sama. Satu perNewPartition
.
- Status - Menunjukkan alasan penutupan aliran data. Salah satu dari:
Heartbeat
- Pesan berkala dengan informasi yang dapat digunakan untuk memeriksa status streaming.EstimatedLowWatermark
- Estimasi watermark rendah untuk partisi tertentuContinuationToken
- Token untuk melanjutkan streaming partisi tertentu dari posisi saat ini.
Isi catatan perubahan data
Untuk mengetahui informasi tentang rekaman aliran perubahan, lihat Isi rekaman perubahan data.
Menangani perubahan pada partisi
Saat partisi tabel berubah, permintaan ReadChangeStream
akan menampilkan pesan CloseStream
dengan informasi yang diperlukan untuk melanjutkan streaming dari partisi baru.
Untuk pemisahan, ini akan berisi beberapa partisi baru dan ContinuationToken
yang sesuai untuk setiap partisi. Untuk melanjutkan streaming partisi baru
dari posisi yang sama, Anda membuat permintaan ReadChangeStream
baru
untuk setiap partisi baru dengan token yang sesuai.
Misalnya, jika Anda melakukan streaming partisi [A,C)
dan partisi tersebut dibagi menjadi dua
partisi, [A,B)
dan [B,C)
, Anda dapat mengharapkan urutan
peristiwa berikut:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Untuk melanjutkan streaming setiap partisi dari posisi yang sama, Anda mengirimkan
permintaan ReadChangeStreamQuery
berikut:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Untuk penggabungan, agar dapat melanjutkan dari partisi yang sama, Anda perlu mengirim permintaan ReadChangeStream
baru yang berisi setiap token dari partisi yang digabungkan.
Misalnya, jika Anda melakukan streaming dua partisi, [A,B)
dan [B,C)
, dan keduanya
digabungkan ke dalam partisi [A,C)
, Anda dapat mengharapkan urutan peristiwa berikut:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Untuk melanjutkan partisi streaming [A, C)
dari posisi yang sama, Anda mengirim
ReadChangeStreamQuery
seperti berikut:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));