Membangun koneksi aliran perubahan ke Kafka

Halaman ini menjelaskan cara menggunakan konektor Kafka untuk menggunakan dan meneruskan data aliran perubahan Spanner.

Konsep inti

Berikut adalah penjelasan konsep inti konektor Kafka.

Debezium

Debezium adalah project open source yang menyediakan platform streaming data latensi rendah untuk pengambilan data perubahan.

Konektor Kafka

Konektor Kafka menyediakan abstraksi melalui Spanner API untuk memublikasikan aliran perubahan Spanner ke Kafka. Dengan konektor ini, Anda tidak perlu mengelola siklus proses partisi aliran perubahan, yang diperlukan saat menggunakan Spanner API secara langsung.

Konektor Kafka menghasilkan peristiwa perubahan untuk setiap mod catatan perubahan data dan mengirimkan downstream kumpulan data peristiwa perubahan ke topik Kafka terpisah untuk setiap tabel yang dilacak aliran data perubahan. Mod data perubahan data mewakili satu modifikasi (menyisipkan, memperbarui, atau menghapus) yang diambil. Satu kumpulan data perubahan data dapat berisi lebih dari satu mod.

Output konektor Kafka

Konektor Kafka meneruskan kumpulan data perubahan secara langsung ke topik Kafka yang terpisah. Nama topik output harus connector_name.table_name. Jika topik tidak ada, konektor Kafka akan otomatis membuat topik dengan nama tersebut.

Anda juga dapat mengonfigurasi transformasi pemilihan rute topik untuk mengalihkan data ke topik yang Anda tentukan. Jika Anda ingin menggunakan pemilihan rute topik, nonaktifkan fungsi water watermark rendah.

Pengurutan data

Kumpulan data diurutkan berdasarkan stempel waktu commit per kunci utama dalam topik Kafka. Kumpulan data yang memiliki kunci utama yang berbeda tidak memiliki jaminan pengurutan. Kumpulan data dengan kunci utama yang sama disimpan di partisi topik Kafka yang sama. Jika ingin memproses seluruh transaksi, Anda juga dapat menggunakan kolom server_transaction_id dan number_of_records_in_transaction data perubahan data untuk menyusun transaksi Spanner.

Mengubah acara

Konektor Kafka menghasilkan peristiwa perubahan data untuk setiap operasi INSERT, UPDATE, dan DELETE. Setiap peristiwa berisi kunci dan nilai untuk baris yang diubah.

Anda dapat menggunakan pengonversi Kafka Connect untuk menghasilkan peristiwa perubahan data dalam format Protobuf, AVRO, JSON, atau JSON Schemaless. Jika Anda menggunakan pengonversi Kafka Connect yang menghasilkan skema, peristiwa akan berisi skema terpisah untuk kunci dan nilai. Jika tidak, peristiwa hanya akan berisi kunci dan nilai.

Skema untuk kunci tidak pernah berubah. Skema untuk nilai adalah gabungan semua kolom yang dilacak aliran perubahan sejak waktu mulai konektor.

Jika Anda mengonfigurasi konektor untuk menghasilkan peristiwa JSON, peristiwa perubahan output berisi lima kolom:

  • Kolom schema pertama menentukan skema Kafka Connect yang mendeskripsikan skema kunci Spanner.

  • Kolom payload pertama memiliki struktur yang dijelaskan oleh kolom schema sebelumnya dan berisi kunci untuk baris yang diubah.

  • Kolom schema kedua menentukan skema Kafka Connect yang menjelaskan skema untuk baris yang diubah.

  • Kolom payload kedua memiliki struktur yang dijelaskan oleh kolom schema sebelumnya dan berisi data aktual untuk baris yang diubah.

  • Kolom source adalah kolom wajib yang menjelaskan metadata sumber untuk peristiwa.

Berikut adalah contoh peristiwa perubahan data:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": {
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": {
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c",
  "ts_ms": 1559033904863 //
}

