Membaca aliran perubahan dengan Java
Library klien Cloud Bigtable untuk Java menyediakan metode tingkat rendah untuk memproses data perubahan data. Namun, dalam sebagian besar kasus, sebaiknya Anda menstreaming perubahan dengan Dataflow, bukan menggunakan metode yang dijelaskan di halaman ini, karena Dataflow menangani pemisahan dan penggabungan partisi untuk Anda.
Sebelum memulai
Sebelum membaca aliran perubahan dengan Java, pastikan Anda memahami Ringkasan aliran perubahan. Kemudian, selesaikan prasyarat berikut.
Menyiapkan autentikasi
Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
Untuk informasi selengkapnya, lihat Set up authentication for a local development environment.
Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Set up Application Default Credentials for code running on Google Cloud.
Mengaktifkan aliran perubahan
Anda harus mengaktifkan streaming perubahan di tabel sebelum dapat membacanya. Anda juga dapat membuat tabel baru dengan mengaktifkan aliran perubahan.
Peran yang diperlukan
Untuk mendapatkan izin yang Anda perlukan guna membaca aliran perubahan Bigtable, minta administrator untuk memberi Anda peran IAM berikut.
- Bigtable Administrator (
roles/bigtable.admin
) di instance Bigtable yang berisi tabel yang ingin Anda streaming perubahannya
Menambahkan library klien Java sebagai dependensi
Tambahkan kode yang mirip dengan kode berikut ke file pom.xml
Maven Anda. Ganti
VERSION
dengan versi library klien yang Anda
gunakan. Versi harus 2.21.0 atau yang lebih baru.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Menentukan partisi tabel
Untuk mulai membuat permintaan ReadChangeStream
, Anda perlu mengetahui partisi tabel. Hal ini dapat ditentukan menggunakan
metode GenerateInitialChangeStreamPartitions
. Contoh berikut menunjukkan cara
menggunakan metode ini untuk mendapatkan streaming
ByteStringRanges
yang mewakili setiap partisi dalam tabel. Setiap ByteStringRange
berisi
kunci awal dan akhir untuk partisi.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Memproses perubahan untuk setiap partisi
Kemudian, Anda dapat memproses perubahan untuk setiap partisi menggunakan metode
ReadChangeStream
. Ini adalah contoh cara membuka aliran data untuk partisi, mulai
dari waktu saat ini.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
menerima argumen berikut:
- Partisi streaming (Wajib) - partisi yang akan diubah streamingnya
- Salah satu dari berikut ini:
- Waktu mulai - Stempel waktu commit untuk mulai memproses perubahan dari
- Token lanjutan - Token yang mewakili posisi untuk melanjutkan streaming
- Waktu Akhir (Opsional) - Stempel waktu commit untuk berhenti memproses perubahan saat dicapai. Jika Anda tidak memberikan nilai, streaming akan terus membaca.
- Durasi heartbeat (Opsional) - Frekuensi pesan heartbeat saat tidak ada perubahan baru (defaultnya lima detik)
Mengubah format data streaming
Data aliran perubahan yang ditampilkan adalah salah satu dari tiga jenis respons:
ChangeStreamMutation
- Pesan yang mewakili data change record.CloseStream
- Pesan yang menunjukkan bahwa klien harus berhenti membaca dari streaming.- Status - Menunjukkan alasan menutup streaming. Salah satu dari:
OK
- waktu akhir telah tercapai untuk partisi tertentuOUT_OF_RANGE
- partisi yang diberikan tidak ada lagi, yang berarti pemisahan atau penggabungan telah terjadi pada partisi ini. PermintaanReadChangeStream
baru harus dibuat untuk setiap partisi baru.
NewPartitions
- Memberikan informasi partisi yang diperbarui pada responsOUT_OF_RANGE
.ChangeStreamContinuationTokens
- Daftar token yang digunakan untuk melanjutkan permintaanReadChangeStream
baru dari posisi yang sama. Satu perNewPartition
.
- Status - Menunjukkan alasan menutup streaming. Salah satu dari:
Heartbeat
- Pesan berkala dengan informasi yang dapat digunakan untuk memeriksa status aliran data.EstimatedLowWatermark
- Estimasi watermark rendah untuk partisi tertentuContinuationToken
- Token untuk melanjutkan streaming partisi tertentu dari posisi saat ini.
Konten data perubahan data
Untuk mengetahui informasi tentang data aliran perubahan, lihat Isi data aliran perubahan.
Menangani perubahan pada partisi
Saat partisi tabel berubah, permintaan ReadChangeStream
akan menampilkan pesan CloseStream
dengan informasi yang diperlukan untuk melanjutkan streaming
dari partisi baru.
Untuk pemisahan, ini akan berisi beberapa partisi baru dan ContinuationToken
yang sesuai untuk setiap partisi. Untuk melanjutkan streaming partisi baru
dari posisi yang sama, Anda membuat permintaan ReadChangeStream
baru
untuk setiap partisi baru dengan token yang sesuai.
Misalnya, jika Anda melakukan streaming partisi [A,C)
dan partisi tersebut terpecah menjadi dua
partisi, [A,B)
dan [B,C)
, Anda dapat mengharapkan urutan peristiwa
berikut:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Untuk melanjutkan streaming setiap partisi dari posisi yang sama, Anda mengirimkan
permintaan ReadChangeStreamQuery
berikut:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Untuk penggabungan, agar dapat melanjutkan dari partisi yang sama, Anda harus mengirim permintaan ReadChangeStream
baru yang berisi setiap token dari partisi yang digabungkan.
Misalnya, jika Anda melakukan streaming dua partisi, [A,B)
dan [B,C)
, dan partisi tersebut
digabungkan ke dalam partisi [A,C)
, Anda dapat mengharapkan urutan peristiwa berikut:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Untuk melanjutkan streaming partisi [A, C)
dari posisi yang sama, Anda mengirim
ReadChangeStreamQuery
seperti berikut:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));