Membangun koneksi aliran perubahan menggunakan Dataflow

Halaman ini menunjukkan cara membuat pipeline Dataflow yang menggunakan dan meneruskan data perubahan Spanner menggunakan aliran data perubahan. Anda dapat menggunakan contoh kode di halaman ini untuk membuat pipeline kustom.

Konsep inti

Berikut adalah beberapa konsep inti untuk pipeline Dataflow untuk stream perubahan.

Dataflow

Dataflow adalah layanan serverless, cepat, dan hemat biaya yang mendukung pemrosesan streaming dan batch. Dataflow menyediakan portabilitas dengan pemrosesan tugas yang ditulis menggunakan library Apache Beam open source dan mengotomatiskan penyediaan infrastruktur serta pengelolaan cluster. Dataflow menyediakan streaming mendekati real-time saat membaca dari aliran data perubahan.

Anda dapat menggunakan Dataflow untuk menggunakan aliran perubahan Spanner dengan konektor SpannerIO, yang menawarkan abstraksi di atas Spanner API untuk membuat kueri aliran perubahan. Dengan konektor ini, Anda tidak perlu mengelola siklus proses partisi aliran perubahan, yang diperlukan saat Anda menggunakan Spanner API secara langsung. Konektor memberi Anda aliran data perubahan sehingga Anda dapat lebih berfokus pada logika aplikasi, dan lebih sedikit pada detail API tertentu dan partisi aliran perubahan dinamis. Sebaiknya gunakan konektor SpannerIO, bukan Spanner API, dalam sebagian besar situasi saat Anda perlu membaca data aliran perubahan.

Template Dataflow adalah pipeline Dataflow bawaan yang menerapkan kasus penggunaan umum. Lihat Template Dataflow untuk mengetahui ringkasannya.

Pipeline Dataflow

Pipeline Dataflow aliran perubahan Spanner terdiri dari empat bagian utama:

  1. Database Spanner dengan aliran perubahan
  2. Konektor SpannerIO
  3. Transformasi dan sink yang ditentukan pengguna
  4. Penulis I/O sink Apache Beam

gambar

Aliran data perubahan Spanner

Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan.

Konektor Apache Beam SpannerIO

Ini adalah konektor SpannerIO yang dijelaskan di bagian Dataflow sebelumnya. Ini adalah konektor I/O sumber yang memunculkan PCollection data perubahan data ke tahap selanjutnya dari pipeline. Waktu peristiwa untuk setiap data perubahan yang dikeluarkan akan menjadi stempel waktu commit. Perhatikan bahwa data yang dikeluarkan adalah tidak diurutkan, dan bahwa konektor SpannerIO menjamin tidak akan ada data terlambat.

Saat menangani aliran perubahan, Dataflow menggunakan pembuatan checkpoint. Akibatnya, setiap pekerja mungkin menunggu hingga interval pemeriksaan yang dikonfigurasi untuk buffering perubahan sebelum mengirim perubahan untuk pemrosesan lebih lanjut.

Transformasi yang ditentukan pengguna

Transformasi yang ditentukan pengguna memungkinkan pengguna menggabungkan, mengubah, atau memodifikasi data pemrosesan dalam pipeline Dataflow. Kasus penggunaan umum untuk hal ini adalah penghapusan informasi identitas pribadi, memenuhi persyaratan format data downstream, dan pengurutan. Lihat dokumentasi Apache Beam resmi untuk panduan pemrograman tentang transformasi.

Penulis I/O sink Apache Beam

Apache Beam berisi konektor I/O bawaan yang dapat digunakan untuk menulis dari pipeline Dataflow ke sink data seperti BigQuery. Sebagian besar sink data umum didukung secara native.

Template Dataflow

Template Dataflow menyediakan metode untuk membuat tugas Dataflow berdasarkan image Docker bawaan untuk kasus penggunaan umum menggunakan konsol Google Cloud, Google Cloud CLI, atau panggilan Rest API.

Untuk aliran perubahan Spanner, kami menyediakan tiga template fleksibel Dataflow:

