Melakukan streaming perubahan dengan Dataflow

Konektor Bigtable Beam memungkinkan Anda menggunakan Dataflow untuk membaca data Bigtable yang mengubah data tanpa perlu melacak atau memproses perubahan partisi dalam kode Anda, karena konektor menangani logika tersebut untuk Anda.

Dokumen ini menjelaskan cara mengonfigurasi dan menggunakan konektor Bigtable Beam untuk membaca aliran perubahan menggunakan pipeline Dataflow. Sebelum membaca dokumen ini, Anda harus membaca Ringkasan aliran perubahan dan memahami Dataflow.

Alternatif selain membuat pipeline Anda sendiri

Jika tidak ingin membuat pipeline Dataflow Anda sendiri, Anda dapat menggunakan salah satu opsi berikut.

Anda dapat menggunakan template Dataflow yang disediakan Google.

Anda juga dapat menggunakan contoh kode dari tutorial atau panduan memulai Bigtable sebagai titik awal untuk kode Anda.

Pastikan kode yang Anda buat menggunakan google cloud libraries-bom versi 26.14.0 atau yang lebih baru.

Detail konektor

Metode konektor Bigtable Beam, BigtableIO.readChangeStream, memungkinkan Anda membaca aliran data perubahan data (ChangeStreamMutation) yang dapat Anda proses. Konektor Bigtable Beam adalah komponen dari repositori GitHub Apache Beam. Untuk deskripsi kode konektor, lihat komentar di BigtableIO.java.

Anda harus menggunakan konektor dengan Beam versi 2.48.0 atau yang lebih baru. Periksa dukungan runtime Apache Beam untuk memastikan bahwa Anda menggunakan versi Java yang didukung. Kemudian, Anda dapat men-deploy pipeline yang menggunakan konektor ke Dataflow, yang menangani penyediaan dan pengelolaan resource serta membantu skalabilitas dan keandalan pemrosesan data streaming.

Untuk informasi selengkapnya tentang model pemrograman Apache Beam, lihat dokumentasi Beam.

Mengelompokkan data tanpa waktu peristiwa

Data perubahan yang di-streaming menggunakan konektor Bigtable Beam tidak kompatibel dengan fungsi Dataflow yang bergantung pada waktu peristiwa.

Seperti yang dijelaskan dalam Replikasi dan watermark, watermark rendah mungkin tidak maju jika replikasi untuk partisi belum menyamai instance lainnya. Jika stempel waktu rendah berhenti maju, hal ini dapat menyebabkan aliran perubahan terhenti.

Untuk mencegah streaming terhenti, konektor Bigtable Beam menghasilkan semua data dengan stempel waktu output nol. Stempel waktu nol membuat Dataflow menganggap semua data perubahan sebagai data terlambat. Akibatnya, fitur Dataflow yang bergantung pada waktu peristiwa tidak kompatibel dengan aliran data perubahan Bigtable. Secara khusus, Anda tidak dapat menggunakan fungsi periode, pemicu waktu peristiwa, atau timer waktu peristiwa.

Sebagai gantinya, Anda dapat menggunakan GlobalWindows dengan pemicu waktu non-peristiwa untuk mengelompokkan data terlambat ini ke dalam panel, seperti yang ditunjukkan dalam contoh dari tutorial. Untuk mengetahui detail tentang pemicu dan panel, lihat Pemicu dalam panduan pemrograman Beam.

Penskalaan otomatis

Konektor ini mendukung Penskalaan otomatis Dataflow, yang diaktifkan secara default saat menggunakan Runner v2 (wajib). Algoritma penskalaan otomatis Dataflow memperhitungkan estimasi backlog aliran perubahan, yang dapat dipantau di halaman Pemantauan Dataflow di bagian Backlog. Gunakan flag --maxNumWorkers saat men-deploy tugas untuk membatasi jumlah pekerja.

Untuk menskalakan pipeline secara manual, bukan menggunakan penskalaan otomatis, lihat Menskalakan pipeline streaming secara manual.

Batasan

Perhatikan batasan berikut sebelum menggunakan konektor Bigtable Beam dengan Dataflow.

Dataflow Runner V2

Konektor hanya dapat dijalankan menggunakan Runner Dataflow v2. Untuk mengaktifkannya, tentukan --experiments=use_runner_v2 dalam argumen command line Anda. Menjalankan dengan Runner v1 menyebabkan pipeline Anda gagal dengan pengecualian berikut:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshot

Konektor tidak mendukung Snapshot aliran data.

Duplikat

Konektor Bigtable Beam melakukan streaming perubahan untuk setiap kunci baris dan setiap cluster dalam urutan stempel waktu commit, tetapi karena terkadang dimulai ulang dari waktu sebelumnya dalam streaming, konektor ini dapat menghasilkan duplikat.

Sebelum memulai

Sebelum menggunakan konektor, selesaikan prasyarat berikut.