Watermark rendah

Watermark rendah menjelaskan waktu T saat konektor Kafka dijamin di-streaming dan memublikasikan semua peristiwa dengan stempel waktu < T ke topik Kafka.

Anda dapat mengaktifkan watermark rendah di konektor Kafka menggunakan parameter gcp.spanner.low-watermark.enabled. Parameter ini dinonaktifkan secara default. Jika watermark rendah diaktifkan, kolom low_watermark di data perubahan data aliran data perubahan akan diisi dengan stempel waktu watermark rendah konektor Kafka saat ini.

Jika tidak ada kumpulan data yang dihasilkan, konektor Kafka akan mengirimkan watermark berkala "heartbeats" ke topik output Kafka yang terdeteksi oleh konektor.

Heartbeat watermark ini adalah catatan yang kosong kecuali untuk kolom low_watermark. Anda kemudian dapat menggunakan watermark rendah untuk melakukan agregasi berbasis waktu. Misalnya, Anda dapat menggunakan watermark rendah untuk mengurutkan peristiwa dengan meng-commit stempel waktu di seluruh kunci utama.

Topik metadata

Konektor Kafka, serta framework Kafka Connect, membuat beberapa topik metadata untuk menyimpan informasi terkait konektor. Sebaiknya perubahan konfigurasi atau konten topik metadata ini tidak disarankan.

Berikut adalah topik metadata:

  • _consumer_offsets: Topik yang dibuat secara otomatis oleh Kafka. Menyimpan offset konsumen untuk konsumen yang dibuat di konektor Kafka.
  • _kafka-connect-offsets: Topik yang dibuat secara otomatis oleh Kafka Connect. Menyimpan offset konektor.
  • _sync_topic_spanner_connector_connectorname: Topik yang dibuat secara otomatis oleh konektor. Menyimpan metadata terkait partisi streaming perubahan.
  • _rebalancing_topic_spanner_connector_connectorname: Topik yang dibuat secara otomatis oleh konektor. Digunakan untuk menentukan keaktifan tugas konektor.
  • _debezium-heartbeat.connectorname: Topik yang digunakan untuk memproses heartbeat streaming perubahan Spanner.

Runtime konektor Kafka

Berikut ini penjelasan tentang runtime konektor Kafka.

Skalabilitas

Konektor Kafka skalabel secara horizontal dan berjalan pada satu atau beberapa tugas yang tersebar di beberapa pekerja Kafka Connect.

Jaminan pengiriman pesan

Konektor Kafka mendukung jaminan pengiriman minimal satu kali.

Toleransi kesalahan

Konektor Kafka toleran terhadap kegagalan. Saat konektor Kafka membaca perubahan dan menghasilkan peristiwa, konektor Kafka akan mencatat stempel waktu commit terakhir yang diproses untuk setiap partisi aliran perubahan. Jika konektor Kafka berhenti karena alasan apa pun (termasuk kegagalan komunikasi, masalah jaringan, atau kegagalan software), setelah dimulai ulang, konektor Kafka akan melanjutkan streaming rekaman dari posisi terakhir yang terhenti.

Konektor Kafka membaca skema informasi pada stempel waktu awal konektor Kafka untuk mengambil informasi skema. Secara default, Spanner tidak dapat membaca skema informasi pada stempel waktu baca sebelum periode retensi versi, yang nilai defaultnya adalah satu jam. Jika ingin memulai konektor lebih awal dari satu jam sebelumnya, Anda harus meningkatkan periode retensi data versi database.

Menyiapkan konektor Kafka

Membuat aliran perubahan

Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan. Untuk melanjutkan ke langkah berikutnya, diperlukan instance Spanner dengan aliran data perubahan yang telah dikonfigurasi.

Perhatikan bahwa jika Anda ingin kolom yang diubah dan tidak berubah ditampilkan pada setiap peristiwa perubahan data, gunakan jenis pengambilan nilai NEW_ROW. Untuk mengetahui informasi selengkapnya, lihat jenis pengambilan nilai.

