Mengalirkan perubahan dengan Dataflow

Konektor Bigtable Beam memungkinkan Anda menggunakan Dataflow untuk membaca catatan perubahan data Bigtable tanpa perlu melacak atau memproses perubahan partisi dalam kode Anda, karena konektornya akan menangani logika tersebut untuk Anda.

Dokumen ini menjelaskan cara mengonfigurasi dan menggunakan konektor Bigtable Beam untuk membaca aliran perubahan menggunakan pipeline Dataflow. Sebelum membaca dokumen ini, Anda harus membaca Ringkasan aliran data perubahan dan memahami Dataflow.

Alternatif untuk membangun pipeline Anda sendiri

Jika tidak ingin mem-build pipeline Dataflow sendiri, Anda dapat menggunakan salah satu opsi berikut.

Anda dapat menggunakan template Dataflow yang disediakan oleh Google.

Anda juga dapat menggunakan contoh kode dari tutorial atau panduan memulai Bigtable sebagai titik awal untuk kode Anda.

Pastikan kode yang Anda buat menggunakan google cloud libraries-bom versi 26.14.0 atau yang lebih baru.

Detail konektor

Metode konektor Bigtable Beam, BigtableIO.readChangeStream, memungkinkan Anda membaca aliran catatan perubahan data (ChangeStreamMutation) yang dapat Anda proses. Konektor Bigtable Beam adalah komponen dari repositori GitHub Apache Beam. Untuk deskripsi kode konektor, lihat komentar di BigtableIO.java.

Anda harus menggunakan konektor dengan Beam versi 2.48.0 atau yang lebih baru. Periksa dukungan runtime Apache Beam untuk memastikan Anda menggunakan versi Java yang didukung. Kemudian, Anda dapat men-deploy pipeline yang menggunakan konektor ke Dataflow, yang menangani penyediaan dan pengelolaan resource serta membantu skalabilitas dan keandalan pemrosesan data aliran data.

Untuk mengetahui informasi selengkapnya tentang model pemrograman Apache Beam, lihat dokumentasi Beam.

Mengelompokkan data tanpa waktu peristiwa

Kumpulan data perubahan data yang di-streaming menggunakan konektor Bigtable Beam tidak kompatibel dengan fungsi Dataflow yang bergantung pada waktu peristiwa.

Seperti yang dijelaskan dalam Replikasi dan watermark, watermark rendah mungkin tidak muncul jika replikasi untuk partisi belum mencapai instance lainnya. Saat watermark yang rendah berhenti ditampilkan, aliran data perubahan dapat terhenti.

Agar streaming tidak terhenti, konektor Bigtable Beam akan membuat output semua data dengan stempel waktu output nol. Stempel waktu nol membuat Dataflow menganggap semua data perubahan data sebagai data terlambat. Akibatnya, fitur Dataflow yang bergantung pada waktu peristiwa tidak kompatibel dengan aliran data perubahan Bigtable. Secara khusus, Anda tidak dapat menggunakan fungsi windowing, pemicu waktu peristiwa, atau timer waktu peristiwa.

Sebagai gantinya, Anda dapat menggunakan GlobalWindows dengan pemicu waktu non-peristiwa untuk mengelompokkan data terlambat ini ke dalam panel, seperti yang ditunjukkan dalam contoh dari tutorial. Untuk mengetahui detail tentang pemicu dan panel, lihat Pemicu di panduan pemrograman Beam.

Penskalaan otomatis

Konektor mendukung Penskalaan otomatis dataflow, yang diaktifkan secara default saat menggunakan Runner v2 (wajib). Algoritma penskalaan otomatis Dataflow memperhitungkan estimasi backlog aliran data perubahan, yang dapat dipantau di halaman Pemantauan Dataflow di bagian Backlog. Gunakan flag --maxNumWorkers saat men-deploy tugas untuk membatasi jumlah pekerja.

Untuk menskalakan pipeline secara manual, bukan menggunakan penskalaan otomatis, lihat Menskalakan pipeline streaming secara manual.

Batasan

Perhatikan batasan berikut sebelum menggunakan konektor Bigtable Beam dengan Dataflow.

Runner Dataflow V2