Menetapkan Izin IAM untuk template Dataflow

Sebelum membuat tugas Dataflow dengan tiga template fleksibel yang tercantum, pastikan Anda memiliki izin IAM yang diperlukan untuk akun layanan berikut:

Jika tidak memiliki izin IAM yang diperlukan, Anda harus menentukan akun layanan pekerja yang dikelola pengguna untuk membuat tugas Dataflow. Untuk mengetahui informasi selengkapnya, lihat Keamanan dan izin Dataflow.

Saat Anda mencoba menjalankan tugas dari template fleksibel Dataflow tanpa semua izin yang diperlukan, tugas Anda mungkin gagal dengan error gagal membaca file hasil atau izin ditolak pada error resource. Untuk mengetahui informasi selengkapnya, lihat Memecahkan Masalah Template Flex.

Membangun pipeline Dataflow

Bagian ini membahas konfigurasi awal konektor, dan memberikan contoh untuk integrasi umum dengan fitur streaming perubahan Spanner.

Untuk mengikuti langkah-langkah ini, Anda memerlukan lingkungan pengembangan Java untuk Dataflow. Untuk mengetahui informasi selengkapnya, lihat Membuat pipeline Dataflow menggunakan Java.

Membuat aliran perubahan

Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan. Untuk melanjutkan ke langkah berikutnya, Anda harus memiliki database Spanner dengan aliran perubahan yang dikonfigurasi.

Memberikan hak istimewa kontrol akses yang sangat terperinci

Jika Anda ingin pengguna kontrol akses terperinci menjalankan tugas Dataflow, pastikan pengguna diberi akses ke peran database yang memiliki hak istimewa SELECT di aliran perubahan dan hak istimewa EXECUTE di fungsi nilai tabel aliran perubahan. Pastikan juga bahwa prinsipal menentukan peran database dalam konfigurasi SpannerIO atau dalam template fleksibel Dataflow.

Untuk informasi selengkapnya, lihat Tentang kontrol akses terperinci.

Menambahkan konektor SpannerIO sebagai dependensi

Konektor Apache Beam SpannerIO mengenkapsulasi kompleksitas penggunaan change stream secara langsung menggunakan Cloud Spanner API, yang memunculkan PCollection data change stream ke tahap pipeline berikutnya.

Objek ini dapat digunakan di tahap lain pada pipeline Dataflow pengguna. Integrasi aliran data perubahan adalah bagian dari konektor SpannerIO. Agar dapat menggunakan konektor SpannerIO, dependensi harus ditambahkan ke file pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Membuat database metadata

Konektor perlu melacak setiap partisi saat menjalankan pipeline Apache Beam. Konektor ini menyimpan metadata ini dalam tabel Spanner yang dibuat oleh konektor selama inisialisasi. Anda menentukan database tempat tabel ini akan dibuat saat mengonfigurasi konektor.

Seperti yang dijelaskan dalam Praktik terbaik aliran perubahan, sebaiknya buat database baru untuk tujuan ini, bukan mengizinkan konektor menggunakan database aplikasi Anda untuk menyimpan tabel metadatanya.

Pemilik tugas Dataflow yang menggunakan konektor SpannerIO harus menetapkan izin IAM berikut dengan database metadata ini:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Mengonfigurasi konektor

Konektor aliran data perubahan Spanner dapat dikonfigurasi sebagai berikut:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Berikut adalah deskripsi opsi readChangeStream():

Konfigurasi Spanner (Wajib)

Digunakan untuk mengonfigurasi project, instance, dan database tempat aliran perubahan dibuat dan harus dikueri. Secara opsional, menentukan peran database yang akan digunakan saat akun utama IAM yang menjalankan tugas Dataflow adalah pengguna kontrol akses terperinci. Tugas ini mengasumsikan peran database ini untuk akses ke aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Tentang kontrol akses terperinci.

Nama aliran data perubahan (Wajib diisi)

Nama ini secara unik mengidentifikasi aliran perubahan. Nama di sini harus sama dengan nama yang digunakan saat membuatnya.

