Mengalirkan perubahan dengan Dataflow

Konektor Bigtable Beam memungkinkan Anda menggunakan Dataflow untuk membaca rekaman perubahan data Bigtable tanpa perlu melacak atau memproses perubahan partisi dalam kode Anda, karena konektor menangani logika tersebut untuk Anda.

Dokumen ini menjelaskan cara mengonfigurasi dan menggunakan konektor Bigtable Beam untuk membaca aliran perubahan menggunakan pipeline Dataflow. Sebelum membaca dokumen ini, Anda harus membaca Ringkasan aliran perubahan dan memahami Dataflow.

Alternatif untuk membangun pipeline Anda sendiri

Jika tidak ingin membuat pipeline Dataflow sendiri, Anda dapat menggunakan salah satu opsi berikut.

Anda dapat menggunakan template Dataflow yang disediakan Google.

Anda juga dapat menggunakan contoh kode dari tutorial atau panduan memulai Bigtable sebagai titik awal untuk kode Anda.

Pastikan kode yang Anda buat menggunakan google cloud libraries-bom versi 26.14.0 atau yang lebih baru.

Detail konektor

Metode konektor Bigtable Beam, BigtableIO.readChangeStream, memungkinkan Anda membaca aliran data perubahan data (ChangeStreamMutation) yang dapat Anda proses. Konektor Bigtable Beam adalah komponen repositori GitHub Apache Beam. Untuk mengetahui deskripsi kode konektor, lihat komentar di BigtableIO.java.

Anda harus menggunakan konektor dengan Beam versi 2.48.0 atau yang lebih baru. Periksa dukungan runtime Apache Beam untuk memastikan bahwa Anda menggunakan Java versi yang didukung. Kemudian, Anda dapat men-deploy pipeline yang menggunakan konektor ke Dataflow, yang menangani penyediaan dan pengelolaan resource serta membantu skalabilitas dan keandalan pemrosesan data streaming.

Untuk mengetahui informasi selengkapnya tentang model pemrograman Apache Beam, lihat dokumentasi Beam.

Mengelompokkan data tanpa waktu acara

Catatan perubahan data yang di-streaming menggunakan konektor Bigtable Beam tidak kompatibel dengan fungsi Dataflow yang bergantung pada waktu peristiwa.

Seperti yang dijelaskan dalam Replikasi dan tanda air, tanda air rendah mungkin tidak maju jika replikasi untuk partisi belum menyusul ke bagian instance lainnya. Jika watermark rendah berhenti bergerak, hal ini dapat menyebabkan aliran perubahan terhenti.

Untuk mencegah aliran data terhenti, konektor Bigtable Beam menampilkan semua data dengan stempel waktu output nol. Stempel waktu nol membuat Dataflow menganggap semua catatan perubahan data sebagai data terlambat. Akibatnya, fitur Dataflow yang bergantung pada waktu peristiwa tidak kompatibel dengan aliran data perubahan Bigtable. Secara khusus, Anda tidak dapat menggunakan fungsi windowing, pemicu waktu peristiwa, atau timer waktu peristiwa.

Sebagai gantinya, Anda dapat menggunakan GlobalWindows dengan pemicu waktu non-peristiwa untuk mengelompokkan data yang terlambat ini ke dalam panel, seperti yang ditunjukkan dalam contoh dari tutorial. Untuk mengetahui detail tentang pemicu dan panel, lihat Pemicu dalam panduan pemrograman Beam.

Penskalaan otomatis

Konektor mendukung penskalaan otomatis Dataflow, yang diaktifkan secara default saat menggunakan Runner v2 (wajib). Algoritma penskalaan otomatis Dataflow mempertimbangkan perkiraan backlog aliran perubahan, yang dapat dipantau di halaman Pemantauan Dataflow di bagian Backlog. Gunakan flag --maxNumWorkers saat men-deploy tugas untuk membatasi jumlah pekerja.

Untuk menskalakan pipeline secara manual, bukan menggunakan penskalaan otomatis, lihat Menskalakan pipeline streaming secara manual.

Batasan

Perhatikan batasan berikut sebelum menggunakan konektor Bigtable Beam dengan Dataflow.

Dataflow Runner V2

Konektor hanya dapat dieksekusi menggunakan Dataflow Runner v2. Untuk mengaktifkannya, tentukan --experiments=use_runner_v2 di argumen command line Anda. Menjalankan dengan Runner v1 menyebabkan pipeline Anda gagal dengan pengecualian berikut:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshot

Konektor tidak mendukung snapshot Dataflow.

Duplikat

Konektor Bigtable Beam melakukan streaming perubahan untuk setiap kunci baris dan setiap cluster dalam urutan stempel waktu commit, tetapi karena terkadang dimulai ulang dari waktu yang lebih awal dalam streaming, konektor ini dapat menghasilkan duplikat.

Mulai ulang pipeline

Jika pipeline Dataflow telah berhenti dalam waktu yang lama, catatan perubahan data dapat tertinggal di luar batas retensi. Saat pipeline dilanjutkan, Bigtable akan membuat pipeline gagal sehingga Anda dapat memulai pipeline baru dengan waktu mulai permintaan baru yang berada dalam periode retensi. Bigtable melakukan hal ini, bukan secara diam-diam memajukan waktu permintaan pipeline asli, untuk mencegah penghapusan yang tidak disengaja pada rekaman perubahan data dengan stempel waktu yang berada di luar periode retensi yang ditentukan.

Sebelum memulai

