Halaman ini menjelaskan cara menggunakan konektor Kafka untuk menggunakan dan meneruskan data aliran perubahan Spanner.
Konsep inti
Berikut adalah penjelasan konsep inti konektor Kafka.
Debezium
Debezium adalah project open source yang menyediakan platform streaming data latensi rendah untuk pengambilan data perubahan.
Konektor Kafka
Konektor Kafka menyediakan abstraksi melalui Spanner API untuk memublikasikan aliran perubahan Spanner ke Kafka. Dengan konektor ini, Anda tidak perlu mengelola siklus proses partisi aliran perubahan, yang diperlukan saat menggunakan Spanner API secara langsung.
Konektor Kafka menghasilkan peristiwa perubahan untuk setiap mod catatan perubahan data dan mengirimkan downstream kumpulan data peristiwa perubahan ke topik Kafka terpisah untuk setiap tabel yang dilacak aliran data perubahan. Mod data perubahan data mewakili satu modifikasi (menyisipkan, memperbarui, atau menghapus) yang diambil. Satu kumpulan data perubahan data dapat berisi lebih dari satu mod.
Output konektor Kafka
Konektor Kafka meneruskan kumpulan data perubahan secara langsung ke topik Kafka yang terpisah. Nama topik output harus connector_name
.table_name
.
Jika topik tidak ada, konektor Kafka akan otomatis membuat topik dengan nama tersebut.
Anda juga dapat mengonfigurasi transformasi pemilihan rute topik untuk mengalihkan data ke topik yang Anda tentukan. Jika Anda ingin menggunakan pemilihan rute topik, nonaktifkan fungsi water watermark rendah.
Pengurutan data
Kumpulan data diurutkan berdasarkan stempel waktu commit per kunci utama dalam topik Kafka. Kumpulan data yang memiliki kunci utama yang berbeda tidak memiliki jaminan pengurutan. Kumpulan data dengan kunci utama yang sama disimpan di partisi topik Kafka yang sama. Jika ingin memproses seluruh transaksi, Anda juga dapat
menggunakan kolom server_transaction_id
dan number_of_records_in_transaction
data perubahan data
untuk menyusun transaksi Spanner.
Mengubah acara
Konektor Kafka menghasilkan peristiwa perubahan data untuk setiap operasi INSERT
, UPDATE
,
dan DELETE
. Setiap peristiwa berisi kunci dan nilai untuk baris yang diubah.
Anda dapat menggunakan pengonversi Kafka Connect untuk menghasilkan peristiwa perubahan data dalam
format Protobuf
, AVRO
, JSON
, atau JSON Schemaless
. Jika Anda menggunakan
pengonversi Kafka Connect yang menghasilkan skema, peristiwa akan berisi
skema terpisah untuk kunci dan nilai. Jika tidak, peristiwa hanya akan berisi kunci dan nilai.
Skema untuk kunci tidak pernah berubah. Skema untuk nilai adalah gabungan semua kolom yang dilacak aliran perubahan sejak waktu mulai konektor.
Jika Anda mengonfigurasi konektor untuk menghasilkan peristiwa JSON, peristiwa perubahan output berisi lima kolom:
Kolom
schema
pertama menentukan skema Kafka Connect yang mendeskripsikan skema kunci Spanner.Kolom
payload
pertama memiliki struktur yang dijelaskan oleh kolomschema
sebelumnya dan berisi kunci untuk baris yang diubah.Kolom
schema
kedua menentukan skema Kafka Connect yang menjelaskan skema untuk baris yang diubah.Kolom
payload
kedua memiliki struktur yang dijelaskan oleh kolomschema
sebelumnya dan berisi data aktual untuk baris yang diubah.Kolom
source
adalah kolom wajib yang menjelaskan metadata sumber untuk peristiwa.
Berikut adalah contoh peristiwa perubahan data:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Watermark rendah
Watermark rendah menjelaskan waktu T saat konektor Kafka dijamin di-streaming dan memublikasikan semua peristiwa dengan stempel waktu < T ke topik Kafka.
Anda dapat mengaktifkan watermark rendah di konektor Kafka menggunakan parameter gcp.spanner.low-watermark.enabled
. Parameter ini dinonaktifkan
secara default. Jika watermark rendah diaktifkan, kolom low_watermark
di data perubahan data aliran data perubahan akan diisi dengan stempel waktu watermark rendah konektor Kafka saat ini.
Jika tidak ada kumpulan data yang dihasilkan, konektor Kafka akan mengirimkan watermark berkala "heartbeats" ke topik output Kafka yang terdeteksi oleh konektor.
Heartbeat watermark ini adalah catatan yang kosong kecuali untuk
kolom low_watermark
. Anda kemudian dapat menggunakan watermark rendah untuk melakukan agregasi berbasis waktu.
Misalnya, Anda dapat menggunakan watermark rendah untuk mengurutkan peristiwa dengan meng-commit stempel waktu di seluruh kunci utama.
Topik metadata
Konektor Kafka, serta framework Kafka Connect, membuat beberapa topik metadata untuk menyimpan informasi terkait konektor. Sebaiknya perubahan konfigurasi atau konten topik metadata ini tidak disarankan.
Berikut adalah topik metadata:
_consumer_offsets
: Topik yang dibuat secara otomatis oleh Kafka. Menyimpan offset konsumen untuk konsumen yang dibuat di konektor Kafka._kafka-connect-offsets
: Topik yang dibuat secara otomatis oleh Kafka Connect. Menyimpan offset konektor._sync_topic_spanner_connector_connectorname
: Topik yang dibuat secara otomatis oleh konektor. Menyimpan metadata terkait partisi streaming perubahan._rebalancing_topic_spanner_connector_connectorname
: Topik yang dibuat secara otomatis oleh konektor. Digunakan untuk menentukan keaktifan tugas konektor._debezium-heartbeat.connectorname
: Topik yang digunakan untuk memproses heartbeat streaming perubahan Spanner.
Runtime konektor Kafka
Berikut ini penjelasan tentang runtime konektor Kafka.
Skalabilitas
Konektor Kafka skalabel secara horizontal dan berjalan pada satu atau beberapa tugas yang tersebar di beberapa pekerja Kafka Connect.
Jaminan pengiriman pesan
Konektor Kafka mendukung jaminan pengiriman minimal satu kali.
Toleransi kesalahan
Konektor Kafka toleran terhadap kegagalan. Saat konektor Kafka membaca perubahan dan menghasilkan peristiwa, konektor Kafka akan mencatat stempel waktu commit terakhir yang diproses untuk setiap partisi aliran perubahan. Jika konektor Kafka berhenti karena alasan apa pun (termasuk kegagalan komunikasi, masalah jaringan, atau kegagalan software), setelah dimulai ulang, konektor Kafka akan melanjutkan streaming rekaman dari posisi terakhir yang terhenti.
Konektor Kafka membaca skema informasi pada stempel waktu awal konektor Kafka untuk mengambil informasi skema. Secara default, Spanner tidak dapat membaca skema informasi pada stempel waktu baca sebelum periode retensi versi, yang nilai defaultnya adalah satu jam. Jika ingin memulai konektor lebih awal dari satu jam sebelumnya, Anda harus meningkatkan periode retensi data versi database.
Menyiapkan konektor Kafka
Membuat aliran perubahan
Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan. Untuk melanjutkan ke langkah berikutnya, diperlukan instance Spanner dengan aliran data perubahan yang telah dikonfigurasi.
Perhatikan bahwa jika Anda ingin kolom yang diubah dan tidak berubah ditampilkan pada setiap peristiwa perubahan data, gunakan jenis pengambilan nilai NEW_ROW
. Untuk mengetahui informasi selengkapnya, lihat jenis pengambilan nilai.
Instal JAR konektor Kafka
Setelah Zookeeper, Kafka, dan Kafka Connect terinstal, tugas yang tersisa untuk men-deploy konektor Kafka adalah mendownload
arsip plugin konektor, mengekstrak file JAR ke lingkungan Kafka Connect, lalu menambahkan direktori dengan file JAR ke plugin.path
Kafka Connect.
Kemudian, Anda perlu memulai ulang proses Kafka Connect untuk mengambil file JAR baru.
Jika menggunakan container yang tidak dapat diubah, Anda dapat mengambil image dari image Container Debezium untuk Zookeeper, Kafka, dan Kafka Connect. Image Kafka Connect memiliki konektor Spanner yang telah diinstal sebelumnya.
Untuk mengetahui informasi selengkapnya tentang cara menginstal JAR konektor Kafka berbasis Debezium, lihat Menginstal Debezium.
Mengonfigurasi konektor Kafka
Berikut adalah contoh konfigurasi untuk konektor Kafka yang terhubung ke aliran data perubahan yang disebut changeStreamAll
di database users
di instance test-instance
dan project test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Konfigurasi ini berisi hal-hal berikut:
Nama konektor saat didaftarkan ke layanan Kafka Connect.
Nama class konektor Spanner ini.
Project ID.
ID Instance Spanner.
ID Database Spanner.
Nama aliran data perubahan.
Objek JSON untuk kunci akun layanan.
(Opsional) Peran database Spanner yang akan digunakan.
Jumlah tugas maksimum.
Untuk daftar lengkap properti konektor, lihat Properti konfigurasi konektor Kafka.
Menambahkan konfigurasi konektor ke Kafka Connect
Untuk mulai menjalankan konektor Spanner:
Buat konfigurasi untuk konektor Spanner.
Gunakan Kafka Connect REST API untuk menambahkan konfigurasi konektor tersebut ke cluster Kafka Connect Anda.
Anda dapat mengirim konfigurasi ini dengan perintah POST
ke layanan Kafka Connect
yang sedang berjalan. Secara default, layanan Kafka Connect berjalan pada port 8083
.
Layanan mencatat konfigurasi dan memulai tugas konektor yang terhubung ke database Spanner dan mengalirkan data peristiwa perubahan ke topik Kafka.
Berikut adalah contoh perintah POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Contoh respons yang berhasil:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Memperbarui konfigurasi konektor Kafka
Untuk memperbarui konfigurasi konektor, kirim perintah PUT
ke layanan Kafka Connect yang sedang berjalan dengan nama konektor yang sama.
Asumsikan bahwa kita memiliki konektor yang berjalan dengan konfigurasi dari bagian sebelumnya. Berikut adalah contoh perintah PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Contoh respons yang berhasil:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Menghentikan konektor Kafka
Untuk menghentikan konektor, kirim perintah DELETE
ke layanan Kafka Connect yang sedang berjalan dengan nama konektor yang sama.
Asumsikan bahwa kita memiliki konektor yang berjalan dengan konfigurasi dari bagian sebelumnya. Berikut adalah contoh perintah DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Contoh respons yang berhasil:
HTTP/1.1 204 No Content
Memantau konektor Kafka
Selain metrik Kafka Connect dan Debezium standar, konektor Kafka mengekspor metriknya sendiri:
MilliSecondsLowWatermark
: Watermark rendah saat ini dari tugas konektor dalam milidetik. Watermark rendah menjelaskan waktu saat konektor dijamin telah menstreaming semua peristiwa dengan stempel waktu < TMilliSecondsLowWatermarkLag
: Jeda watermark rendah di belakang waktu saat ini dalam milidetik. menyiarkan semua peristiwa dengan stempel waktu < TLatencyLowWatermark<Variant>MilliSeconds
: Jeda watermark rendah di belakang waktu saat ini dalam milidetik. Varian P50, P95, P99, Rata-rata, Min, dan Max disediakan.LatencySpanner<Variant>MilliSeconds
: Latensi Spanner-commit-timestamp-to-connector-read. Varian P50, P95, P99, Rata-rata, Min, Max disediakan.LatencyReadToEmit<Variant>MilliSeconds
: Latensi Spanner-read-timestamp-to-connector-emit. Varian P50, P95, P99, Rata-rata, Min, dan Max disediakan.LatencyCommitToEmit<Variant>tMilliSeconds
: Latensi Spanner-commit-timestamp-to-connector-emit. Varian P50, P95, P99, Rata-rata, Min, dan Max disediakan.LatencyCommitToPublish<Variant>MilliSeconds
: Latensi Spanner-commit-timestamp-to Kafka-publish-timestamp. Varian P50, P95, P99, Rata-rata, Min, Max disediakan.NumberOfChangeStreamPartitionsDetected
: Jumlah total partisi yang terdeteksi oleh tugas konektor saat ini.NumberOfChangeStreamQueriesIssued
: Jumlah total kueri aliran data perubahan yang dikeluarkan oleh tugas saat ini.NumberOfActiveChangeStreamQueries
: Jumlah aktif dari kueri aliran data perubahan yang terdeteksi oleh tugas konektor saat ini.SpannerEventQueueCapacity
: Kapasitas totalStreamEventQueue
, antrean yang menyimpan elemen yang diterima dari kueri aliran data perubahan.SpannerEventQueueCapacity
: KapasitasStreamEventQueue
yang tersisa.TaskStateChangeEventQueueCapacity
: Total kapasitasTaskStateChangeEventQueue
, antrean yang menyimpan peristiwa yang terjadi di konektor.RemainingTaskStateChangeEventQueueCapacity
: KapasitasTaskStateChangeEventQueue
yang tersisa.NumberOfActiveChangeStreamQueries
: Jumlah aktif dari kueri aliran data perubahan yang terdeteksi oleh tugas konektor saat ini.
Properti konfigurasi konektor Kafka
Berikut adalah properti konfigurasi yang diperlukan untuk konektor:
name
: Nama unik untuk konektor. Mencoba mendaftar lagi dengan nama yang sama akan menyebabkan kegagalan. Properti ini diwajibkan oleh semua konektor Kafka Connect.connector.class
: Nama class Java untuk konektor. Selalu gunakan nilaiio.debezium.connector.spanner.SpannerConnector
untuk konektor Kafka.tasks.max
: Jumlah maksimum tugas yang harus dibuat untuk konektor ini.gcp.spanner.project.id
: ID projectgcp.spanner.instance.id
: ID instance Spannergcp.spanner.database.id
: ID database Spannergcp.spanner.change.stream
: Nama aliran data perubahan Spannergcp.spanner.credentials.json
: Objek JSON kunci akun layanan.gcp.spanner.credentials.path
: Jalur file ke objek JSON kunci akun layanan. Wajib diisi jika kolom di atas tidak ada.gcp.spanner.database.role
: Peran database Spanner yang akan digunakan. Hal ini diperlukan hanya jika aliran data perubahan diamankan dengan kontrol akses yang mendetail. Peran database harus memiliki hak istimewaSELECT
di aliran perubahan dan hak istimewaEXECUTE
pada fungsi baca aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses terperinci untuk aliran perubahan.
Properti konfigurasi lanjutan berikut memiliki default yang berfungsi di sebagian besar situasi sehingga jarang perlu ditentukan dalam konfigurasi konektor:
gcp.spanner.low-watermark.enabled
: Menunjukkan apakah watermark rendah diaktifkan untuk konektor. Defaultnya adalah false.gcp.spanner.low-watermark.update-period.ms
: Interval saat watermark rendah diperbarui. Nilai defaultnya adalah 1.000 md.heartbeat.interval.ms
: Interval detak jantung Spanner. Defaultnya adalah 300.000 (lima menit).gcp.spanner.start.time
: Waktu mulai konektor. Default-nya adalah waktu saat ini.gcp.spanner.end.time
: Waktu berakhir konektor. Nilai defaultnya adalah tak terbatas.tables.exclude.list
: Tabel untuk mengecualikan peristiwa perubahannya. Nilai defaultnya adalah kosong.tables.include.list
: Tabel untuk menyertakan peristiwa perubahan. Jika tidak diisi, semua tabel akan disertakan. Nilai defaultnya adalah kosong.gcp.spanner.stream.event.queue.capacity
: Kapasitas antrean peristiwa Spanner. Default-nya adalah 10.000.connector.spanner.task.state.change.event.queue.capacity
: Kapasitas antrean peristiwa perubahan status tugas. Setelan defaultnya adalah 1.000.connector.spanner.max.missed.heartbeats
: Jumlah maksimum detak jantung yang terlewat untuk kueri streaming perubahan sebelum pengecualian ditampilkan. Setelan defaultnya adalah 10.scaler.monitor.enabled
: Menunjukkan apakah penskalaan otomatis tugas diaktifkan. Nilai defaultnya adalah false (salah).tasks.desired.partitions
: Jumlah partisi aliran perubahan yang diinginkan per tugas. Parameter ini diperlukan untuk penskalaan otomatis tugas. Default-nya adalah 2.tasks.min
: Jumlah tugas minimum. Parameter ini diperlukan untuk penskalaan otomatis tugas. Default-nya adalah 1.connector.spanner.sync.topic
: Nama untuk topik sinkronisasi, topik konektor internal yang digunakan untuk menyimpan komunikasi antartugas. Secara default ke_sync_topic_spanner_connector_connectorname
jika pengguna tidak memberi nama.connector.spanner.sync.poll.duration
: Durasi polling untuk topik sinkronisasi. Default-nya adalah 500 md.connector.spanner.sync.request.timeout.ms
: Waktu tunggu untuk permintaan ke topik sinkronisasi. Nilai defaultnya adalah 5.000 md.connector.spanner.sync.delivery.timeout.ms
: Waktu tunggu untuk memublikasikan ke topik sinkronisasi. Setelan defaultnya adalah 15.000 md.connector.spanner.sync.commit.offsets.interval.ms
: Interval saat offset dilakukan untuk topik sinkronisasi. Setelan defaultnya adalah 60.000 md.connector.spanner.sync.publisher.wait.timeout
: Interval saat pesan dipublikasikan ke topik sinkronisasi. Nilai defaultnya adalah 5 md.connector.spanner.rebalancing.topic
: Nama topik penyeimbangan ulang. Topik penyeimbangan ulang adalah topik konektor internal yang digunakan untuk menentukan keaktifan tugas. Secara default ke_rebalancing_topic_spanner_connector_connectorname
jika pengguna tidak memberi nama.connector.spanner.rebalancing.poll.duration
: Durasi polling untuk penyeimbangan ulang topik. Nilai defaultnya adalah 5.000 md.connector.spanner.rebalancing.commit.offsets.timeout
: Waktu tunggu guna melakukan offset untuk topik penyeimbangan ulang. Nilai defaultnya adalah 5.000 md.connector.spanner.rebalancing.commit.offsets.interval.ms
: Interval saat offset dilakukan untuk topik sinkronisasi. Setelan defaultnya adalah 60.000 md.connector.spanner.rebalancing.task.waiting.timeout
: Durasi waktu tunggu tugas sebelum memproses peristiwa penyeimbangan ulang. Setelan defaultnya adalah 1.000 md.
Untuk mengetahui daftar properti konektor yang dapat dikonfigurasi secara lebih mendetail, lihat repositori GitHub.
Batasan
Konektor tidak mendukung streaming peristiwa snapshot.
Jika watermark diaktifkan di konektor, Anda tidak dapat mengonfigurasi Transformasi perutean topik Debezium.