Konektor hanya dapat dijalankan menggunakan Dataflow Runner v2. Untuk mengaktifkannya, tentukan --experiments=use_runner_v2 dalam argumen command line Anda. Menjalankan dengan Runner v1 menyebabkan pipeline Anda gagal dengan pengecualian berikut:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshot

Konektor tidak mendukung snapshot Dataflow.

Sebelum memulai

Sebelum Anda menggunakan konektor, lengkapi prasyarat berikut.

Menyiapkan autentikasi

Untuk menggunakan contoh Java di halaman ini dari lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.

  1. Menginstal Google Cloud CLI.
  2. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  3. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login

Untuk informasi selengkapnya, lihat Siapkan autentikasi untuk lingkungan pengembangan lokal.

Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Siapkan Kredensial Default Aplikasi untuk kode yang berjalan di Google Cloud.

Mengaktifkan aliran data perubahan

Anda harus mengaktifkan aliran data perubahan pada tabel sebelum dapat membacanya. Anda juga dapat membuat tabel baru dengan aliran data perubahan diaktifkan.

Peran yang diperlukan

Untuk mendapatkan izin yang diperlukan untuk membaca aliran data perubahan Bigtable menggunakan Dataflow, minta administrator untuk memberi Anda peran IAM berikut.

Untuk membaca perubahan dari Bigtable, Anda memerlukan peran ini:

  • Administrator Bigtable (roles/bigtable.admin) pada instance Bigtable yang berisi tabel yang direncanakan untuk mengalirkan perubahan

Untuk menjalankan tugas Dataflow, Anda memerlukan peran berikut:

Untuk mengetahui informasi lebih lanjut tentang cara memberikan peran, lihat Mengelola akses.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Menambahkan konektor Bigtable Beam sebagai dependensi

Tambahkan kode yang mirip dengan dependensi berikut ke file pom.xml Maven Anda. Versi harus 2.48.0 atau yang lebih baru.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Membaca aliran data perubahan

Untuk mem-build pipeline Dataflow guna membaca catatan perubahan data, Anda harus mengonfigurasi konektor, lalu menambahkan transformasi dan sink. Kemudian, gunakan konektor untuk membaca objek ChangeStreamMutation di pipeline Beam.

Contoh kode di bagian ini, yang ditulis dalam Java, menunjukkan cara mem-build pipeline dan menggunakannya untuk mengonversi key-value pair menjadi string. Setiap pasangan terdiri dari kunci baris dan objek ChangeStreamMutation. Pipeline mengonversi setiap entri objek menjadi string yang dipisahkan koma.

Membangun pipeline

Contoh kode Java ini menunjukkan cara membangun pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Memproses {i>record<i} perubahan data

Contoh ini menunjukkan cara melakukan loop semua entri dalam kumpulan data perubahan data untuk satu baris dan memanggil metode konversi ke string berdasarkan jenis entri.

Untuk daftar jenis entri yang dapat dimuat oleh catatan perubahan data, lihat Apa yang ada dalam catatan perubahan data.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

Dalam contoh ini, entri write dikonversi:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

Dalam sampel ini, entri penghapusan sel dikonversi:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

Dalam contoh ini, entri penghapusan kelompok kolom akan dikonversi:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Memantau

Resource di konsol Google Cloud berikut dapat digunakan untuk memantau resource Google Cloud saat Anda menjalankan pipeline Dataflow untuk membaca aliran perubahan Bigtable:

Secara khusus, periksa metrik berikut:

  • Di halaman Monitoring Bigtable, periksa metrics berikut:
    • Data Penggunaan CPU oleh aliran data perubahan dalam metrik cpu_load_by_app_profile_by_method_by_table. Menampilkan dampak aliran perubahan terhadap penggunaan CPU cluster Anda.
    • Mengubah penggunaan penyimpanan stream (byte) (change_stream_log_used_bytes).
  • Di halaman pemantauan Dataflow, periksa keaktualan data, yang menunjukkan perbedaan antara waktu saat ini dan watermark. Durasinya sekitar dua menit, dengan sesekali lonjakan yang terjadi satu atau dua menit lebih lama. Jika metrik keaktualan data secara konsisten lebih tinggi dari batas tersebut, pipeline Anda mungkin kekurangan resource dan Anda harus menambahkan lebih banyak pekerja Dataflow.

Langkah selanjutnya