Membaca stream perubahan dengan Java

Library klien Cloud Bigtable untuk Java menyediakan metode tingkat rendah untuk memproses rekaman perubahan data. Namun, dalam sebagian besar kasus, sebaiknya Anda mengalirkan perubahan dengan Dataflow, bukan menggunakan metode yang dijelaskan di halaman ini, karena Dataflow menangani pemisahan dan penggabungan partisi untuk Anda.

Sebelum memulai

Sebelum membaca aliran perubahan dengan Java, pastikan Anda memahami Ringkasan aliran perubahan. Kemudian, selesaikan prasyarat berikut.

Menyiapkan autentikasi

Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.

    Menginstal Google Cloud CLI.

    Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Untuk informasi selengkapnya, lihat Set up authentication for a local development environment.

Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Set up Application Default Credentials for code running on Google Cloud.

Mengaktifkan aliran perubahan

Anda harus mengaktifkan aliran perubahan pada tabel sebelum dapat membacanya. Anda juga dapat membuat tabel baru dengan aliran perubahan yang diaktifkan.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk membaca aliran perubahan Bigtable, minta administrator Anda untuk memberi Anda peran IAM berikut.

  • Bigtable Administrator (roles/bigtable.admin) di instance Bigtable yang berisi tabel yang ingin Anda streaming perubahannya

Menambahkan library klien Java sebagai dependensi

Tambahkan kode yang mirip dengan kode berikut ke file pom.xml Maven Anda. Ganti VERSION dengan versi library klien yang Anda gunakan. Versinya harus 2.21.0 atau yang lebih baru.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Menentukan partisi tabel

Untuk mulai membuat permintaan ReadChangeStream, Anda perlu mengetahui partisi tabel Anda. Hal ini dapat ditentukan menggunakan metode GenerateInitialChangeStreamPartitions. Contoh berikut menunjukkan cara menggunakan metode ini untuk mendapatkan aliran ByteStringRanges yang merepresentasikan setiap partisi dalam tabel. Setiap ByteStringRange berisi kunci awal dan akhir untuk partisi.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Perubahan proses untuk setiap partisi

Kemudian, Anda dapat memproses perubahan untuk setiap partisi menggunakan metode ReadChangeStream. Berikut contoh cara membuka stream untuk partisi, dimulai dari waktu saat ini.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery menerima argumen berikut:

  • Partisi streaming (Wajib) - partisi untuk melakukan streaming perubahan dari
  • Salah satu dari berikut ini:
    • Waktu mulai - Stempel waktu penerapan untuk mulai memproses perubahan dari
    • Token lanjutan - Token yang merepresentasikan posisi untuk melanjutkan streaming
  • Waktu Berakhir (Opsional) - Stempel waktu penerapan untuk menghentikan pemrosesan perubahan saat tercapai. Jika Anda tidak memberikan nilai, aliran akan terus membaca.
  • Durasi detak jantung (Opsional) - Frekuensi pesan detak jantung saat tidak ada perubahan baru (default lima detik)

Mengubah format rekaman streaming

Data aliran perubahan yang ditampilkan adalah salah satu dari tiga jenis respons:

  • ChangeStreamMutation - Pesan yang merepresentasikan catatan perubahan data.

  • CloseStream - Pesan yang menunjukkan bahwa klien harus berhenti membaca dari stream.

    • Status - Menunjukkan alasan penutupan aliran data. Salah satu dari:
      • OK - waktu berakhir telah tercapai untuk partisi tertentu
      • OUT_OF_RANGE - partisi yang diberikan tidak ada lagi, artinya pemisahan atau penggabungan telah terjadi pada partisi ini. Permintaan ReadChangeStream baru harus dibuat untuk setiap partisi baru.
    • NewPartitions - Memberikan informasi partisi yang diperbarui pada respons OUT_OF_RANGE.
    • ChangeStreamContinuationTokens - Daftar token yang digunakan untuk melanjutkan permintaan ReadChangeStream baru dari posisi yang sama. Satu per NewPartition.
  • Heartbeat - Pesan berkala dengan informasi yang dapat digunakan untuk memeriksa status streaming.

    • EstimatedLowWatermark - Estimasi watermark rendah untuk partisi tertentu
    • ContinuationToken - Token untuk melanjutkan streaming partisi tertentu dari posisi saat ini.

Isi catatan perubahan data

Untuk mengetahui informasi tentang rekaman aliran perubahan, lihat Isi rekaman perubahan data.

Menangani perubahan pada partisi

Saat partisi tabel berubah, permintaan ReadChangeStream akan menampilkan pesan CloseStream dengan informasi yang diperlukan untuk melanjutkan streaming dari partisi baru.

Untuk pemisahan, ini akan berisi beberapa partisi baru dan ContinuationToken yang sesuai untuk setiap partisi. Untuk melanjutkan streaming partisi baru dari posisi yang sama, Anda membuat permintaan ReadChangeStream baru untuk setiap partisi baru dengan token yang sesuai.

Misalnya, jika Anda melakukan streaming partisi [A,C) dan partisi tersebut dibagi menjadi dua partisi, [A,B) dan [B,C), Anda dapat mengharapkan urutan peristiwa berikut:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Untuk melanjutkan streaming setiap partisi dari posisi yang sama, Anda mengirimkan permintaan ReadChangeStreamQuery berikut:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Untuk penggabungan, agar dapat melanjutkan dari partisi yang sama, Anda perlu mengirim permintaan ReadChangeStream baru yang berisi setiap token dari partisi yang digabungkan.

Misalnya, jika Anda melakukan streaming dua partisi, [A,B) dan [B,C), dan keduanya digabungkan ke dalam partisi [A,C), Anda dapat mengharapkan urutan peristiwa berikut:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Untuk melanjutkan partisi streaming [A, C) dari posisi yang sama, Anda mengirim ReadChangeStreamQuery seperti berikut:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Langkah berikutnya