Halaman ini menunjukkan cara membuat pipeline Dataflow yang menggunakan dan meneruskan data perubahan Spanner menggunakan aliran data perubahan. Anda dapat menggunakan contoh kode di halaman ini untuk membuat pipeline kustom.
Konsep inti
Berikut adalah beberapa konsep inti untuk pipeline Dataflow untuk stream perubahan.
Dataflow
Dataflow adalah layanan serverless, cepat, dan hemat biaya yang mendukung pemrosesan streaming dan batch. Dataflow menyediakan portabilitas dengan pemrosesan tugas yang ditulis menggunakan library Apache Beam open source dan mengotomatiskan penyediaan infrastruktur serta pengelolaan cluster. Dataflow menyediakan streaming mendekati real-time saat membaca dari aliran data perubahan.
Anda dapat menggunakan Dataflow untuk menggunakan aliran perubahan Spanner dengan konektor SpannerIO, yang menawarkan abstraksi di atas Spanner API untuk membuat kueri aliran perubahan. Dengan konektor ini, Anda tidak perlu mengelola siklus proses partisi aliran perubahan, yang diperlukan saat Anda menggunakan Spanner API secara langsung. Konektor memberi Anda aliran data perubahan sehingga Anda dapat lebih berfokus pada logika aplikasi, dan lebih sedikit pada detail API tertentu dan partisi aliran perubahan dinamis. Sebaiknya gunakan konektor SpannerIO, bukan Spanner API, dalam sebagian besar situasi saat Anda perlu membaca data aliran perubahan.
Template Dataflow adalah pipeline Dataflow bawaan yang menerapkan kasus penggunaan umum. Lihat Template Dataflow untuk mengetahui ringkasannya.
Pipeline Dataflow
Pipeline Dataflow aliran perubahan Spanner terdiri dari empat bagian utama:
- Database Spanner dengan aliran perubahan
- Konektor SpannerIO
- Transformasi dan sink yang ditentukan pengguna
- Penulis I/O sink Apache Beam
Aliran data perubahan Spanner
Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan.
Konektor Apache Beam SpannerIO
Ini adalah konektor SpannerIO yang dijelaskan di bagian Dataflow sebelumnya.
Ini adalah konektor I/O sumber yang memunculkan PCollection
data perubahan data
ke tahap selanjutnya dari pipeline. Waktu peristiwa untuk setiap data perubahan yang dikeluarkan akan menjadi stempel waktu commit. Perhatikan bahwa data yang dikeluarkan adalah
tidak diurutkan, dan bahwa konektor SpannerIO menjamin tidak akan ada
data terlambat.
Saat menangani aliran perubahan, Dataflow menggunakan pembuatan checkpoint. Akibatnya, setiap pekerja mungkin menunggu hingga interval pemeriksaan yang dikonfigurasi untuk buffering perubahan sebelum mengirim perubahan untuk pemrosesan lebih lanjut.
Transformasi yang ditentukan pengguna
Transformasi yang ditentukan pengguna memungkinkan pengguna menggabungkan, mengubah, atau memodifikasi data pemrosesan dalam pipeline Dataflow. Kasus penggunaan umum untuk hal ini adalah penghapusan informasi identitas pribadi, memenuhi persyaratan format data downstream, dan pengurutan. Lihat dokumentasi Apache Beam resmi untuk panduan pemrograman tentang transformasi.
Penulis I/O sink Apache Beam
Apache Beam berisi konektor I/O bawaan yang dapat digunakan untuk menulis dari pipeline Dataflow ke sink data seperti BigQuery. Sebagian besar sink data umum didukung secara native.
Template Dataflow
Template Dataflow menyediakan metode untuk membuat tugas Dataflow berdasarkan image Docker bawaan untuk kasus penggunaan umum menggunakan konsol Google Cloud, Google Cloud CLI, atau panggilan Rest API.
Untuk aliran perubahan Spanner, kami menyediakan tiga template fleksibel Dataflow:
Menetapkan Izin IAM untuk template Dataflow
Sebelum membuat tugas Dataflow dengan tiga template fleksibel yang tercantum, pastikan Anda memiliki izin IAM yang diperlukan untuk akun layanan berikut:
Jika tidak memiliki izin IAM yang diperlukan, Anda harus menentukan akun layanan pekerja yang dikelola pengguna untuk membuat tugas Dataflow. Untuk mengetahui informasi selengkapnya, lihat Keamanan dan izin Dataflow.
Saat Anda mencoba menjalankan tugas dari template fleksibel Dataflow tanpa semua izin yang diperlukan, tugas Anda mungkin gagal dengan error gagal membaca file hasil atau izin ditolak pada error resource. Untuk mengetahui informasi selengkapnya, lihat Memecahkan Masalah Template Flex.
Membangun pipeline Dataflow
Bagian ini membahas konfigurasi awal konektor, dan memberikan contoh untuk integrasi umum dengan fitur streaming perubahan Spanner.
Untuk mengikuti langkah-langkah ini, Anda memerlukan lingkungan pengembangan Java untuk Dataflow. Untuk mengetahui informasi selengkapnya, lihat Membuat pipeline Dataflow menggunakan Java.
Membuat aliran perubahan
Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan. Untuk melanjutkan ke langkah berikutnya, Anda harus memiliki database Spanner dengan aliran perubahan yang dikonfigurasi.
Memberikan hak istimewa kontrol akses yang sangat terperinci
Jika Anda ingin pengguna kontrol akses terperinci menjalankan tugas Dataflow,
pastikan pengguna diberi akses ke peran database
yang memiliki hak istimewa SELECT
di aliran perubahan dan hak istimewa EXECUTE
di fungsi nilai tabel aliran perubahan. Pastikan juga bahwa
prinsipal menentukan peran database dalam konfigurasi SpannerIO atau dalam
template fleksibel Dataflow.
Untuk informasi selengkapnya, lihat Tentang kontrol akses terperinci.
Menambahkan konektor SpannerIO sebagai dependensi
Konektor Apache Beam SpannerIO mengenkapsulasi kompleksitas penggunaan change stream secara langsung menggunakan Cloud Spanner API, yang memunculkan PCollection data change stream ke tahap pipeline berikutnya.
Objek ini dapat digunakan di tahap lain pada pipeline Dataflow
pengguna. Integrasi aliran data perubahan adalah bagian dari konektor SpannerIO. Agar dapat menggunakan konektor SpannerIO, dependensi
harus ditambahkan ke file pom.xml
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>
Membuat database metadata
Konektor perlu melacak setiap partisi saat menjalankan pipeline Apache Beam. Konektor ini menyimpan metadata ini dalam tabel Spanner yang dibuat oleh konektor selama inisialisasi. Anda menentukan database tempat tabel ini akan dibuat saat mengonfigurasi konektor.
Seperti yang dijelaskan dalam Praktik terbaik aliran perubahan, sebaiknya buat database baru untuk tujuan ini, bukan mengizinkan konektor menggunakan database aplikasi Anda untuk menyimpan tabel metadatanya.
Pemilik tugas Dataflow yang menggunakan konektor SpannerIO harus menetapkan izin IAM berikut dengan database metadata ini:
spanner.databases.updateDdl
spanner.databases.beginReadOnlyTransaction
spanner.databases.beginOrRollbackReadWriteTransaction
spanner.databases.read
spanner.databases.select
spanner.databases.write
spanner.sessions.create
spanner.sessions.get
Mengonfigurasi konektor
Konektor aliran data perubahan Spanner dapat dikonfigurasi sebagai berikut:
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
startTime.getSeconds() + (10 * 60),
startTime.getNanos()
);
SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-meta-instance-id")
.withMetadataDatabase("my-meta-database-id")
.withMetadataTable("my-meta-table-name")
.withRpcPriority(RpcPriority.MEDIUM)
.withInclusiveStartAt(startTime)
.withInclusiveEndAt(endTime);
Berikut adalah deskripsi opsi readChangeStream()
:
Konfigurasi Spanner (Wajib)
Digunakan untuk mengonfigurasi project, instance, dan database tempat aliran perubahan dibuat dan harus dikueri. Secara opsional, menentukan peran database yang akan digunakan saat akun utama IAM yang menjalankan tugas Dataflow adalah pengguna kontrol akses terperinci. Tugas ini mengasumsikan peran database ini untuk akses ke aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Tentang kontrol akses terperinci.
Nama aliran data perubahan (Wajib diisi)
Nama ini secara unik mengidentifikasi aliran perubahan. Nama di sini harus sama dengan nama yang digunakan saat membuatnya.
ID instance metadata (Opsional)
Ini adalah instance untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol penggunaan data API aliran perubahan.
ID database metadata (Wajib)
Ini adalah database untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol penggunaan data API aliran perubahan.
Nama tabel metadata (Opsional)
Ini hanya boleh digunakan saat memperbarui pipeline yang ada.
Ini adalah nama tabel metadata yang sudah ada dan akan digunakan oleh konektor. Ini digunakan oleh konektor untuk menyimpan metadata guna mengontrol penggunaan data API aliran perubahan. Jika opsi ini dihilangkan, Spanner akan membuat tabel baru dengan nama yang dihasilkan pada inisialisasi konektor.
Prioritas RPC (Opsional)
Prioritas permintaan yang akan digunakan untuk kueri aliran perubahan. Jika parameter ini dihilangkan, high
priority
akan digunakan.
InclusiveStartAt (Wajib)
Perubahan dari stempel waktu yang diberikan akan ditampilkan kepada pemanggil.
InclusiveEndAt (Opsional)
Perubahan hingga stempel waktu yang diberikan akan ditampilkan kepada pemanggil. Jika parameter ini dihilangkan, perubahan akan dikeluarkan tanpa batas.
Menambahkan transformasi dan sink untuk memproses data perubahan
Setelah langkah-langkah sebelumnya selesai, konektor SpannerIO yang dikonfigurasi siap
memunculkan PCollection objek DataChangeRecord
.
Lihat Contoh transformasi dan sink untuk beberapa contoh konfigurasi pipeline yang memproses data yang di-streaming ini dengan berbagai cara.
Perhatikan bahwa data aliran perubahan yang dikeluarkan oleh konektor SpannerIO tidak diurutkan. Hal ini karena PCollection tidak memberikan jaminan pengurutan apa pun. Jika memerlukan streaming yang diurutkan, Anda harus mengelompokkan dan mengurutkan data sebagai transformasi dalam pipeline: lihat Contoh: Mengurutkan menurut kunci. Anda dapat memperluas contoh ini untuk mengurutkan data berdasarkan kolom data apa pun, seperti berdasarkan ID transaksi.
Contoh transformasi dan sink
Anda dapat menentukan transformasi sendiri dan menentukan sink untuk menulis data. Dokumentasi Apache Beam menyediakan berbagai transformasi yang dapat diterapkan, serta konektor I/O yang siap digunakan untuk menulis data ke sistem eksternal.
Contoh: Mengurutkan berdasarkan kunci
Contoh kode ini memunculkan kumpulan data perubahan yang diurutkan berdasarkan stempel waktu commit dan dikelompokkan menurut kunci utama menggunakan konektor Dataflow.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new BreakRecordByModFn()))
.apply(ParDo.of(new KeyByIdFn()))
.apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
// Subsequent processing goes here
Contoh kode ini menggunakan status dan timer untuk melakukan buffering data untuk setiap kunci, dan menetapkan waktu habis masa berlaku timer ke beberapa waktu yang dikonfigurasi pengguna T
di masa mendatang (ditentukan dalam fungsi BufferKeyUntilOutputTimestamp). Saat watermark Dataflow melewati waktu T
, kode ini akan menghapus semua data dalam buffering dengan stempel waktu kurang dari T
, mengurutkan data ini berdasarkan stempel waktu commit, dan menghasilkan pasangan nilai kunci dengan:
- Kunci adalah kunci input, yaitu kunci utama yang di-hash ke array bucket berukuran 1.000.
- Nilainya adalah data perubahan data yang diurutkan yang di-buffer untuk kunci.
Untuk setiap kunci, kami memiliki jaminan berikut:
- Timer dijamin akan diaktifkan sesuai urutan stempel waktu habis masa berlaku.
- Tahap downstream dijamin akan menerima elemen dalam urutan yang sama seperti saat elemen tersebut diproduksi.
Misalnya, dengan kunci bernilai 100, timer akan diaktifkan pada T1
dan T10
, yang menghasilkan kumpulan data perubahan data pada setiap stempel waktu. Karena data perubahan yang ditampilkan di T1
dibuat sebelum data perubahan yang ditampilkan di T10
, data perubahan yang ditampilkan di T1
juga dijamin akan diterima oleh tahap berikutnya sebelum data perubahan yang ditampilkan di T10
. Mekanisme ini membantu kami menjamin pengurutan stempel waktu commit yang ketat per kunci utama untuk pemrosesan downstream.
Proses ini akan diulang hingga pipeline berakhir dan semua data perubahan data telah diproses (atau akan diulang tanpa batas waktu jika tidak ada waktu berakhir yang ditentukan).
Perhatikan bahwa contoh kode ini menggunakan status dan timer, bukan jendela, untuk melakukan pengurutan per kunci. Alasannya adalah jendela tidak dijamin diproses secara berurutan. Artinya, jendela lama dapat diproses lebih lambat daripada jendela yang lebih baru, yang dapat menyebabkan pemrosesan tidak berurutan.
BreakRecordByModFn
Setiap kumpulan data perubahan dapat berisi beberapa mod. Setiap mod mewakili penyisipan, pembaruan, atau penghapusan ke satu nilai kunci utama. Fungsi ini membagi setiap data perubahan menjadi data perubahan terpisah, satu per mod.
private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
DataChangeRecord> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
outputReceiver) {
record.getMods().stream()
.map(
mod ->
new DataChangeRecord(
record.getPartitionToken(),
record.getCommitTimestamp(),
record.getServerTransactionId(),
record.isLastRecordInTransactionInPartition(),
record.getRecordSequence(),
record.getTableName(),
record.getRowType(),
Collections.singletonList(mod),
record.getModType(),
record.getValueCaptureType(),
record.getNumberOfRecordsInTransaction(),
record.getNumberOfPartitionsInTransaction(),
record.getTransactionTag(),
record.isSystemTransaction(),
record.getMetadata()))
.forEach(outputReceiver::output);
}
}
KeyByIdFn
Fungsi ini menggunakan DataChangeRecord
dan menghasilkan DataChangeRecord
yang diberi kunci oleh kunci utama Spanner yang di-hash ke nilai bilangan bulat.
private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
// NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
// Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
// of states and timers for performance purposes.
// Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
// On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
private static final int NUMBER_OF_BUCKETS = 1000;
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
// Hash the received keys into a bucket in order to have a
// deterministic number of buffers and timers.
String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);
outputReceiver.output(KV.of(bucketIndex, record));
}
}
BufferKeyUntilOutputTimestamp
Timer dan buffering bersifat per kunci. Fungsi ini menyimpan setiap kumpulan data perubahan dalam buffering hingga watermark meneruskan stempel waktu saat kita ingin menampilkan kumpulan data perubahan yang di-buffer.
Kode ini menggunakan timer berulang untuk menentukan kapan harus menghapus buffer:
- Saat melihat data perubahan data untuk kunci untuk pertama kalinya, timer akan disetel untuk diaktifkan pada stempel waktu commit data perubahan data +
incrementIntervalSeconds
(opsi yang dapat dikonfigurasi pengguna). - Saat diaktifkan, timer akan menambahkan semua data perubahan data dalam buffering dengan stempel waktu kurang dari waktu habis masa berlaku timer ke
recordsToOutput
. Jika buffer memiliki data perubahan data yang stempel waktunya lebih besar dari atau sama dengan waktu habis masa berlaku timer, buffer akan menambahkan data perubahan data tersebut kembali ke buffer, bukan menampilkannya. Kemudian, timer berikutnya ditetapkan ke waktu habis masa berlaku timer saat ini ditambahincrementIntervalInSeconds
. - Jika
recordsToOutput
tidak kosong, fungsi ini akan mengurutkan data perubahan data direcordsToOutput
berdasarkan stempel waktu commit dan ID transaksi, lalu menampilkannya.
private static class BufferKeyUntilOutputTimestamp extends
DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>> {
private static final Logger LOG =
LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);
private final long incrementIntervalInSeconds = 2;
private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
this.incrementIntervalInSeconds = incrementIntervalInSeconds;
}
@SuppressWarnings("unused")
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("keyString")
private final StateSpec<ValueState<String>> keyString =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void process(
@Element KV<String, DataChangeRecord> element,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
buffer.add(element.getValue());
// Only set the timer if this is the first time we are receiving a data change
// record with this key.
String elementKey = keyString.read();
if (elementKey == null) {
Instant commitTimestamp =
new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
Instant outputTimestamp =
commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
timer.set(outputTimestamp);
keyString.write(element.getKey());
}
}
@OnTimer("timer")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
if (!buffer.isEmpty().read()) {
String elementKey = keyString.read();
final List<DataChangeRecord> records =
StreamSupport.stream(buffer.read().spliterator(), false)
.collect(Collectors.toList());
buffer.clear();
List<DataChangeRecord> recordsToOutput = new ArrayList<>();
for (DataChangeRecord record : records) {
Instant recordCommitTimestamp =
new Instant(record.getCommitTimestamp().toSqlTimestamp());
final String recordString =
record.getMods().get(0).getNewValuesJson().isEmpty()
? "Deleted record"
: record.getMods().get(0).getNewValuesJson();
// When the watermark passes time T, this means that all records with
// event time < T have been processed and successfully committed. Since the
// timer fires when the watermark passes the expiration time, we should
// only output records with event time < expiration time.
if (recordCommitTimestamp.isBefore(context.timestamp())) {
LOG.info(
"Outputting record with key {} and value {} at expiration " +
"timestamp {}",
elementKey,
recordString,
context.timestamp().toString());
recordsToOutput.add(record);
} else {
LOG.info(
"Expired at {} but adding record with key {} and value {} back to " +
"buffer due to commit timestamp {}",
context.timestamp().toString(),
elementKey,
recordString,
recordCommitTimestamp.toString());
buffer.add(record);
}
}
// Output records, if there are any to output.
if (!recordsToOutput.isEmpty()) {
// Order the records in place, and output them. The user would need
// to implement DataChangeRecordComparator class that sorts the
// data change records by commit timestamp and transaction ID.
Collections.sort(recordsToOutput, new DataChangeRecordComparator());
context.outputWithTimestamp(
KV.of(elementKey, recordsToOutput), context.timestamp());
LOG.info(
"Expired at {}, outputting records for key {}",
context.timestamp().toString(),
elementKey);
} else {
LOG.info("Expired at {} with no records", context.timestamp().toString());
}
}
Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
LOG.info("Setting next timer to {}", nextTimer.toString());
timer.set(nextTimer);
} else {
LOG.info(
"Timer not being set since the buffer is empty: ");
keyString.clear();
}
}
}
Mengurutkan transaksi
Pipeline ini dapat diubah untuk diurutkan berdasarkan ID transaksi dan stempel waktu commit. Untuk melakukannya, buffer data untuk setiap pasangan ID transaksi / stempel waktu commit, bukan untuk setiap kunci Spanner. Hal ini memerlukan modifikasi kode di KeyByIdFn.
Contoh: Menggabungkan transaksi
Contoh kode ini membaca data perubahan data, menggabungkan semua data perubahan data yang termasuk dalam transaksi yang sama menjadi satu elemen, dan menampilkan elemen tersebut. Perhatikan bahwa transaksi yang dihasilkan oleh kode contoh ini tidak diurutkan berdasarkan stempel waktu commit.
Contoh kode ini menggunakan buffer untuk menyusun transaksi dari data perubahan data. Setelah menerima data perubahan data yang termasuk dalam transaksi untuk pertama kalinya, operasi ini akan membaca kolom numberOfRecordsInTransaction
dalam data perubahan data, yang menjelaskan jumlah data perubahan data yang diharapkan yang termasuk dalam transaksi tersebut. Fungsi ini menyimpan data perubahan data yang termasuk dalam transaksi tersebut ke dalam buffer hingga jumlah data yang di-buffer cocok dengan numberOfRecordsInTransaction
, lalu menampilkan data perubahan data yang dipaketkan.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new KeyByTransactionIdFn()))
.apply(ParDo.of(new TransactionBoundaryFn()))
// Subsequent processing goes here
KeyByTransactionIdFn
Fungsi ini menggunakan DataChangeRecord
dan menghasilkan DataChangeRecord
yang diberi kunci oleh ID transaksi.
private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
outputReceiver.output(KV.of(record.getServerTransactionId(), record));
}
}
TransactionBoundaryFn
TransactionBoundaryFn
buffering menerima key-value pair
{TransactionId, DataChangeRecord}
dari KeyByTransactionIdFn
dan
melakukan buffering dalam grup berdasarkan TransactionId
. Jika jumlah
data yang di-buffer sama dengan jumlah data yang terdapat dalam
seluruh transaksi, fungsi ini akan mengurutkan objek DataChangeRecord
dalam grup berdasarkan urutan data dan menghasilkan pasangan nilai kunci
{CommitTimestamp, TransactionId}
, Iterable<DataChangeRecord>
.
Di sini, kita mengasumsikan bahwa SortKey
adalah class yang ditentukan pengguna yang mewakili
pasangan {CommitTimestamp, TransactionId}
. Untuk informasi selengkapnya tentang
SortKey
, lihat contoh implementasi.
private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>> {
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@StateId("count") ValueState<Integer> countState) {
final KV<String, DataChangeRecord> element = context.element();
final DataChangeRecord record = element.getValue();
buffer.add(record);
int count = (countState.read() != null ? countState.read() : 0);
count = count + 1;
countState.write(count);
if (count == record.getNumberOfRecordsInTransaction()) {
final List<DataChangeRecord> sortedRecords =
StreamSupport.stream(buffer.read().spliterator(), false)
.sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
.collect(Collectors.toList());
final Instant commitInstant =
new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
.getTime());
context.outputWithTimestamp(
KV.of(
new SortKey(sortedRecords.get(0).getCommitTimestamp(),
sortedRecords.get(0).getServerTransactionId()),
sortedRecords),
commitInstant);
buffer.clear();
countState.clear();
}
}
}
Contoh: Memfilter menurut tag transaksi
Saat transaksi yang mengubah data pengguna diberi tag, tag yang sesuai dan jenisnya akan disimpan sebagai bagian dari DataChangeRecord
. Contoh ini menunjukkan cara memfilter data aliran perubahan berdasarkan tag transaksi yang ditentukan pengguna serta tag sistem:
Pemfilteran tag yang ditentukan pengguna untuk my-tx-tag
:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
!record.isSystemTransaction()
&& record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
// Subsequent processing goes here
Pemfilteran tag sistem/audit TTL:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
record.isSystemTransaction()
&& record.getTransactionTag().equals("RowDeletionPolicy")))
// Subsequent processing goes here
Contoh: Mengambil baris lengkap
Contoh ini berfungsi dengan tabel Spanner bernama Singer
yang memiliki definisi berikut:
CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024)
) PRIMARY KEY (SingerId);
Pada mode pengambilan nilai OLD_AND_NEW_VALUES
default dari aliran perubahan,
saat ada pembaruan pada baris Spanner, data perubahan
yang diterima hanya akan berisi kolom yang diubah. Kolom yang dilacak tetapi tidak berubah tidak akan disertakan dalam data. Kunci utama modi dapat digunakan untuk melakukan pembacaan snapshot Spanner pada stempel waktu commit data perubahan untuk mengambil kolom yang tidak diubah atau bahkan mengambil baris lengkap.
Perhatikan bahwa kebijakan retensi database mungkin perlu diubah ke nilai yang lebih besar atau sama dengan kebijakan retensi aliran perubahan agar pembacaan snapshot berhasil.
Perhatikan juga bahwa menggunakan jenis pengambilan nilai NEW_ROW
adalah cara yang direkomendasikan dan
lebih efisien untuk melakukannya, karena menampilkan semua kolom baris yang dilacak
secara default dan tidak memerlukan snapshot tambahan yang dibaca ke dalam Spanner.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
// Assume we have a change stream "my-change-stream" that watches Singers table.
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
// Subsequent processing goes here
ToFullRowJsonFn
Transformasi ini akan melakukan pembacaan yang sudah tidak berlaku pada stempel waktu commit setiap data yang diterima, dan memetakan baris lengkap ke JSON.
public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
// Since each instance of this DoFn will create its own session pool and will
// perform calls to Spanner sequentially, we keep the number of sessions in
// the pool small. This way, we avoid wasting resources.
private static final int MIN_SESSIONS = 1;
private static final int MAX_SESSIONS = 5;
private final String projectId;
private final String instanceId;
private final String databaseId;
private transient DatabaseClient client;
private transient Spanner spanner;
public ToFullRowJsonFn(SpannerConfig spannerConfig) {
this.projectId = spannerConfig.getProjectId().get();
this.instanceId = spannerConfig.getInstanceId().get();
this.databaseId = spannerConfig.getDatabaseId().get();
}
@Setup
public void setup() {
SessionPoolOptions sessionPoolOptions = SessionPoolOptions
.newBuilder()
.setMinSessions(MIN_SESSIONS)
.setMaxSessions(MAX_SESSIONS)
.build();
SpannerOptions options = SpannerOptions
.newBuilder()
.setProjectId(projectId)
.setSessionPoolOption(sessionPoolOptions)
.build();
DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
spanner = options.getService();
client = spanner.getDatabaseClient(id);
}
@Teardown
public void teardown() {
spanner.close();
}
@ProcessElement
public void process(
@Element DataChangeRecord element,
OutputReceiver<String> output) {
com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
element.getMods().forEach(mod -> {
JSONObject keysJson = new JSONObject(mod.getKeysJson());
JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
ModType modType = element.getModType();
JSONObject jsonRow = new JSONObject();
long singerId = keysJson.getLong("SingerId");
jsonRow.put("SingerId", singerId);
if (modType == ModType.INSERT) {
// For INSERT mod, get non-primary key columns from mod.
jsonRow.put("FirstName", newValuesJson.get("FirstName"));
jsonRow.put("LastName", newValuesJson.get("LastName"));
} else if (modType == ModType.UPDATE) {
// For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
try (ResultSet resultSet = client
.singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
.read(
"Singers",
KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
Arrays.asList("FirstName", "LastName"))) {
if (resultSet.next()) {
jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
JSONObject.NULL : resultSet.getString("FirstName"));
jsonRow.put("LastName", resultSet.isNull("LastName") ?
JSONObject.NULL : resultSet.getString("LastName"));
}
}
} else {
// For DELETE mod, there is nothing to do, as we already set SingerId.
}
output.output(jsonRow.toString());
});
}
}
Kode ini membuat klien database Spanner untuk melakukan pengambilan baris penuh, dan mengonfigurasi kumpulan sesi agar hanya memiliki beberapa sesi, yang melakukan pembacaan dalam satu instance ToFullReowJsonFn
secara berurutan.
Dataflow memastikan untuk membuat banyak instance fungsi ini,
masing-masing dengan kumpulan kliennya sendiri.
Contoh: Spanner ke Pub/Sub
Dalam skenario ini, pemanggil melakukan streaming data ke Pub/Sub secepat mungkin, tanpa pengelompokan atau agregasi. Hal ini cocok untuk memicu pemrosesan downstream, seperti streaming semua baris baru yang disisipkan ke dalam tabel Spanner ke Pub/Sub untuk pemrosesan lebih lanjut.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(PubsubIO.writeStrings().to("my-topic"));
Perhatikan bahwa sink Pub/Sub dapat dikonfigurasi untuk memastikan semantik tepat satu kali.
Contoh: Spanner ke Cloud Storage
Dalam skenario ini, pemanggil mengelompokkan semua kumpulan data dalam periode tertentu dan menyimpan grup dalam file Cloud Storage terpisah. Hal ini cocok untuk analisis dan pengarsipan titik waktu, yang tidak bergantung pada periode retensi Spanner.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO
.write()
.to("gs://my-bucket/change-stream-results-")
.withSuffix(".txt")
.withWindowedWrites()
.withNumShards(1));
Perhatikan bahwa sink Cloud Storage menyediakan semantik setidaknya satu kali secara default. Dengan pemrosesan tambahan, semantik ini dapat diubah agar memiliki semantik tepat satu kali.
Kami juga menyediakan template Dataflow untuk kasus penggunaan ini: lihat Menghubungkan aliran perubahan ke Cloud Storage.
Contoh: Spanner ke BigQuery (tabel ledger)
Di sini, pemanggil melakukan streaming data perubahan ke BigQuery. Setiap data perubahan data ditampilkan sebagai satu baris di BigQuery. Hal ini cocok untuk analisis. Kode ini menggunakan fungsi yang ditentukan sebelumnya, di bagian Mengambil baris lengkap, untuk mengambil baris lengkap data dan menulisnya ke BigQuery.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
.apply(BigQueryIO
.<String>write()
.to("my-bigquery-table")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withSchema(new TableSchema().setFields(Arrays.asList(
new TableFieldSchema()
.setName("SingerId")
.setType("INT64")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("FirstName")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("LastName")
.setType("STRING")
.setMode("REQUIRED")
)))
.withAutoSharding()
.optimizedWrites()
.withFormatFunction((String element) -> {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree(element);
} catch (IOException e) {
e.printStackTrace();
}
return new TableRow()
.set("SingerId", jsonNode.get("SingerId").asInt())
.set("FirstName", jsonNode.get("FirstName").asText())
.set("LastName", jsonNode.get("LastName").asText());
}
)
);
Perhatikan bahwa sink BigQuery menyediakan semantik setidaknya satu kali secara default. Dengan pemrosesan tambahan, semantik ini dapat diubah agar memiliki semantik tepat satu kali.
Kami juga menyediakan template Dataflow untuk kasus penggunaan ini; lihat Menghubungkan aliran perubahan ke BigQuery.
Memantau pipeline
Ada dua class metrik yang tersedia untuk memantau pipeline Dataflow aliran perubahan.
Metrik Dataflow standar
Dataflow menyediakan beberapa metrik untuk memastikan tugas Anda berjalan dengan baik, seperti keaktualan data, kelambatan sistem, throughput tugas, penggunaan CPU pekerja, dan lainnya. Anda dapat menemukan informasi selengkapnya di Menggunakan Monitoring untuk pipeline Dataflow.
Untuk pipeline aliran perubahan, ada dua metrik utama yang harus diperhitungkan: latensi sistem dan keaktualan data.
Latensi sistem akan memberi tahu Anda durasi maksimum saat ini (dalam detik) saat item data diproses atau menunggu pemrosesan.
Keaktualan data akan menunjukkan jumlah waktu antara sekarang (real time) dan watermark output. Stempel waktu output waktu T
menunjukkan bahwa semua elemen dengan waktu peristiwa (secara ketat) sebelum T
telah diproses untuk komputasi. Dengan kata lain, keaktualan data mengukur seberapa baru pipeline, sehubungan dengan pemrosesan peristiwa yang telah diterimanya.
Jika pipeline kekurangan resource, Anda dapat melihat efeknya dalam kedua metrik ini. Latensi sistem akan meningkat, karena item harus menunggu lebih lama sebelum diproses. Keaktualan data juga akan meningkat, karena pipeline tidak akan dapat mengimbangi jumlah data yang diterima.
Metrik aliran perubahan kustom
Metrik ini ditampilkan di Cloud Monitoring dan mencakup:
- Latensi bucket (histogram) antara data yang di-commit di Spanner hingga data tersebut dikeluarkan ke PCollection oleh konektor. Metrik ini dapat digunakan untuk melihat masalah performa (latensi) pada pipeline.
- Jumlah total data yang dibaca. Ini adalah indikasi keseluruhan jumlah data yang dikeluarkan oleh konektor. Jumlah ini akan terus meningkat, yang mencerminkan tren penulisan di database Spanner yang mendasarinya.
- Jumlah partisi yang sedang dibaca. Harus selalu ada partisi yang dibaca. Jika angka ini nol, berarti terjadi error dalam pipeline.
- Total jumlah kueri yang dikeluarkan selama eksekusi konektor. Ini adalah indikasi keseluruhan kueri aliran perubahan yang dibuat ke instance Spanner selama eksekusi pipeline. Hal ini dapat digunakan untuk mendapatkan estimasi beban dari konektor ke database Spanner.
Memperbarui pipeline yang ada
Anda dapat mengupdate pipeline yang sedang berjalan yang menggunakan konektor SpannerIO untuk memproses aliran perubahan jika pemeriksaan kompatibilitas tugas lulus. Untuk melakukannya, Anda harus menetapkan parameter nama tabel metadata tugas baru secara eksplisit saat memperbaruinya. Gunakan nilai opsi pipeline metadataTable
dari tugas yang Anda perbarui.
Jika Anda menggunakan template Dataflow yang disediakan Google, tetapkan
nama tabel menggunakan parameter spannerMetadataTableName
. Anda juga dapat mengubah
tugas yang ada untuk menggunakan tabel metadata secara eksplisit dengan metode
withMetadataTable(your-metadata-table-name)
dalam
konfigurasi konektor. Setelah selesai, Anda dapat mengikuti petunjuk di Meluncurkan tugas pengganti dari dokumentasi Dataflow untuk memperbarui tugas yang sedang berjalan.
Praktik terbaik untuk aliran perubahan dan Dataflow
Berikut adalah beberapa praktik terbaik untuk membuat koneksi aliran perubahan menggunakan Dataflow.
Menggunakan database metadata terpisah
Sebaiknya buat database terpisah untuk konektor SpannerIO yang akan digunakan untuk penyimpanan metadata, bukan mengonfigurasinya untuk menggunakan database aplikasi Anda.
Untuk mengetahui informasi selengkapnya, lihat Mempertimbangkan database metadata terpisah.
Menentukan ukuran cluster
Pedoman umum untuk jumlah awal pekerja dalam tugas aliran perubahan Spanner adalah satu pekerja per 1.000 operasi tulis per detik. Perhatikan bahwa estimasi ini dapat bervariasi bergantung pada beberapa faktor, seperti ukuran setiap transaksi, jumlah data aliran perubahan yang dihasilkan dari satu transaksi dan transformasi, agregasi, atau sink lainnya yang digunakan dalam pipeline.
Setelah penyediaan resource awal, penting untuk melacak metrik yang disebutkan di Memantau pipeline, untuk memastikan pipeline dalam kondisi baik. Sebaiknya lakukan eksperimen dengan ukuran kumpulan pekerja awal dan pantau cara pipeline Anda menangani beban, dengan meningkatkan jumlah node jika perlu. Penggunaan CPU adalah metrik utama untuk memeriksa apakah beban sudah tepat dan apakah diperlukan lebih banyak node.
Batasan umum
Ada beberapa batasan umum saat menggunakan aliran data perubahan Spanner dengan Dataflow:
Penskalaan otomatis
Dukungan penskalaan otomatis untuk pipeline apa pun yang menyertakan SpannerIO.readChangeStream
memerlukan Apache Beam 2.39.0
atau yang lebih tinggi.
Jika Anda menggunakan versi Apache Beam sebelum 2.39.0
, pipeline yang menyertakan
SpannerIO.readChangeStream
harus menentukan algoritma penskalaan otomatis
secara eksplisit sebagai NONE
, seperti yang dijelaskan dalam Penskalaan otomatis horizontal.
Untuk menskalakan pipeline Dataflow secara manual, bukan menggunakan penskalaan otomatis, lihat Menskalakan pipeline streaming secara manual.
Runner V2
Konektor aliran perubahan Spanner memerlukan
Dataflow Runner V2.
Hal ini harus ditentukan secara manual selama eksekusi atau error akan
ditampilkan. Anda dapat menentukan Runner V2
dengan mengonfigurasi tugas dengan
--experiments=use_unified_worker,use_runner_v2
.
Snapshot
Konektor aliran perubahan Spanner tidak mendukung Snapshot Dataflow.
Menyelesaikan
Konektor aliran perubahan Spanner tidak mendukung menguras tugas. Anda hanya dapat membatalkan tugas yang ada.
Anda juga dapat memperbarui pipeline yang ada tanpa perlu menghentikannya.
OpenCensus
Untuk menggunakan OpenCensus untuk memantau pipeline, tentukan versi 0.28.3 atau yang lebih baru.
NullPointerException
saat pipeline dimulai
Bug di Apache Beam versi 2.38.0
dapat menyebabkan NullPointerException
saat memulai pipeline dalam kondisi tertentu. Hal ini akan mencegah tugas Anda dimulai, dan menampilkan pesan error ini:
java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null
Untuk mengatasi masalah ini, gunakan Apache Beam versi 2.39.0
atau yang lebih baru, atau tentukan versi beam-sdks-java-core
secara manual sebagai 2.37.0
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.37.0</version>
</dependency>