ID instance metadata (Opsional)

Ini adalah instance untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol penggunaan data API aliran perubahan.

ID database metadata (Wajib)

Ini adalah database untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol penggunaan data API aliran perubahan.

Nama tabel metadata (Opsional)

Ini hanya boleh digunakan saat memperbarui pipeline yang ada.

Ini adalah nama tabel metadata yang sudah ada dan akan digunakan oleh konektor. Ini digunakan oleh konektor untuk menyimpan metadata guna mengontrol penggunaan data API aliran perubahan. Jika opsi ini dihilangkan, Spanner akan membuat tabel baru dengan nama yang dihasilkan pada inisialisasi konektor.

Prioritas RPC (Opsional)

Prioritas permintaan yang akan digunakan untuk kueri aliran perubahan. Jika parameter ini dihilangkan, high priority akan digunakan.

InclusiveStartAt (Wajib)

Perubahan dari stempel waktu yang diberikan akan ditampilkan kepada pemanggil.

InclusiveEndAt (Opsional)

Perubahan hingga stempel waktu yang diberikan akan ditampilkan kepada pemanggil. Jika parameter ini dihilangkan, perubahan akan dikeluarkan tanpa batas.

Menambahkan transformasi dan sink untuk memproses data perubahan

Setelah langkah-langkah sebelumnya selesai, konektor SpannerIO yang dikonfigurasi siap memunculkan PCollection objek DataChangeRecord. Lihat Contoh transformasi dan sink untuk beberapa contoh konfigurasi pipeline yang memproses data yang di-streaming ini dengan berbagai cara.

Perhatikan bahwa data aliran perubahan yang dikeluarkan oleh konektor SpannerIO tidak diurutkan. Hal ini karena PCollection tidak memberikan jaminan pengurutan apa pun. Jika memerlukan streaming yang diurutkan, Anda harus mengelompokkan dan mengurutkan data sebagai transformasi dalam pipeline: lihat Contoh: Mengurutkan menurut kunci. Anda dapat memperluas contoh ini untuk mengurutkan data berdasarkan kolom data apa pun, seperti berdasarkan ID transaksi.

Contoh transformasi dan sink

Anda dapat menentukan transformasi sendiri dan menentukan sink untuk menulis data. Dokumentasi Apache Beam menyediakan berbagai transformasi yang dapat diterapkan, serta konektor I/O yang siap digunakan untuk menulis data ke sistem eksternal.

Contoh: Mengurutkan berdasarkan kunci

Contoh kode ini memunculkan kumpulan data perubahan yang diurutkan berdasarkan stempel waktu commit dan dikelompokkan menurut kunci utama menggunakan konektor Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Contoh kode ini menggunakan status dan timer untuk melakukan buffering data untuk setiap kunci, dan menetapkan waktu habis masa berlaku timer ke beberapa waktu yang dikonfigurasi pengguna T di masa mendatang (ditentukan dalam fungsi BufferKeyUntilOutputTimestamp). Saat watermark Dataflow melewati waktu T, kode ini akan menghapus semua data dalam buffering dengan stempel waktu kurang dari T, mengurutkan data ini berdasarkan stempel waktu commit, dan menghasilkan pasangan nilai kunci dengan:

  • Kunci adalah kunci input, yaitu kunci utama yang di-hash ke array bucket berukuran 1.000.
  • Nilainya adalah data perubahan data yang diurutkan yang di-buffer untuk kunci.

Untuk setiap kunci, kami memiliki jaminan berikut:

  • Timer dijamin akan diaktifkan sesuai urutan stempel waktu habis masa berlaku.
  • Tahap downstream dijamin akan menerima elemen dalam urutan yang sama seperti saat elemen tersebut diproduksi.