Sebelum Anda menggunakan konektor, selesaikan prasyarat berikut.

Menyiapkan autentikasi

Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.

    Menginstal Google Cloud CLI.

    Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Untuk informasi selengkapnya, lihat Set up authentication for a local development environment.

Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Set up Application Default Credentials for code running on Google Cloud.

Mengaktifkan aliran perubahan

Anda harus mengaktifkan aliran perubahan pada tabel sebelum Anda dapat membacanya. Anda juga dapat membuat tabel baru dengan aliran perubahan diaktifkan.

Tabel metadata aliran perubahan

Saat Anda melakukan streaming perubahan dengan Dataflow, konektor Bigtable Beam akan membuat tabel metadata yang diberi nama __change_stream_md_table secara default. Tabel metadata aliran perubahan mengelola status operasional konektor dan menyimpan metadata tentang data yang berubah mencatat.

Secara default, konektor membuat tabel di instance yang sama dengan tabel yang di-streaming. Untuk memastikan tabel berfungsi dengan benar, profil aplikasi untuk tabel metadata harus menggunakan perutean cluster tunggal dan mengaktifkan transaksi baris tunggal.

Untuk mengetahui informasi selengkapnya tentang streaming perubahan dari Bigtable dengan konektor Bigtable Beam, lihat dokumentasi BigtableIO.

Peran yang diperlukan

Untuk mendapatkan izin yang diperlukan untuk membaca aliran perubahan Bigtable menggunakan Dataflow, minta administrator Anda untuk memberi Anda peran IAM berikut.

Untuk membaca perubahan dari Bigtable, Anda memerlukan peran ini:

  • Administrator Bigtable (roles/bigtable.admin) di instance Bigtable yang berisi tabel yang ingin Anda streaming perubahannya

Untuk menjalankan tugas Dataflow, Anda memerlukan peran berikut:

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Menambahkan konektor Bigtable Beam sebagai dependensi

Tambahkan kode yang mirip dengan dependensi berikut ke file pom.xml Maven Anda. Versinya harus 2.48.0 atau yang lebih baru.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Membaca aliran perubahan

Untuk membuat pipeline Dataflow guna membaca record perubahan data, Anda mengonfigurasi konektor, lalu menambahkan transformasi dan sink. Kemudian, Anda menggunakan konektor untuk membaca objek ChangeStreamMutation dalam pipeline Beam.

Contoh kode di bagian ini, yang ditulis dalam Java, menunjukkan cara membuat pipeline dan menggunakannya untuk mengonversi pasangan nilai kunci menjadi string. Setiap pasangan terdiri dari kunci baris dan objek ChangeStreamMutation. Pipeline mengonversi setiap entri objek menjadi string yang dipisahkan koma.

Membangun pipeline

Contoh kode Java ini menunjukkan cara membuat pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Memproses catatan perubahan data

Contoh ini menunjukkan cara melakukan loop pada semua entri dalam rekaman perubahan data untuk baris dan memanggil metode convert-to-string berdasarkan jenis entri.

Untuk mengetahui daftar jenis entri yang dapat dimuat dalam rekaman perubahan data, lihat Isi rekaman perubahan data.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

Dalam sampel ini, entri write dikonversi:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

Dalam contoh ini, entri penghapusan sel dikonversi:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

Dalam contoh ini, entri penghapusan family kolom dikonversi:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Memantau

Resource berikut di konsol Google Cloud memungkinkan Anda memantau resourceGoogle Cloud saat menjalankan pipeline Dataflow untuk membaca aliran data perubahan Bigtable:

Secara khusus, periksa metrik berikut:

  • Di halaman insight sistem Bigtable, periksa metrik berikut:
    • Data penggunaan CPU menurut aliran perubahan dalam metrik cpu_load_by_app_profile_by_method_by_table. Menampilkan dampak aliran perubahan pada penggunaan CPU cluster Anda.
    • Penggunaan penyimpanan aliran perubahan (byte) (change_stream_log_used_bytes).
  • Di halaman pemantauan Dataflow, periksa keaktualan data. Metrik ini menunjukkan perbedaan antara waktu saat ini dan tanda air, yaitu sekitar dua menit, dengan lonjakan sesekali yang lebih lama satu atau dua menit. Keaktualan data tidak menunjukkan apakah catatan perubahan data diproses secara lambat. Untuk memastikan kondisi dan performa aplikasi penting Anda tetap baik, pantau metrik keaktualan data Dataflow dan lakukan tindakan berikut:

    • Jika metrik keaktualan data secara konsisten lebih tinggi daripada nilai minimum, pipeline Anda mungkin kekurangan sumber daya. Sebaiknya tambahkan lebih banyak pekerja Dataflow.
    • Jika pekerja Dataflow disediakan dengan baik, tetapi keaktualan data meningkat atau selalu tinggi, hubungi Google Cloud Dukungan.
  • Metrik processing_delay_from_commit_timestamp_MEAN Dataflow dapat memberi tahu Anda waktu pemrosesan rata-rata rekaman perubahan data selama masa aktif tugas.

Metrik server/latencies Bigtable tidak berguna saat Anda memantau pipeline Dataflow yang membaca aliran perubahan Bigtable, karena metrik ini mencerminkan durasi permintaan streaming, bukan latensi pemrosesan rekaman perubahan data. Latensi tinggi dalam aliran perubahan tidak berarti permintaan diproses secara lambat; tetapi berarti koneksi terbuka selama itu.

Langkah berikutnya