Membaca aliran perubahan dengan Java

Library klien Cloud Bigtable untuk Java menyediakan metode tingkat rendah untuk memproses data perubahan data. Sebelum membaca dokumen ini, baca Ringkasan aliran perubahan.

Sebelum memulai

Sebelum Anda membaca aliran perubahan dengan Java, selesaikan prasyarat berikut.

Menyiapkan autentikasi

Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login

Untuk informasi selengkapnya, lihat Set up authentication for a local development environment.

Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Set up Application Default Credentials for code running on Google Cloud.

Mengaktifkan aliran data perubahan

Anda harus mengaktifkan aliran perubahan pada tabel sebelum dapat membacanya. Anda juga dapat membuat tabel baru dengan aliran data perubahan yang diaktifkan.

Peran yang diperlukan

Untuk mendapatkan izin yang diperlukan untuk membaca aliran data perubahan Bigtable, minta administrator untuk memberi Anda peran IAM berikut.

  • Administrator Bigtable (roles/bigtable.admin) pada instance Bigtable yang berisi tabel yang ingin Anda gunakan untuk mengalirkan perubahan

Menambahkan library klien Java sebagai dependensi

Tambahkan kode yang mirip dengan berikut ini ke file pom.xml Maven Anda. Ganti VERSION dengan versi library klien yang Anda gunakan. Versi harus 2.21.0 atau yang lebih baru.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Menentukan partisi tabel

Untuk mulai membuat permintaan ReadChangeStream, Anda perlu mengetahui partisi tabel Anda. Hal ini dapat ditentukan menggunakan metode GenerateInitialChangeStreamPartitions. Contoh berikut menunjukkan cara menggunakan metode ini untuk mendapatkan aliran ByteStringRanges yang mewakili setiap partisi dalam tabel. Setiap ByteStringRange berisi kunci awal dan akhir untuk sebuah partisi.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Memproses perubahan untuk setiap partisi

Anda kemudian dapat memproses perubahan untuk setiap partisi menggunakan metode ReadChangeStream. Ini adalah contoh cara membuka aliran untuk partisi, mulai dari waktu saat ini.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery menerima argumen berikut:

  • Partisi streaming (Wajib) - partisi untuk streaming perubahan
  • Salah satu dari kolom berikut:
    • Waktu mulai - Jalankan stempel waktu untuk memulai pemrosesan perubahan
    • Token lanjutan - Token yang mewakili posisi untuk melanjutkan streaming dari
  • Waktu Berakhir (Opsional) - Jalankan stempel waktu untuk menghentikan pemrosesan perubahan saat tercapai. Jika Anda tidak memberikan nilai, aliran data akan terus berjalan.
  • Durasi Heartbeat (Opsional) - Frekuensi pesan heartbeat jika tidak ada perubahan baru (default-nya adalah lima detik)

Mengubah format rekaman streaming

Catatan aliran perubahan yang ditampilkan adalah salah satu dari tiga jenis respons:

  • ChangeStreamMutation - Pesan yang mewakili data perubahan data.

  • CloseStream - Pesan yang menunjukkan bahwa klien harus berhenti membaca dari aliran data.

    • Status - Menunjukkan alasan menutup streaming. Salah satu dari:
      • OK - waktu berakhir untuk partisi yang ditentukan telah tercapai
      • OUT_OF_RANGE - partisi yang ditentukan sudah tidak ada, artinya pemisahan atau penggabungan telah terjadi pada partisi ini. Permintaan ReadChangeStream baru harus dibuat untuk setiap partisi baru.
    • NewPartitions - Memberikan informasi partisi yang telah diperbarui pada respons OUT_OF_RANGE.
    • ChangeStreamContinuationTokens - Daftar token yang digunakan untuk melanjutkan permintaan ReadChangeStream baru dari posisi yang sama. Satu per NewPartition.
  • Heartbeat - Pesan berkala dengan informasi yang dapat digunakan untuk memeriksa status aliran data.

    • EstimatedLowWatermark - Estimasi watermark rendah untuk partisi yang diberikan
    • ContinuationToken - Token untuk melanjutkan streaming partisi yang diberikan dari posisi saat ini.

Konten catatan perubahan data

Setiap record perubahan data berisi hal-hal berikut:

  • Entri - perubahan yang dilakukan pada baris, termasuk satu atau beberapa hal berikut:
    • Tulis
      • Kelompok kolom
      • Penentu kolom
      • Stempel waktu
      • Nilai
    • Penghapusan sel
      • Kelompok kolom
      • Penentu kolom
      • Rentang stempel waktu
    • Penghapusan kelompok kolom
      • Kelompok kolom
      • Penghapusan dari baris - Penghapusan dari baris dikonversi menjadi daftar penghapusan dari grup kolom untuk setiap grup kolom tempat baris berisi data.
  • Kunci baris - ID untuk baris yang diubah
  • Jenis perubahan - baik yang dimulai oleh pengguna atau pembersihan sampah memori
  • ID cluster yang menerima perubahan
  • Stempel waktu commit - waktu sisi server saat perubahan di-commit ke tabel
  • Tie breaker - nilai yang memungkinkan aplikasi yang membaca aliran data menggunakan kebijakan resolusi konflik bawaan Bigtable
  • Token - digunakan oleh aplikasi pengguna untuk melanjutkan streaming jika terganggu
  • Estimasi watermark rendah - perkiraan waktu sejak partisi data menemukan replikasi di semua cluster. Untuk mengetahui detailnya, lihat Partisi dan Watermark.

Untuk detail tambahan tentang kolom dalam record perubahan data, lihat referensi API untuk ReadChangeStream.

Menangani perubahan partisi

Jika partisi tabel berubah, permintaan ReadChangeStream menampilkan pesan CloseStream dengan informasi yang diperlukan untuk melanjutkan streaming dari partisi baru.

Untuk pemisahan, partisi ini akan berisi beberapa partisi baru dan ContinuationToken yang sesuai untuk setiap partisi. Untuk melanjutkan streaming partisi baru dari posisi yang sama, buat permintaan ReadChangeStream baru untuk setiap partisi baru dengan token yang sesuai.

Misalnya, jika Anda menstreaming partisi [A,C) dan partisi tersebut dibagi menjadi dua partisi, [A,B) dan [B,C), Anda dapat mengharapkan urutan peristiwa berikut:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Untuk melanjutkan streaming setiap partisi dari posisi yang sama, Anda harus mengirim permintaan ReadChangeStreamQuery berikut:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Untuk penggabungan, agar dapat melanjutkan dari partisi yang sama, Anda harus mengirimkan permintaan ReadChangeStream baru yang berisi setiap token dari partisi gabungan.

Misalnya, jika Anda melakukan streaming dua partisi, [A,B) dan [B,C), lalu partisi tersebut digabungkan ke dalam partisi [A,C), urutan peristiwa berikut akan terjadi:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Untuk melanjutkan partisi streaming [A, C) dari posisi yang sama, kirim ReadChangeStreamQuery seperti berikut:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Langkah selanjutnya