Halaman ini menjelaskan cara menggunakan konektor Kafka untuk menggunakan dan meneruskan data aliran data perubahan Spanner.
Konsep inti
Berikut ini menjelaskan konsep inti konektor Kafka.
Debezium
Debezium adalah project open source yang menyediakan platform streaming data latensi rendah untuk pengambilan data perubahan.
Konektor Kafka
Konektor Kafka menyediakan abstraksi di atas Spanner API untuk memublikasikan aliran perubahan Spanner ke Kafka. Dengan konektor ini, Anda tidak perlu mengelola siklus proses partisi aliran perubahan, yang diperlukan saat Anda menggunakan Spanner API secara langsung.
Konektor Kafka menghasilkan peristiwa perubahan untuk setiap mod data perubahan dan mengirim data peristiwa perubahan ke downstream ke topik Kafka terpisah untuk setiap tabel yang dilacak aliran perubahan. Modifikasi data perubahan mewakili satu modifikasi (menyisipkan, memperbarui, atau menghapus) yang direkam. Satu data perubahan data dapat berisi lebih dari satu mod.
Output konektor Kafka
Konektor Kafka meneruskan data aliran perubahan langsung ke topik Kafka terpisah. Nama topik output harus connector_name
.table_name
.
Jika topik tidak ada, konektor Kafka akan otomatis membuat topik dengan nama tersebut.
Anda juga dapat mengonfigurasi transformasi pemilihan rute topik untuk merutekan ulang data ke topik yang Anda tentukan. Jika Anda ingin menggunakan pemilihan rute topik, nonaktifkan fungsi stempel waktu rendah.
Pengurutan data
Data diurutkan berdasarkan stempel waktu commit per kunci utama dalam
topik Kafka. Data yang termasuk dalam kunci utama yang berbeda tidak memiliki jaminan pengurutan. Data dengan kunci utama yang sama disimpan di
partisi topik Kafka yang sama. Jika ingin memproses seluruh transaksi, Anda juga dapat menggunakan kolom server_transaction_id
dan number_of_records_in_transaction
data change record untuk menyusun transaksi Spanner.
Mengubah peristiwa
Konektor Kafka menghasilkan peristiwa perubahan data untuk setiap operasi INSERT
, UPDATE
,
dan DELETE
. Setiap peristiwa berisi kunci dan nilai untuk baris yang diubah.
Anda dapat menggunakan konverter Kafka Connect untuk menghasilkan peristiwa perubahan data dalam
format Protobuf
, AVRO
, JSON
, atau JSON Schemaless
. Jika Anda menggunakan
konverter Kafka Connect yang menghasilkan skema, peristiwa akan berisi
skema terpisah untuk kunci dan nilai. Jika tidak, peristiwa hanya berisi
kunci dan nilai.
Skema untuk kunci tidak pernah berubah. Skema untuk nilai adalah gabungan dari semua kolom yang telah dilacak oleh aliran perubahan sejak waktu mulai konektor.
Jika Anda mengonfigurasi konektor untuk menghasilkan peristiwa JSON, peristiwa perubahan output berisi lima kolom:
Kolom
schema
pertama menentukan skema Kafka Connect yang menjelaskan skema kunci Spanner.Kolom
payload
pertama memiliki struktur yang dijelaskan oleh kolomschema
sebelumnya dan berisi kunci untuk baris yang diubah.Kolom
schema
kedua menentukan skema Kafka Connect yang menjelaskan skema untuk baris yang diubah.Kolom
payload
kedua memiliki struktur yang dijelaskan oleh kolomschema
sebelumnya dan berisi data sebenarnya untuk baris yang diubah.Kolom
source
adalah kolom wajib yang menjelaskan metadata sumber untuk peristiwa.
Berikut adalah contoh peristiwa perubahan data:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Watermark rendah
Stempel waktu rendah menjelaskan waktu T saat konektor Kafka dijamin telah melakukan streaming dan memublikasikan semua peristiwa dengan stempel waktu < T ke topik Kafka.
Anda dapat mengaktifkan watermark rendah di konektor Kafka menggunakan
parameter gcp.spanner.low-watermark.enabled
. Parameter ini dinonaktifkan secara default. Jika stempel waktu rendah diaktifkan, kolom low_watermark
dalam data perubahan stream perubahan akan diisi dengan stempel waktu stempel waktu rendah konektor Kafka saat ini.
Jika tidak ada data yang dihasilkan, konektor Kafka akan mengirimkan "detak jantung" watermark berkala ke topik output Kafka yang terdeteksi oleh konektor.
Detak jantung watermark ini adalah data yang kosong kecuali untuk
kolom low_watermark
. Kemudian, Anda dapat menggunakan stempel waktu rendah untuk melakukan agregasi berbasis waktu.
Misalnya, Anda dapat menggunakan stempel waktu rendah untuk mengurutkan peristiwa menurut stempel waktu commit di seluruh kunci utama.
Topik metadata
Konektor Kafka, serta framework Kafka Connect, membuat beberapa topik metadata untuk menyimpan informasi terkait konektor. Sebaiknya jangan ubah konfigurasi atau konten topik metadata ini.
Berikut adalah topik metadata:
_consumer_offsets
: Topik yang dibuat secara otomatis oleh Kafka. Menyimpan offset konsumen untuk konsumen yang dibuat di konektor Kafka._kafka-connect-offsets
: Topik yang dibuat secara otomatis oleh Kafka Connect. Menyimpan offset konektor._sync_topic_spanner_connector_connectorname
: Topik yang dibuat secara otomatis oleh konektor. Menyimpan metadata terkait partisi aliran perubahan._rebalancing_topic_spanner_connector_connectorname
: Topik yang dibuat secara otomatis oleh konektor. Digunakan untuk menentukan status aktif tugas konektor._debezium-heartbeat.connectorname
: Topik yang digunakan untuk memproses heartbeat aliran data perubahan Spanner.
Runtime konektor Kafka
Berikut ini penjelasan runtime konektor Kafka.
Skalabilitas
Konektor Kafka dapat diskalakan secara horizontal dan berjalan pada satu atau beberapa tugas yang tersebar di antara beberapa pekerja Kafka Connect.
Jaminan pengiriman pesan
Konektor Kafka mendukung jaminan pengiriman setidaknya satu kali.
Fault tolerance
Konektor Kafka toleran terhadap kegagalan. Saat konektor Kafka membaca perubahan dan menghasilkan peristiwa, konektor akan mencatat stempel waktu commit terakhir yang diproses untuk setiap partisi aliran perubahan. Jika konektor Kafka berhenti karena alasan apa pun (termasuk kegagalan komunikasi, masalah jaringan, atau kegagalan software), setelah dimulai ulang, konektor Kafka akan melanjutkan streaming data dari tempat terakhir kali berhenti.
Konektor Kafka membaca skema informasi pada stempel waktu awal konektor Kafka untuk mengambil informasi skema. Secara default, Spanner tidak dapat membaca skema informasi pada stempel waktu baca sebelum periode retensi versi, yang secara default adalah satu jam. Jika ingin memulai konektor dari lebih awal dari satu jam ke belakang, Anda harus meningkatkan periode retensi versi database.
Menyiapkan konektor Kafka
Membuat aliran perubahan
Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan. Untuk melanjutkan ke langkah berikutnya, Anda memerlukan instance Spanner dengan aliran perubahan yang dikonfigurasi.
Perhatikan bahwa jika Anda ingin kolom yang berubah dan tidak berubah ditampilkan pada setiap peristiwa perubahan data, gunakan jenis pengambilan nilai NEW_ROW
. Untuk mengetahui informasi selengkapnya, lihat jenis pengambilan nilai.
Menginstal JAR konektor Kafka
Setelah Zookeeper, Kafka, dan Kafka Connect diinstal, tugas yang tersisa untuk men-deploy konektor Kafka adalah mendownload
arsip plugin konektor, mengekstrak file JAR ke lingkungan Kafka Connect, dan menambahkan
direktori dengan file JAR ke plugin.path
Kafka Connect.
Kemudian, Anda perlu memulai ulang proses Kafka Connect untuk mengambil file JAR baru.
Jika menggunakan container yang tidak dapat diubah, Anda dapat mengambil image dari Image container Debezium untuk Zookeeper, Kafka, dan Kafka Connect. Image Kafka Connect telah menginstal konektor Spanner secara default.
Untuk informasi selengkapnya tentang cara menginstal JAR konektor Kafka berbasis Debezium, lihat Menginstal Debezium.
Mengonfigurasi konektor Kafka
Berikut adalah contoh konfigurasi untuk konektor Kafka
yang terhubung ke aliran perubahan yang disebut changeStreamAll
di database users
dalam instance test-instance
dan project test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Konfigurasi ini berisi hal berikut:
Nama konektor saat terdaftar dengan layanan Kafka Connect.
Nama class konektor Spanner ini.
Project ID.
ID Instance Spanner.
ID Database Spanner.
Nama aliran perubahan.
Objek JSON untuk kunci akun layanan.
(Opsional) Peran database Spanner yang akan digunakan.
Jumlah maksimum tugas.
Untuk mengetahui daftar lengkap properti konektor, lihat Properti konfigurasi konektor Kafka.
Menambahkan konfigurasi konektor ke Kafka Connect
Untuk mulai menjalankan konektor Spanner:
Buat konfigurasi untuk konektor Spanner.
Gunakan Kafka Connect REST API untuk menambahkan konfigurasi konektor tersebut ke cluster Kafka Connect Anda.
Anda dapat mengirim konfigurasi ini dengan perintah POST
ke layanan Kafka Connect
yang sedang berjalan. Secara default, layanan Kafka Connect berjalan di port 8083
.
Layanan mencatat konfigurasi dan memulai tugas konektor yang terhubung ke database Spanner dan melakukan streaming data peristiwa perubahan ke topik Kafka.
Berikut adalah contoh perintah POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Contoh respons yang berhasil:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Memperbarui konfigurasi konektor Kafka
Untuk memperbarui konfigurasi konektor, kirim perintah PUT
ke layanan Kafka Connect yang berjalan dengan nama konektor yang sama.
Anggaplah kita memiliki konektor yang berjalan dengan konfigurasi dari
bagian sebelumnya. Berikut adalah contoh perintah PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Contoh respons yang berhasil:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Menghentikan konektor Kafka
Untuk menghentikan konektor, kirim perintah DELETE
ke layanan Kafka Connect yang sedang berjalan dengan nama konektor yang sama.
Anggaplah kita memiliki konektor yang berjalan dengan konfigurasi dari
bagian sebelumnya. Berikut adalah contoh perintah DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Contoh respons yang berhasil:
HTTP/1.1 204 No Content
Memantau konektor Kafka
Selain metrik Kafka Connect dan Debezium standar, konektor Kafka mengekspor metriknya sendiri:
MilliSecondsLowWatermark
: Stempel waktu rendah saat ini dari tugas konektor dalam milidetik. Stempel waktu rendah menjelaskan waktu T saat konektor dijamin telah melakukan streaming semua peristiwa dengan stempel waktu < TMilliSecondsLowWatermarkLag
: Jeda stempel waktu rendah di belakang waktu saat ini dalam milidetik. mengalirkan semua peristiwa dengan stempel waktu < TLatencyLowWatermark<Variant>MilliSeconds
: Jeda stempel waktu rendah di belakang waktu saat ini dalam milidetik. Varian P50, P95, P99, Rata-rata, Min, dan Maks disediakan.LatencySpanner<Variant>MilliSeconds
: Latensi Spanner-commit-timestamp-to-connector-read. Varian P50, P95, P99, Rata-rata, Min, Maks disediakan.LatencyReadToEmit<Variant>MilliSeconds
: Latensi Spanner-read-timestamp-to-connector-emit. Varian P50, P95, P99, Rata-rata, Min, dan Maks disediakan.LatencyCommitToEmit<Variant>tMilliSeconds
: Latensi Spanner-commit-timestamp-to-connector-emit. Varian P50, P95, P99, Rata-rata, Min, dan Maks disediakan.LatencyCommitToPublish<Variant>MilliSeconds
: Latensi stempel waktu commit Spanner ke stempel waktu publikasi Kafka. Varian P50, P95, P99, Rata-rata, Min, Maks disediakan.NumberOfChangeStreamPartitionsDetected
: Jumlah total partisi yang terdeteksi oleh tugas konektor saat ini.NumberOfChangeStreamQueriesIssued
: Jumlah total kueri aliran perubahan yang dikeluarkan oleh tugas saat ini.NumberOfActiveChangeStreamQueries
: Jumlah kueri aliran perubahan aktif yang terdeteksi oleh tugas konektor saat ini.SpannerEventQueueCapacity
: Kapasitas totalStreamEventQueue
, antrean yang menyimpan elemen yang diterima dari kueri aliran perubahan.SpannerEventQueueCapacity
: KapasitasStreamEventQueue
yang tersisa.TaskStateChangeEventQueueCapacity
: Kapasitas totalTaskStateChangeEventQueue
, antrean yang menyimpan peristiwa yang terjadi di konektor.RemainingTaskStateChangeEventQueueCapacity
: KapasitasTaskStateChangeEventQueue
yang tersisa.NumberOfActiveChangeStreamQueries
: Jumlah kueri aliran perubahan aktif yang terdeteksi oleh tugas konektor saat ini.
Properti konfigurasi konektor Kafka
Berikut adalah properti konfigurasi yang diperlukan untuk konektor:
name
: Nama unik untuk konektor. Mencoba mendaftar lagi dengan nama yang sama akan menyebabkan kegagalan. Properti ini diperlukan oleh semua konektor Kafka Connect.connector.class
: Nama class Java untuk konektor. Selalu gunakan nilaiio.debezium.connector.spanner.SpannerConnector
untuk konektor Kafka.tasks.max
: Jumlah maksimum tugas yang harus dibuat untuk konektor ini.gcp.spanner.project.id
: ID projectgcp.spanner.instance.id
: ID instance Spannergcp.spanner.database.id
: ID database Spannergcp.spanner.change.stream
: Nama aliran data perubahan Spannergcp.spanner.credentials.json
: Objek JSON kunci akun layanan.gcp.spanner.credentials.path
: Jalur file ke objek JSON kunci akun layanan. Wajib jika kolom di atas tidak disediakan.gcp.spanner.database.role
: Peran database Spanner yang akan digunakan. Hal ini hanya diperlukan jika aliran perubahan diamankan dengan kontrol akses yang sangat terperinci. Peran database harus memiliki hak istimewaSELECT
pada aliran perubahan dan hak istimewaEXECUTE
pada fungsi baca aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses terperinci untuk aliran perubahan.
Properti konfigurasi lanjutan berikut memiliki setelan default yang berfungsi dalam sebagian besar situasi, sehingga jarang perlu ditentukan dalam konfigurasi konektor:
gcp.spanner.low-watermark.enabled
: Menunjukkan apakah watermark rendah diaktifkan untuk konektor. Defaultnya adalah "false".gcp.spanner.low-watermark.update-period.ms
: Interval saat stempel waktu rendah diperbarui. Default-nya adalah 1.000 md.heartbeat.interval.ms
: Interval detak jantung Spanner. Defaultnya adalah 300000 (lima menit).gcp.spanner.start.time
: Waktu mulai konektor. Defaultnya adalah waktu saat ini.gcp.spanner.end.time
: Waktu akhir konektor. Defaultnya adalah tak terbatas.tables.exclude.list
: Tabel yang akan mengecualikan peristiwa perubahan. Default-nya adalah kosong.tables.include.list
: Tabel yang akan menyertakan peristiwa perubahan. Jika tidak diisi, semua tabel akan disertakan. Default-nya adalah kosong.gcp.spanner.stream.event.queue.capacity
: Kapasitas antrean peristiwa Spanner. Setelan defaultnya adalah 10.000.connector.spanner.task.state.change.event.queue.capacity
: Kapasitas antrean peristiwa perubahan status tugas. Setelan defaultnya adalah 1.000.connector.spanner.max.missed.heartbeats
: Jumlah maksimum heartbeat yang terlewat untuk kueri aliran perubahan sebelum pengecualian ditampilkan. Setelan defaultnya adalah 10.scaler.monitor.enabled
: Menunjukkan apakah penskalaan otomatis tugas diaktifkan. Nilai defaultnya adalah salah (false).tasks.desired.partitions
: Jumlah partisi aliran perubahan yang diinginkan per tugas. Parameter ini diperlukan untuk penskalaan otomatis tugas. Setelan defaultnya adalah 2.tasks.min
: Jumlah minimum tugas. Parameter ini diperlukan untuk penskalaan otomatis tugas. Default-nya adalah 1.connector.spanner.sync.topic
: Nama untuk topik sinkronisasi, topik konektor internal yang digunakan untuk menyimpan komunikasi antartugas. Setelan default-nya adalah_sync_topic_spanner_connector_connectorname
jika pengguna tidak memberikan nama.connector.spanner.sync.poll.duration
: Durasi polling untuk topik sinkronisasi. Setelan defaultnya adalah 500 md.connector.spanner.sync.request.timeout.ms
: Waktu tunggu habis untuk permintaan ke topik sinkronisasi. Setelan defaultnya adalah 5.000 md.connector.spanner.sync.delivery.timeout.ms
: Waktu tunggu untuk memublikasikan ke topik sinkronisasi. Setelan defaultnya adalah 15.000 md.connector.spanner.sync.commit.offsets.interval.ms
: Interval saat offset di-commit untuk topik sinkronisasi. Setelan defaultnya adalah 60.000 md.connector.spanner.sync.publisher.wait.timeout
: Interval saat pesan dipublikasikan ke topik sinkronisasi. Defaultnya adalah 5 md.connector.spanner.rebalancing.topic
: Nama topik penyeimbangan ulang. Topik penyeimbangan ulang adalah topik konektor internal yang digunakan untuk menentukan status aktif tugas. Setelan default-nya adalah_rebalancing_topic_spanner_connector_connectorname
jika pengguna tidak memberikan nama.connector.spanner.rebalancing.poll.duration
: Durasi polling untuk topik penyeimbangan ulang. Setelan defaultnya adalah 5.000 md.connector.spanner.rebalancing.commit.offsets.timeout
: Waktu tunggu untuk melakukan offset untuk topik penyeimbangan ulang. Setelan defaultnya adalah 5.000 md.connector.spanner.rebalancing.commit.offsets.interval.ms
: Interval saat offset di-commit untuk topik sinkronisasi. Setelan defaultnya adalah 60.000 md.connector.spanner.rebalancing.task.waiting.timeout
: Durasi waktu tugas menunggu sebelum memproses peristiwa penyeimbangan ulang. Setelan defaultnya adalah 1.000 md.
Untuk mengetahui daftar properti konektor yang dapat dikonfigurasi secara lebih mendetail, lihat repositori GitHub.
Batasan
Konektor tidak mendukung streaming peristiwa snapshot.
Jika watermark diaktifkan di konektor, Anda tidak dapat mengonfigurasi transformasi pemilihan rute topik Debezium.