Instal JAR konektor Kafka

Setelah Zookeeper, Kafka, dan Kafka Connect terinstal, tugas yang tersisa untuk men-deploy konektor Kafka adalah mendownload arsip plugin konektor, mengekstrak file JAR ke lingkungan Kafka Connect, lalu menambahkan direktori dengan file JAR ke plugin.pathKafka Connect. Kemudian, Anda perlu memulai ulang proses Kafka Connect untuk mengambil file JAR baru.

Jika menggunakan container yang tidak dapat diubah, Anda dapat mengambil image dari image Container Debezium untuk Zookeeper, Kafka, dan Kafka Connect. Image Kafka Connect memiliki konektor Spanner yang telah diinstal sebelumnya.

Untuk mengetahui informasi selengkapnya tentang cara menginstal JAR konektor Kafka berbasis Debezium, lihat Menginstal Debezium.

Mengonfigurasi konektor Kafka

Berikut adalah contoh konfigurasi untuk konektor Kafka yang terhubung ke aliran data perubahan yang disebut changeStreamAll di database users di instance test-instance dan project test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Konfigurasi ini berisi hal-hal berikut:

  • Nama konektor saat didaftarkan ke layanan Kafka Connect.

  • Nama class konektor Spanner ini.

  • Project ID.

  • ID Instance Spanner.

  • ID Database Spanner.

  • Nama aliran data perubahan.

  • Objek JSON untuk kunci akun layanan.

  • (Opsional) Peran database Spanner yang akan digunakan.

  • Jumlah tugas maksimum.

Untuk daftar lengkap properti konektor, lihat Properti konfigurasi konektor Kafka.

Menambahkan konfigurasi konektor ke Kafka Connect

Untuk mulai menjalankan konektor Spanner:

  1. Buat konfigurasi untuk konektor Spanner.

  2. Gunakan Kafka Connect REST API untuk menambahkan konfigurasi konektor tersebut ke cluster Kafka Connect Anda.

Anda dapat mengirim konfigurasi ini dengan perintah POST ke layanan Kafka Connect yang sedang berjalan. Secara default, layanan Kafka Connect berjalan pada port 8083. Layanan mencatat konfigurasi dan memulai tugas konektor yang terhubung ke database Spanner dan mengalirkan data peristiwa perubahan ke topik Kafka.

Berikut adalah contoh perintah POST:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Contoh respons yang berhasil:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Memperbarui konfigurasi konektor Kafka

Untuk memperbarui konfigurasi konektor, kirim perintah PUT ke layanan Kafka Connect yang sedang berjalan dengan nama konektor yang sama.

Asumsikan bahwa kita memiliki konektor yang berjalan dengan konfigurasi dari bagian sebelumnya. Berikut adalah contoh perintah PUT:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Contoh respons yang berhasil:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Menghentikan konektor Kafka

Untuk menghentikan konektor, kirim perintah DELETE ke layanan Kafka Connect yang sedang berjalan dengan nama konektor yang sama.

Asumsikan bahwa kita memiliki konektor yang berjalan dengan konfigurasi dari bagian sebelumnya. Berikut adalah contoh perintah DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Contoh respons yang berhasil:

HTTP/1.1 204 No Content

Memantau konektor Kafka