Misalnya, dengan kunci bernilai 100, timer akan diaktifkan pada T1 dan T10, yang menghasilkan kumpulan data perubahan data pada setiap stempel waktu. Karena data perubahan yang ditampilkan di T1 dibuat sebelum data perubahan yang ditampilkan di T10, data perubahan yang ditampilkan di T1 juga dijamin akan diterima oleh tahap berikutnya sebelum data perubahan yang ditampilkan di T10. Mekanisme ini membantu kami menjamin pengurutan stempel waktu commit yang ketat per kunci utama untuk pemrosesan downstream.

Proses ini akan diulang hingga pipeline berakhir dan semua data perubahan data telah diproses (atau akan diulang tanpa batas waktu jika tidak ada waktu berakhir yang ditentukan).

Perhatikan bahwa contoh kode ini menggunakan status dan timer, bukan jendela, untuk melakukan pengurutan per kunci. Alasannya adalah jendela tidak dijamin diproses secara berurutan. Artinya, jendela lama dapat diproses lebih lambat daripada jendela yang lebih baru, yang dapat menyebabkan pemrosesan tidak berurutan.

BreakRecordByModFn

Setiap kumpulan data perubahan dapat berisi beberapa mod. Setiap mod mewakili penyisipan, pembaruan, atau penghapusan ke satu nilai kunci utama. Fungsi ini membagi setiap data perubahan menjadi data perubahan terpisah, satu per mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Fungsi ini menggunakan DataChangeRecord dan menghasilkan DataChangeRecord yang diberi kunci oleh kunci utama Spanner yang di-hash ke nilai bilangan bulat.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Timer dan buffering bersifat per kunci. Fungsi ini menyimpan setiap kumpulan data perubahan dalam buffering hingga watermark meneruskan stempel waktu saat kita ingin menampilkan kumpulan data perubahan yang di-buffer.

Kode ini menggunakan timer berulang untuk menentukan kapan harus menghapus buffer:

  1. Saat melihat data perubahan data untuk kunci untuk pertama kalinya, timer akan disetel untuk diaktifkan pada stempel waktu commit data perubahan data + incrementIntervalSeconds (opsi yang dapat dikonfigurasi pengguna).
  2. Saat diaktifkan, timer akan menambahkan semua data perubahan data dalam buffering dengan stempel waktu kurang dari waktu habis masa berlaku timer ke recordsToOutput. Jika buffer memiliki data perubahan data yang stempel waktunya lebih besar dari atau sama dengan waktu habis masa berlaku timer, buffer akan menambahkan data perubahan data tersebut kembali ke buffer, bukan menampilkannya. Kemudian, timer berikutnya ditetapkan ke waktu habis masa berlaku timer saat ini ditambah incrementIntervalInSeconds.
  3. Jika recordsToOutput tidak kosong, fungsi ini akan mengurutkan data perubahan data di recordsToOutput berdasarkan stempel waktu commit dan ID transaksi, lalu menampilkannya.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Mengurutkan transaksi

Pipeline ini dapat diubah untuk diurutkan berdasarkan ID transaksi dan stempel waktu commit. Untuk melakukannya, buffer data untuk setiap pasangan ID transaksi / stempel waktu commit, bukan untuk setiap kunci Spanner. Hal ini memerlukan modifikasi kode di KeyByIdFn.

Contoh: Menggabungkan transaksi

Contoh kode ini membaca data perubahan data, menggabungkan semua data perubahan data yang termasuk dalam transaksi yang sama menjadi satu elemen, dan menampilkan elemen tersebut. Perhatikan bahwa transaksi yang dihasilkan oleh kode contoh ini tidak diurutkan berdasarkan stempel waktu commit.

