Membaca aliran perubahan dengan Java

Library klien Cloud Bigtable untuk Java menyediakan metode tingkat rendah untuk memproses data perubahan data. Namun, dalam sebagian besar kasus, sebaiknya Anda menstreaming perubahan dengan Dataflow, bukan menggunakan metode yang dijelaskan di halaman ini, karena Dataflow menangani pemisahan dan penggabungan partisi untuk Anda.

Sebelum memulai

Sebelum membaca aliran perubahan dengan Java, pastikan Anda memahami Ringkasan aliran perubahan. Kemudian, selesaikan prasyarat berikut.

Menyiapkan autentikasi

Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Untuk informasi selengkapnya, lihat Set up authentication for a local development environment.

Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Set up Application Default Credentials for code running on Google Cloud.

Mengaktifkan aliran perubahan

Anda harus mengaktifkan streaming perubahan di tabel sebelum dapat membacanya. Anda juga dapat membuat tabel baru dengan mengaktifkan aliran perubahan.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan guna membaca aliran perubahan Bigtable, minta administrator untuk memberi Anda peran IAM berikut.

  • Bigtable Administrator (roles/bigtable.admin) di instance Bigtable yang berisi tabel yang ingin Anda streaming perubahannya

Menambahkan library klien Java sebagai dependensi

Tambahkan kode yang mirip dengan kode berikut ke file pom.xml Maven Anda. Ganti VERSION dengan versi library klien yang Anda gunakan. Versi harus 2.21.0 atau yang lebih baru.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Menentukan partisi tabel

Untuk mulai membuat permintaan ReadChangeStream, Anda perlu mengetahui partisi tabel. Hal ini dapat ditentukan menggunakan metode GenerateInitialChangeStreamPartitions. Contoh berikut menunjukkan cara menggunakan metode ini untuk mendapatkan streaming ByteStringRanges yang mewakili setiap partisi dalam tabel. Setiap ByteStringRange berisi kunci awal dan akhir untuk partisi.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Memproses perubahan untuk setiap partisi

Kemudian, Anda dapat memproses perubahan untuk setiap partisi menggunakan metode ReadChangeStream. Ini adalah contoh cara membuka aliran data untuk partisi, mulai dari waktu saat ini.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery menerima argumen berikut:

  • Partisi streaming (Wajib) - partisi yang akan diubah streamingnya
  • Salah satu dari berikut ini:
    • Waktu mulai - Stempel waktu commit untuk mulai memproses perubahan dari
    • Token lanjutan - Token yang mewakili posisi untuk melanjutkan streaming
  • Waktu Akhir (Opsional) - Stempel waktu commit untuk berhenti memproses perubahan saat dicapai. Jika Anda tidak memberikan nilai, streaming akan terus membaca.
  • Durasi heartbeat (Opsional) - Frekuensi pesan heartbeat saat tidak ada perubahan baru (defaultnya lima detik)

Mengubah format data streaming

Data aliran perubahan yang ditampilkan adalah salah satu dari tiga jenis respons:

  • ChangeStreamMutation - Pesan yang mewakili data change record.

  • CloseStream - Pesan yang menunjukkan bahwa klien harus berhenti membaca dari streaming.

    • Status - Menunjukkan alasan menutup streaming. Salah satu dari:
      • OK - waktu akhir telah tercapai untuk partisi tertentu
      • OUT_OF_RANGE - partisi yang diberikan tidak ada lagi, yang berarti pemisahan atau penggabungan telah terjadi pada partisi ini. Permintaan ReadChangeStream baru harus dibuat untuk setiap partisi baru.
    • NewPartitions - Memberikan informasi partisi yang diperbarui pada respons OUT_OF_RANGE.
    • ChangeStreamContinuationTokens - Daftar token yang digunakan untuk melanjutkan permintaan ReadChangeStream baru dari posisi yang sama. Satu per NewPartition.
  • Heartbeat - Pesan berkala dengan informasi yang dapat digunakan untuk memeriksa status aliran data.

    • EstimatedLowWatermark - Estimasi watermark rendah untuk partisi tertentu
    • ContinuationToken - Token untuk melanjutkan streaming partisi tertentu dari posisi saat ini.

Konten data perubahan data

Untuk mengetahui informasi tentang data aliran perubahan, lihat Isi data aliran perubahan.

Menangani perubahan pada partisi

Saat partisi tabel berubah, permintaan ReadChangeStream akan menampilkan pesan CloseStream dengan informasi yang diperlukan untuk melanjutkan streaming dari partisi baru.

Untuk pemisahan, ini akan berisi beberapa partisi baru dan ContinuationToken yang sesuai untuk setiap partisi. Untuk melanjutkan streaming partisi baru dari posisi yang sama, Anda membuat permintaan ReadChangeStream baru untuk setiap partisi baru dengan token yang sesuai.

Misalnya, jika Anda melakukan streaming partisi [A,C) dan partisi tersebut terpecah menjadi dua partisi, [A,B) dan [B,C), Anda dapat mengharapkan urutan peristiwa berikut:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Untuk melanjutkan streaming setiap partisi dari posisi yang sama, Anda mengirimkan permintaan ReadChangeStreamQuery berikut:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Untuk penggabungan, agar dapat melanjutkan dari partisi yang sama, Anda harus mengirim permintaan ReadChangeStream baru yang berisi setiap token dari partisi yang digabungkan.

Misalnya, jika Anda melakukan streaming dua partisi, [A,B) dan [B,C), dan partisi tersebut digabungkan ke dalam partisi [A,C), Anda dapat mengharapkan urutan peristiwa berikut:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Untuk melanjutkan streaming partisi [A, C) dari posisi yang sama, Anda mengirim ReadChangeStreamQuery seperti berikut:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Langkah selanjutnya