Selain metrik Kafka Connect dan Debezium standar, konektor Kafka mengekspor metriknya sendiri:

  • MilliSecondsLowWatermark: Watermark rendah saat ini dari tugas konektor dalam milidetik. Watermark rendah menjelaskan waktu saat konektor dijamin telah menstreaming semua peristiwa dengan stempel waktu < T

  • MilliSecondsLowWatermarkLag: Jeda watermark rendah di belakang waktu saat ini dalam milidetik. menyiarkan semua peristiwa dengan stempel waktu < T

  • LatencyLowWatermark<Variant>MilliSeconds: Jeda watermark rendah di belakang waktu saat ini dalam milidetik. Varian P50, P95, P99, Rata-rata, Min, dan Max disediakan.

  • LatencySpanner<Variant>MilliSeconds: Latensi Spanner-commit-timestamp-to-connector-read. Varian P50, P95, P99, Rata-rata, Min, Max disediakan.

  • LatencyReadToEmit<Variant>MilliSeconds: Latensi Spanner-read-timestamp-to-connector-emit. Varian P50, P95, P99, Rata-rata, Min, dan Max disediakan.

  • LatencyCommitToEmit<Variant>tMilliSeconds: Latensi Spanner-commit-timestamp-to-connector-emit. Varian P50, P95, P99, Rata-rata, Min, dan Max disediakan.

  • LatencyCommitToPublish<Variant>MilliSeconds: Latensi Spanner-commit-timestamp-to Kafka-publish-timestamp. Varian P50, P95, P99, Rata-rata, Min, Max disediakan.

  • NumberOfChangeStreamPartitionsDetected: Jumlah total partisi yang terdeteksi oleh tugas konektor saat ini.

  • NumberOfChangeStreamQueriesIssued: Jumlah total kueri aliran data perubahan yang dikeluarkan oleh tugas saat ini.

  • NumberOfActiveChangeStreamQueries: Jumlah aktif dari kueri aliran data perubahan yang terdeteksi oleh tugas konektor saat ini.

  • SpannerEventQueueCapacity: Kapasitas total StreamEventQueue, antrean yang menyimpan elemen yang diterima dari kueri aliran data perubahan.

  • SpannerEventQueueCapacity: Kapasitas StreamEventQueue yang tersisa.

  • TaskStateChangeEventQueueCapacity: Total kapasitas TaskStateChangeEventQueue, antrean yang menyimpan peristiwa yang terjadi di konektor.

  • RemainingTaskStateChangeEventQueueCapacity: Kapasitas TaskStateChangeEventQueue yang tersisa.

  • NumberOfActiveChangeStreamQueries: Jumlah aktif dari kueri aliran data perubahan yang terdeteksi oleh tugas konektor saat ini.

Properti konfigurasi konektor Kafka

Berikut adalah properti konfigurasi yang diperlukan untuk konektor:

  • name: Nama unik untuk konektor. Mencoba mendaftar lagi dengan nama yang sama akan menyebabkan kegagalan. Properti ini diwajibkan oleh semua konektor Kafka Connect.

  • connector.class: Nama class Java untuk konektor. Selalu gunakan nilai io.debezium.connector.spanner.SpannerConnector untuk konektor Kafka.

  • tasks.max: Jumlah maksimum tugas yang harus dibuat untuk konektor ini.

  • gcp.spanner.project.id: ID project

  • gcp.spanner.instance.id: ID instance Spanner

  • gcp.spanner.database.id: ID database Spanner

  • gcp.spanner.change.stream: Nama aliran data perubahan Spanner

  • gcp.spanner.credentials.json: Objek JSON kunci akun layanan.

  • gcp.spanner.credentials.path: Jalur file ke objek JSON kunci akun layanan. Wajib diisi jika kolom di atas tidak ada.

  • gcp.spanner.database.role : Peran database Spanner yang akan digunakan. Hal ini diperlukan hanya jika aliran data perubahan diamankan dengan kontrol akses yang mendetail. Peran database harus memiliki hak istimewa SELECT di aliran perubahan dan hak istimewa EXECUTE pada fungsi baca aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses terperinci untuk aliran perubahan.