Contoh kode ini menggunakan buffer untuk menyusun transaksi dari data perubahan data. Setelah menerima data perubahan data yang termasuk dalam transaksi untuk pertama kalinya, operasi ini akan membaca kolom numberOfRecordsInTransaction dalam data perubahan data, yang menjelaskan jumlah data perubahan data yang diharapkan yang termasuk dalam transaksi tersebut. Fungsi ini menyimpan data perubahan data yang termasuk dalam transaksi tersebut ke dalam buffer hingga jumlah data yang di-buffer cocok dengan numberOfRecordsInTransaction, lalu menampilkan data perubahan data yang dipaketkan.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Fungsi ini menggunakan DataChangeRecord dan menghasilkan DataChangeRecord yang diberi kunci oleh ID transaksi.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn buffering menerima key-value pair {TransactionId, DataChangeRecord} dari KeyByTransactionIdFn dan melakukan buffering dalam grup berdasarkan TransactionId. Jika jumlah data yang di-buffer sama dengan jumlah data yang terdapat dalam seluruh transaksi, fungsi ini akan mengurutkan objek DataChangeRecord dalam grup berdasarkan urutan data dan menghasilkan pasangan nilai kunci {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Di sini, kita mengasumsikan bahwa SortKey adalah class yang ditentukan pengguna yang mewakili pasangan {CommitTimestamp, TransactionId}. Untuk informasi selengkapnya tentang SortKey, lihat contoh implementasi.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Contoh: Memfilter menurut tag transaksi

Saat transaksi yang mengubah data pengguna diberi tag, tag yang sesuai dan jenisnya akan disimpan sebagai bagian dari DataChangeRecord. Contoh ini menunjukkan cara memfilter data aliran perubahan berdasarkan tag transaksi yang ditentukan pengguna serta tag sistem:

Pemfilteran tag yang ditentukan pengguna untuk my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Pemfilteran tag sistem/audit TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Contoh: Mengambil baris lengkap

Contoh ini berfungsi dengan tabel Spanner bernama Singer yang memiliki definisi berikut:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Pada mode pengambilan nilai OLD_AND_NEW_VALUES default dari aliran perubahan, saat ada pembaruan pada baris Spanner, data perubahan yang diterima hanya akan berisi kolom yang diubah. Kolom yang dilacak tetapi tidak berubah tidak akan disertakan dalam data. Kunci utama modi dapat digunakan untuk melakukan pembacaan snapshot Spanner pada stempel waktu commit data perubahan untuk mengambil kolom yang tidak diubah atau bahkan mengambil baris lengkap.

Perhatikan bahwa kebijakan retensi database mungkin perlu diubah ke nilai yang lebih besar atau sama dengan kebijakan retensi aliran perubahan agar pembacaan snapshot berhasil.

Perhatikan juga bahwa menggunakan jenis pengambilan nilai NEW_ROW adalah cara yang direkomendasikan dan lebih efisien untuk melakukannya, karena menampilkan semua kolom baris yang dilacak secara default dan tidak memerlukan snapshot tambahan yang dibaca ke dalam Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Transformasi ini akan melakukan pembacaan yang sudah tidak berlaku pada stempel waktu commit setiap data yang diterima, dan memetakan baris lengkap ke JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Kode ini membuat klien database Spanner untuk melakukan pengambilan baris penuh, dan mengonfigurasi kumpulan sesi agar hanya memiliki beberapa sesi, yang melakukan pembacaan dalam satu instance ToFullReowJsonFn secara berurutan. Dataflow memastikan untuk membuat banyak instance fungsi ini, masing-masing dengan kumpulan kliennya sendiri.

Contoh: Spanner ke Pub/Sub

Dalam skenario ini, pemanggil melakukan streaming data ke Pub/Sub secepat mungkin, tanpa pengelompokan atau agregasi. Hal ini cocok untuk memicu pemrosesan downstream, seperti streaming semua baris baru yang disisipkan ke dalam tabel Spanner ke Pub/Sub untuk pemrosesan lebih lanjut.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Perhatikan bahwa sink Pub/Sub dapat dikonfigurasi untuk memastikan semantik tepat satu kali.

Contoh: Spanner ke Cloud Storage

Dalam skenario ini, pemanggil mengelompokkan semua kumpulan data dalam periode tertentu dan menyimpan grup dalam file Cloud Storage terpisah. Hal ini cocok untuk analisis dan pengarsipan titik waktu, yang tidak bergantung pada periode retensi Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Perhatikan bahwa sink Cloud Storage menyediakan semantik setidaknya satu kali secara default. Dengan pemrosesan tambahan, semantik ini dapat diubah agar memiliki semantik tepat satu kali.

Kami juga menyediakan template Dataflow untuk kasus penggunaan ini: lihat Menghubungkan aliran perubahan ke Cloud Storage.

Contoh: Spanner ke BigQuery (tabel ledger)

Di sini, pemanggil melakukan streaming data perubahan ke BigQuery. Setiap data perubahan data ditampilkan sebagai satu baris di BigQuery. Hal ini cocok untuk analisis. Kode ini menggunakan fungsi yang ditentukan sebelumnya, di bagian Mengambil baris lengkap, untuk mengambil baris lengkap data dan menulisnya ke BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Perhatikan bahwa sink BigQuery menyediakan semantik setidaknya satu kali secara default. Dengan pemrosesan tambahan, semantik ini dapat diubah agar memiliki semantik tepat satu kali.

Kami juga menyediakan template Dataflow untuk kasus penggunaan ini; lihat Menghubungkan aliran perubahan ke BigQuery.

Memantau pipeline

Ada dua class metrik yang tersedia untuk memantau pipeline Dataflow aliran perubahan.

Metrik Dataflow standar

Dataflow menyediakan beberapa metrik untuk memastikan tugas Anda berjalan dengan baik, seperti keaktualan data, kelambatan sistem, throughput tugas, penggunaan CPU pekerja, dan lainnya. Anda dapat menemukan informasi selengkapnya di Menggunakan Monitoring untuk pipeline Dataflow.

Untuk pipeline aliran perubahan, ada dua metrik utama yang harus diperhitungkan: latensi sistem dan keaktualan data.

Latensi sistem akan memberi tahu Anda durasi maksimum saat ini (dalam detik) saat item data diproses atau menunggu pemrosesan.

Keaktualan data akan menunjukkan jumlah waktu antara sekarang (real time) dan watermark output. Stempel waktu output waktu T menunjukkan bahwa semua elemen dengan waktu peristiwa (secara ketat) sebelum T telah diproses untuk komputasi. Dengan kata lain, keaktualan data mengukur seberapa baru pipeline, sehubungan dengan pemrosesan peristiwa yang telah diterimanya.

Jika pipeline kekurangan resource, Anda dapat melihat efeknya dalam kedua metrik ini. Latensi sistem akan meningkat, karena item harus menunggu lebih lama sebelum diproses. Keaktualan data juga akan meningkat, karena pipeline tidak akan dapat mengimbangi jumlah data yang diterima.

Metrik aliran perubahan kustom

Metrik ini ditampilkan di Cloud Monitoring dan mencakup:

  • Latensi bucket (histogram) antara data yang di-commit di Spanner hingga data tersebut dikeluarkan ke PCollection oleh konektor. Metrik ini dapat digunakan untuk melihat masalah performa (latensi) pada pipeline.
  • Jumlah total data yang dibaca. Ini adalah indikasi keseluruhan jumlah data yang dikeluarkan oleh konektor. Jumlah ini akan terus meningkat, yang mencerminkan tren penulisan di database Spanner yang mendasarinya.
  • Jumlah partisi yang sedang dibaca. Harus selalu ada partisi yang dibaca. Jika angka ini nol, berarti terjadi error dalam pipeline.
  • Total jumlah kueri yang dikeluarkan selama eksekusi konektor. Ini adalah indikasi keseluruhan kueri aliran perubahan yang dibuat ke instance Spanner selama eksekusi pipeline. Hal ini dapat digunakan untuk mendapatkan estimasi beban dari konektor ke database Spanner.

Memperbarui pipeline yang ada

Anda dapat mengupdate pipeline yang sedang berjalan yang menggunakan konektor SpannerIO untuk memproses aliran perubahan jika pemeriksaan kompatibilitas tugas lulus. Untuk melakukannya, Anda harus menetapkan parameter nama tabel metadata tugas baru secara eksplisit saat memperbaruinya. Gunakan nilai opsi pipeline metadataTable dari tugas yang Anda perbarui.

Jika Anda menggunakan template Dataflow yang disediakan Google, tetapkan nama tabel menggunakan parameter spannerMetadataTableName. Anda juga dapat mengubah tugas yang ada untuk menggunakan tabel metadata secara eksplisit dengan metode withMetadataTable(your-metadata-table-name) dalam konfigurasi konektor. Setelah selesai, Anda dapat mengikuti petunjuk di Meluncurkan tugas pengganti dari dokumentasi Dataflow untuk memperbarui tugas yang sedang berjalan.

Praktik terbaik untuk aliran perubahan dan Dataflow

Berikut adalah beberapa praktik terbaik untuk membuat koneksi aliran perubahan menggunakan Dataflow.

Menggunakan database metadata terpisah

Sebaiknya buat database terpisah untuk konektor SpannerIO yang akan digunakan untuk penyimpanan metadata, bukan mengonfigurasinya untuk menggunakan database aplikasi Anda.

Untuk mengetahui informasi selengkapnya, lihat Mempertimbangkan database metadata terpisah.

Menentukan ukuran cluster

Pedoman umum untuk jumlah awal pekerja dalam tugas aliran perubahan Spanner adalah satu pekerja per 1.000 operasi tulis per detik. Perhatikan bahwa estimasi ini dapat bervariasi bergantung pada beberapa faktor, seperti ukuran setiap transaksi, jumlah data aliran perubahan yang dihasilkan dari satu transaksi dan transformasi, agregasi, atau sink lainnya yang digunakan dalam pipeline.

Setelah penyediaan resource awal, penting untuk melacak metrik yang disebutkan di Memantau pipeline, untuk memastikan pipeline dalam kondisi baik. Sebaiknya lakukan eksperimen dengan ukuran kumpulan pekerja awal dan pantau cara pipeline Anda menangani beban, dengan meningkatkan jumlah node jika perlu. Penggunaan CPU adalah metrik utama untuk memeriksa apakah beban sudah tepat dan apakah diperlukan lebih banyak node.

Batasan umum

Ada beberapa batasan umum saat menggunakan aliran data perubahan Spanner dengan Dataflow:

Penskalaan otomatis

Dukungan penskalaan otomatis untuk pipeline apa pun yang menyertakan SpannerIO.readChangeStream memerlukan Apache Beam 2.39.0 atau yang lebih tinggi.

Jika Anda menggunakan versi Apache Beam sebelum 2.39.0, pipeline yang menyertakan SpannerIO.readChangeStream harus menentukan algoritma penskalaan otomatis secara eksplisit sebagai NONE, seperti yang dijelaskan dalam Penskalaan otomatis horizontal.

Untuk menskalakan pipeline Dataflow secara manual, bukan menggunakan penskalaan otomatis, lihat Menskalakan pipeline streaming secara manual.

Runner V2

Konektor aliran perubahan Spanner memerlukan Dataflow Runner V2. Hal ini harus ditentukan secara manual selama eksekusi atau error akan ditampilkan. Anda dapat menentukan Runner V2 dengan mengonfigurasi tugas dengan --experiments=use_unified_worker,use_runner_v2.

Snapshot

Konektor aliran perubahan Spanner tidak mendukung Snapshot Dataflow.

Menyelesaikan

Konektor aliran perubahan Spanner tidak mendukung menguras tugas. Anda hanya dapat membatalkan tugas yang ada.

Anda juga dapat memperbarui pipeline yang ada tanpa perlu menghentikannya.

OpenCensus

Untuk menggunakan OpenCensus untuk memantau pipeline, tentukan versi 0.28.3 atau yang lebih baru.

NullPointerException saat pipeline dimulai

Bug di Apache Beam versi 2.38.0 dapat menyebabkan NullPointerException saat memulai pipeline dalam kondisi tertentu. Hal ini akan mencegah tugas Anda dimulai, dan menampilkan pesan error ini:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Untuk mengatasi masalah ini, gunakan Apache Beam versi 2.39.0 atau yang lebih baru, atau tentukan versi beam-sdks-java-core secara manual sebagai 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Informasi selengkapnya