Menyiapkan autentikasi

Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Untuk informasi selengkapnya, lihat Set up authentication for a local development environment.

Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Set up Application Default Credentials for code running on Google Cloud.

Mengaktifkan aliran perubahan

Anda harus mengaktifkan aliran perubahan di tabel sebelum dapat membacanya. Anda juga dapat membuat tabel baru dengan mengaktifkan aliran perubahan.

Mengubah tabel metadata aliran data

Saat Anda melakukan streaming perubahan dengan Dataflow, konektor Bigtable Beam akan membuat tabel metadata yang diberi nama __change_stream_md_table secara default. Tabel metadata aliran perubahan mengelola status operasional konektor dan menyimpan metadata tentang data perubahan.

Secara default, konektor membuat tabel dalam instance yang sama dengan tabel yang di-streaming. Untuk memastikan tabel berfungsi dengan benar, profil aplikasi untuk tabel metadata harus menggunakan perutean cluster tunggal dan mengaktifkan transaksi baris tunggal.

Untuk informasi selengkapnya tentang streaming perubahan dari Bigtable dengan konektor Bigtable Beam, lihat dokumentasi BigtableIO.

Peran yang diperlukan

Untuk mendapatkan izin yang diperlukan guna membaca aliran perubahan Bigtable menggunakan Dataflow, minta administrator untuk memberi Anda peran IAM berikut.

Untuk membaca perubahan dari Bigtable, Anda memerlukan peran ini:

  • Bigtable Administrator (roles/bigtable.admin) di instance Bigtable yang berisi tabel yang akan Anda streaming perubahannya

Untuk menjalankan tugas Dataflow, Anda memerlukan peran berikut:

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Menambahkan konektor Bigtable Beam sebagai dependensi

Tambahkan kode yang mirip dengan dependensi berikut ke file pom.xml Maven Anda. Versi harus 2.48.0 atau yang lebih baru.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Membaca aliran data perubahan

Untuk mem-build pipeline Dataflow guna membaca data perubahan data, Anda mengonfigurasi konektor, lalu menambahkan transformasi dan sink. Kemudian, Anda menggunakan konektor untuk membaca objek ChangeStreamMutation di pipeline Beam.

Contoh kode di bagian ini, yang ditulis dalam Java, menunjukkan cara mem-build pipeline dan menggunakannya untuk mengonversi pasangan nilai kunci menjadi string. Setiap pasangan terdiri dari kunci baris dan objek ChangeStreamMutation. Pipeline mengonversi setiap entri objek menjadi string yang dipisahkan koma.

Mem-build pipeline

Contoh kode Java ini menunjukkan cara mem-build pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Memproses data perubahan data

Contoh ini menunjukkan cara melakukan loop melalui semua entri dalam data perubahan data untuk baris dan memanggil metode konversi-ke-string berdasarkan jenis entri.

Untuk daftar jenis entri yang dapat dimuat dalam data change record, lihat Isi data change record.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

Dalam contoh ini, entri write dikonversi:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

Dalam contoh ini, entri penghapusan sel dikonversi:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

Dalam contoh ini, entri penghapusan keluarga kolom dikonversi:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Memantau

Resource berikut di konsol Google Cloud memungkinkan Anda memantau resource Google Cloud saat menjalankan pipeline Dataflow untuk membaca aliran perubahan Bigtable:

Secara khusus, periksa metrik berikut:

  • Di halaman Monitoring Bigtable, periksa metrik berikut:
    • Data Pemakaian CPU menurut aliran perubahan dalam metrik cpu_load_by_app_profile_by_method_by_table. Menampilkan dampak stream perubahan pada penggunaan CPU cluster Anda.
    • Perubahan penggunaan penyimpanan aliran data (byte) (change_stream_log_used_bytes).
  • Di halaman pemantauan Dataflow, periksa keaktualan data, yang menunjukkan perbedaan antara waktu saat ini dan watermark. Durasinya sekitar dua menit, dengan lonjakan sesekali yang satu atau dua menit lebih lama. Jika metrik keaktualan data secara konsisten lebih tinggi dari nilai minimum tersebut, pipeline Anda mungkin kekurangan resource dan Anda harus menambahkan lebih banyak pekerja Dataflow. Keaktualan data tidak menunjukkan apakah data perubahan diproses secara lambat.
  • Metrik processing_delay_from_commit_timestamp_MEAN Dataflow dapat memberi tahu Anda waktu pemrosesan rata-rata data perubahan data selama masa aktif tugas.

Metrik server/latencies Bigtable tidak berguna saat Anda memantau pipeline Dataflow yang membaca streaming perubahan Bigtable, karena metrik ini mencerminkan durasi permintaan streaming, bukan latensi pemrosesan data perubahan. Latensi tinggi dalam aliran perubahan tidak berarti permintaan diproses secara lambat; artinya, koneksi terbuka selama itu.

Langkah selanjutnya