Properti konfigurasi lanjutan berikut memiliki default yang berfungsi di sebagian besar situasi sehingga jarang perlu ditentukan dalam konfigurasi konektor:

  • gcp.spanner.low-watermark.enabled: Menunjukkan apakah watermark rendah diaktifkan untuk konektor. Defaultnya adalah false.

  • gcp.spanner.low-watermark.update-period.ms: Interval saat watermark rendah diperbarui. Nilai defaultnya adalah 1.000 md.

  • heartbeat.interval.ms: Interval detak jantung Spanner. Defaultnya adalah 300.000 (lima menit).

  • gcp.spanner.start.time: Waktu mulai konektor. Default-nya adalah waktu saat ini.

  • gcp.spanner.end.time: Waktu berakhir konektor. Nilai defaultnya adalah tak terbatas.

  • tables.exclude.list: Tabel untuk mengecualikan peristiwa perubahannya. Nilai defaultnya adalah kosong.

  • tables.include.list: Tabel untuk menyertakan peristiwa perubahan. Jika tidak diisi, semua tabel akan disertakan. Nilai defaultnya adalah kosong.

  • gcp.spanner.stream.event.queue.capacity: Kapasitas antrean peristiwa Spanner. Default-nya adalah 10.000.

  • connector.spanner.task.state.change.event.queue.capacity: Kapasitas antrean peristiwa perubahan status tugas. Setelan defaultnya adalah 1.000.

  • connector.spanner.max.missed.heartbeats: Jumlah maksimum detak jantung yang terlewat untuk kueri streaming perubahan sebelum pengecualian ditampilkan. Setelan defaultnya adalah 10.

  • scaler.monitor.enabled: Menunjukkan apakah penskalaan otomatis tugas diaktifkan. Nilai defaultnya adalah false (salah).

  • tasks.desired.partitions: Jumlah partisi aliran perubahan yang diinginkan per tugas. Parameter ini diperlukan untuk penskalaan otomatis tugas. Default-nya adalah 2.

  • tasks.min: Jumlah tugas minimum. Parameter ini diperlukan untuk penskalaan otomatis tugas. Default-nya adalah 1.

  • connector.spanner.sync.topic: Nama untuk topik sinkronisasi, topik konektor internal yang digunakan untuk menyimpan komunikasi antartugas. Secara default ke _sync_topic_spanner_connector_connectorname jika pengguna tidak memberi nama.

  • connector.spanner.sync.poll.duration: Durasi polling untuk topik sinkronisasi. Default-nya adalah 500 md.

  • connector.spanner.sync.request.timeout.ms: Waktu tunggu untuk permintaan ke topik sinkronisasi. Nilai defaultnya adalah 5.000 md.

  • connector.spanner.sync.delivery.timeout.ms: Waktu tunggu untuk memublikasikan ke topik sinkronisasi. Setelan defaultnya adalah 15.000 md.

  • connector.spanner.sync.commit.offsets.interval.ms: Interval saat offset dilakukan untuk topik sinkronisasi. Setelan defaultnya adalah 60.000 md.

  • connector.spanner.sync.publisher.wait.timeout: Interval saat pesan dipublikasikan ke topik sinkronisasi. Nilai defaultnya adalah 5 md.

  • connector.spanner.rebalancing.topic: Nama topik penyeimbangan ulang. Topik penyeimbangan ulang adalah topik konektor internal yang digunakan untuk menentukan keaktifan tugas. Secara default ke _rebalancing_topic_spanner_connector_connectorname jika pengguna tidak memberi nama.

  • connector.spanner.rebalancing.poll.duration: Durasi polling untuk penyeimbangan ulang topik. Nilai defaultnya adalah 5.000 md.

  • connector.spanner.rebalancing.commit.offsets.timeout: Waktu tunggu guna melakukan offset untuk topik penyeimbangan ulang. Nilai defaultnya adalah 5.000 md.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: Interval saat offset dilakukan untuk topik sinkronisasi. Setelan defaultnya adalah 60.000 md.

  • connector.spanner.rebalancing.task.waiting.timeout: Durasi waktu tunggu tugas sebelum memproses peristiwa penyeimbangan ulang. Setelan defaultnya adalah 1.000 md.

Untuk mengetahui daftar properti konektor yang dapat dikonfigurasi secara lebih mendetail, lihat repositori GitHub.

Batasan