Auf dieser Seite wird erläutert, wie Sie mit dem Kafka-Connector Daten aus Spanner-Änderungsstreams verarbeiten und weiterleiten.
Wichtige Konzepte
Im Folgenden werden die wichtigsten Konzepte des Kafka-Connectors beschrieben.
Debezium
Debezium ist ein Open-Source-Projekt, das eine Streamingplattform mit niedriger Latenz für Change Data Capture bietet.
Kafka-Connector
Der Kafka-Connector bietet eine Abstraktion über die Spanner API, um Spanner-Änderungsstreams an Kafka zu veröffentlichen. Mit diesem Connector müssen Sie den Partitionslebenszyklus des Änderungsstreams nicht verwalten. Das ist notwendig, wenn Sie die Spanner API direkt verwenden.
Der Kafka-Connector erzeugt ein Änderungsereignis für jede Änderung von Datenänderungseinträgen und sendet Änderungsereigniseinträge im Downstream an ein separates Kafka-Thema für jede Tabelle, die durch einen Änderungsstream verfolgt wird. Ein Mod eines Datenänderungseintrags stellt eine einzelne erfasste Änderung (Einfügen, Aktualisieren oder Löschen) dar. Ein einzelner Datenänderungseintrag kann mehr als einen Mod enthalten.
Ausgabe des Kafka-Connectors
Der Kafka-Connector leitet Änderungsstreameinträge direkt an ein separates Kafka-Thema weiter. Der Name des Ausgabethemas sollte connector_name
.table_name
lauten. Wenn das Thema nicht vorhanden ist, erstellt der Kafka-Connector automatisch ein Thema unter diesem Namen.
Sie können auch Themen-Routing-Transformationen konfigurieren, um Einträge in von Ihnen angegebene Themen umzuleiten. Wenn Sie die Themenweiterleitung verwenden möchten, deaktivieren Sie die Funktion Niedriges Wasserzeichen.
Datensatzsortierung
Datensätze werden in den Kafka-Themen nach dem Commit-Zeitstempel pro Primärschlüssel sortiert. Datensätze, die zu verschiedenen Primärschlüsseln gehören, haben keine Reihenfolgegarantien. Datensätze mit demselben Primärschlüssel werden in derselben Kafka-Themenpartition gespeichert. Wenn Sie ganze Transaktionen verarbeiten möchten, können Sie auch die Felder server_transaction_id
und number_of_records_in_transaction
des Datenänderungseintrags verwenden, um eine Spanner-Transaktion zusammenzustellen.
Ereignisse ändern
Der Kafka-Connector generiert für jeden INSERT
-, UPDATE
- und DELETE
-Vorgang ein Datenänderungsereignis. Jedes Ereignis enthält einen Schlüssel und Werte für die geänderte Zeile.
Sie können Kafka Connect-Konverter verwenden, um Datenänderungsereignisse im Format Protobuf
, AVRO
, JSON
oder JSON Schemaless
zu generieren. Wenn Sie einen Kafka Connect-Konverter verwenden, der Schemas erstellt, enthält das Ereignis separate Schemas für den Schlüssel und die Werte. Andernfalls enthält das Ereignis nur den Schlüssel und die Werte.
Das Schema für den Schlüssel ändert sich nie. Das Schema für die Werte ist eine Zusammenfassung aller Spalten, die der Änderungsstream seit der Startzeit des Connectors verfolgt hat.
Wenn Sie den Connector zum Erstellen von JSON-Ereignissen konfigurieren, umfasst das Ausgabeänderungsereignis fünf Felder:
Das erste Feld
schema
gibt ein Kafka Connect-Schema an, das das Spanner-Schlüsselschema beschreibt.Das erste
payload
-Feld hat die im vorherigenschema
-Feld beschriebene Struktur und enthält den Schlüssel für die geänderte Zeile.Im zweiten Feld
schema
wird das Kafka Connect-Schema angegeben, das das Schema für die geänderte Zeile beschreibt.Das zweite
payload
-Feld hat die im vorherigenschema
-Feld beschriebene Struktur und enthält die tatsächlichen Daten für die geänderte Zeile.Das Feld
source
ist ein Pflichtfeld, das die Quellmetadaten für das Ereignis beschreibt.
Hier ein Beispiel für ein Datenänderungsereignis:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Niedriges Wasserzeichen
Das niedrige Wasserzeichen beschreibt den Zeitpunkt T, zu dem der Kafka-Connector alle Ereignisse mit dem Zeitstempel < T garantiert in ein Kafka-Thema gestreamt und veröffentlicht hat.
Sie können das niedrige Wasserzeichen im Kafka-Connector mithilfe des Parameters gcp.spanner.low-watermark.enabled
aktivieren. Dieser Parameter ist standardmäßig deaktiviert. Wenn das Low-Wasserzeichen aktiviert ist, wird das Feld low_watermark
im Änderungsstream-Datenänderungseintrag mit dem aktuellen Wasserzeichen-Zeitstempel des Kafka-Connectors gefüllt.
Wenn keine Datensätze erstellt werden, sendet der Kafka-Connector regelmäßig Wasserzeichen „Heartbeats“ an die vom Connector erkannten Kafka-Ausgabethemen.
Diese Wasserzeichen-Heartbeats sind Datensätze, die mit Ausnahme des Felds low_watermark
leer sind. Anschließend können Sie das Low-Wasserzeichen verwenden, um zeitbasierte Aggregationen durchzuführen.
Sie können das Low-Wasserzeichen beispielsweise verwenden, um Ereignisse nach dem Commit-Zeitstempel über Primärschlüssel hinweg zu sortieren.
Metadatenthemen
Der Kafka-Connector und das Kafka Connect-Framework erstellen mehrere Metadatenthemen, um Connector-bezogene Informationen zu speichern. Es ist nicht empfehlenswert, entweder die Konfiguration oder den Inhalt dieser Metadatenthemen zu ändern.
Im Folgenden sind die Metadatenthemen aufgeführt:
_consumer_offsets
: Ein Thema, das automatisch von Kafka erstellt wird. Speichert Nutzer-Offsets für Nutzer, die im Kafka-Connector erstellt wurden._kafka-connect-offsets
: Ein Thema, das automatisch von Kafka Connect erstellt wird. Speichert die Connector-Offsets._sync_topic_spanner_connector_connectorname
: Ein Thema, das automatisch vom Connector erstellt wird. Speichert Metadaten zu Änderungsstreampartitionen._rebalancing_topic_spanner_connector_connectorname
: Ein Thema, das automatisch vom Connector erstellt wird. Wird verwendet, um die Aktivität von Connector-Aufgaben zu ermitteln._debezium-heartbeat.connectorname
: Ein Thema, mit dem Spanner-Änderungsstream-Heartbeats verarbeitet werden.
Laufzeit des Kafka-Connectors
Im Folgenden wird die Laufzeit des Kafka-Connectors beschrieben.
Skalierbarkeit
Der Kafka-Connector ist horizontal skalierbar und wird für eine oder mehrere Aufgaben ausgeführt, die auf mehrere Kafka Connect-Worker verteilt sind.
Garantien für Nachrichtenübermittlung
Der Kafka-Connector unterstützt eine mindestens einmalige Zustellung.
Fehlertoleranz
Der Kafka-Connector ist fehlertolerant. Während der Kafka-Connector Änderungen liest und Ereignisse erstellt, wird der letzte verarbeitete Commit-Zeitstempel für jede Änderungsstreampartition aufgezeichnet. Wenn der Kafka-Connector aus irgendeinem Grund beendet wird (z. B. bei Kommunikationsfehlern, Netzwerkproblemen oder Softwarefehlern), fährt der Kafka-Connector beim Neustart die Datensätze dort weiter, wo er zuletzt angehalten wurde.
Der Kafka-Connector liest das Informationsschema am Startzeitstempel des Kafka-Connectors, um Schemainformationen abzurufen. Standardmäßig kann Spanner das Informationsschema nicht bei Lesezeitstempeln vor der Versionsaufbewahrungsdauer lesen, die standardmäßig eine Stunde beträgt. Wenn Sie den Connector vor mehr als einer Stunde starten möchten, müssen Sie die Aufbewahrungsdauer der Datenbankversion erhöhen.
Kafka-Connector einrichten
Änderungsstream erstellen
Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen. Damit Sie mit den nächsten Schritten fortfahren können, ist eine Spanner-Instanz mit konfiguriertem Änderungsstream erforderlich.
Wenn Sie möchten, dass bei jedem Datenänderungsereignis sowohl geänderte als auch unveränderte Spalten zurückgegeben werden, müssen Sie den Werterfassungstyp NEW_ROW
verwenden. Weitere Informationen finden Sie unter Werterfassungstyp.
JAR-Datei für den Kafka-Connector installieren
Wenn Zookeeper, Kafka und Kafka Connect installiert sind, müssen Sie noch das Plug-in-Archiv des Connectors herunterladen, die JAR-Dateien in Ihre Kafka Connect-Umgebung extrahieren und das Verzeichnis mit den JAR-Dateien in Kafka Connect plugin.path
einfügen.
Anschließend müssen Sie den Kafka Connect-Prozess neu starten, um die neuen JAR-Dateien abzurufen.
Wenn Sie mit unveränderlichen Containern arbeiten, können Sie Images aus den Container-Images von Debezium für Zookeeper, Kafka und Kafka Connect abrufen. Auf dem Kafka Connect-Image ist der Spanner-Connector vorinstalliert.
Weitere Informationen zum Installieren von Debezium-basierten Kafka-Connector-JARs finden Sie unter Debezium installieren.
Kafka-Connector konfigurieren
Im Folgenden finden Sie ein Konfigurationsbeispiel für einen Kafka-Connector, der eine Verbindung zu einem Änderungsstream namens changeStreamAll
in der Datenbank users
in der Instanz test-instance
und dem Projekt test-project
herstellt.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Diese Konfiguration enthält Folgendes:
Der Name des Connectors bei der Registrierung bei einem Kafka Connect-Dienst.
Der Name dieser Spanner-Connector-Klasse.
Die Projekt-ID.
Die Spanner-Instanz-ID.
Die Spanner-Datenbank-ID.
Der Name des Änderungsstreams.
Das JSON-Objekt für den Dienstkontoschlüssel.
(Optional) Die zu verwendende Spanner-Datenbankrolle.
Die maximale Anzahl von Tasks.
Eine vollständige Liste der Connector-Eigenschaften finden Sie hier.
Connector-Konfiguration zu Kafka Connect hinzufügen
So starten Sie einen Spanner-Connector:
Erstellen Sie eine Konfiguration für den Spanner-Connector.
Verwenden Sie die Kafka Connect REST API, um diese Connector-Konfiguration Ihrem Kafka Connect-Cluster hinzuzufügen.
Sie können diese Konfiguration mit einem POST
-Befehl an einen laufenden Kafka Connect-Dienst senden. Standardmäßig wird der Kafka Connect-Dienst auf Port 8083
ausgeführt.
Der Dienst zeichnet die Konfiguration auf und startet die Connector-Aufgabe, die eine Verbindung zur Spanner-Datenbank herstellt und Änderungsereigniseinträge an Kafka-Themen streamt.
Hier ein Beispiel für einen POST
-Befehl:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Beispiel für eine erfolgreiche Antwort:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Konfiguration des Kafka-Connectors aktualisieren
Zum Aktualisieren der Connector-Konfiguration senden Sie einen PUT
-Befehl an den ausgeführten Kafka Connect-Dienst mit demselben Connector-Namen.
Angenommen, wir haben einen Connector mit der Konfiguration aus dem vorherigen Abschnitt, die ausgeführt wird. Hier ein Beispiel für einen PUT
-Befehl:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Beispiel für eine erfolgreiche Antwort:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Kafka-Connector beenden
Zum Beenden des Connectors senden Sie einen DELETE
-Befehl an den ausgeführten Kafka Connect-Dienst mit demselben Connector-Namen.
Angenommen, wir haben einen Connector mit der Konfiguration aus dem vorherigen Abschnitt, die ausgeführt wird. Hier ein Beispiel für einen DELETE
-Befehl:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Beispiel für eine erfolgreiche Antwort:
HTTP/1.1 204 No Content
Kafka-Connector überwachen
Zusätzlich zu den standardmäßigen Kafka Connect- und Debezium-Messwerten exportiert der Kafka-Connector seine eigenen Messwerte:
MilliSecondsLowWatermark
: Das aktuelle Wasserzeichen der Connector-Aufgabe in Millisekunden. Das untere Wasserzeichen beschreibt die Zeit T, zu der der Connector garantiert alle Ereignisse mit dem Zeitstempel < T gestreamt hat.MilliSecondsLowWatermarkLag
: Die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Alle Ereignisse mit Zeitstempel < T wurden gestreamt.LatencyLowWatermark<Variant>MilliSeconds
: Die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Es werden die Varianten P50, P95, P99, Durchschnitt, Min und Max angegeben.LatencySpanner<Variant>MilliSeconds
: Die Latenz von Spanner-commit-timestamp-to-connector-read. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.LatencyReadToEmit<Variant>MilliSeconds
: Die Spanner-Lese-Zeitstempel-zu-Connector-Emit-Latenz. Es werden die Varianten P50, P95, P99, Durchschnitt, Min und Max angegeben.LatencyCommitToEmit<Variant>tMilliSeconds
: Die Latenz von Spanner-commit-timestamp-to-connector-emit. Es werden die Varianten P50, P95, P99, Durchschnitt, Min und Max angegeben.LatencyCommitToPublish<Variant>MilliSeconds
: Die Latenz von Spanner-commit-timestamp-to Kafka-publish-timestamp. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.NumberOfChangeStreamPartitionsDetected
: Die Gesamtzahl der Partitionen, die von der aktuellen Connector-Task erkannt wurden.NumberOfChangeStreamQueriesIssued
: Die Gesamtzahl der Änderungsstreamabfragen, die von der aktuellen Aufgabe ausgegeben wurden.NumberOfActiveChangeStreamQueries
: Die aktive Anzahl der Änderungsstreamabfragen, die von der aktuellen Connector-Aufgabe erkannt werden.SpannerEventQueueCapacity
: Die Gesamtkapazität vonStreamEventQueue
, einer Warteschlange, in der Elemente gespeichert werden, die über Änderungsstreamabfragen empfangen wurden.SpannerEventQueueCapacity
: die verbleibende Kapazität vonStreamEventQueue
TaskStateChangeEventQueueCapacity
: Die Gesamtkapazität vonTaskStateChangeEventQueue
, einer Warteschlange, in der Ereignisse im Connector gespeichert werden.RemainingTaskStateChangeEventQueueCapacity
: die verbleibende Kapazität vonTaskStateChangeEventQueue
NumberOfActiveChangeStreamQueries
: Die aktive Anzahl der Änderungsstreamabfragen, die von der aktuellen Connector-Aufgabe erkannt werden.
Konfigurationsattribute des Kafka-Connectors
Folgende Konfigurationseigenschaften sind für den Connector erforderlich:
name
: Eindeutiger Name für den Connector. Der Versuch, sich noch einmal mit demselben Namen zu registrieren, führt zu einem Fehler. Dieses Attribut ist für alle Kafka Connect-Connectors erforderlich.connector.class
: Der Name der Java-Klasse für den Connector. Verwenden Sie für den Kafka-Connector immer den Wertio.debezium.connector.spanner.SpannerConnector
.tasks.max
: Die maximale Anzahl von Aufgaben, die für diesen Connector erstellt werden sollen.gcp.spanner.project.id
: die Projekt-IDgcp.spanner.instance.id
: die Spanner-Instanz-IDgcp.spanner.database.id
: die Spanner-Datenbank-IDgcp.spanner.change.stream
: Der Name des Spanner-Änderungsstreamsgcp.spanner.credentials.json
: Das JSON-Objekt des Dienstkontoschlüssels.gcp.spanner.credentials.path
: Der Dateipfad zum JSON-Objekt des Dienstkontoschlüssels. Erforderlich, wenn das obige Feld nicht ausgefüllt ist.gcp.spanner.database.role
: Die zu verwendende Spanner-Datenbankrolle. Dies ist nur erforderlich, wenn der Änderungsstream durch eine detailgenaue Zugriffssteuerung gesichert ist. Die Datenbankrolle muss die BerechtigungSELECT
für den Änderungsstream und die BerechtigungEXECUTE
für die Lesefunktion des Änderungsstreams haben. Weitere Informationen finden Sie unter Detailgenaue Zugriffssteuerung für Änderungsstreams.
Die folgenden erweiterten Konfigurationseigenschaften enthalten Standardeinstellungen, die in den meisten Situationen funktionieren und daher nur selten in der Konfiguration des Connectors angegeben werden müssen:
gcp.spanner.low-watermark.enabled
: Gibt an, ob das niedrige Wasserzeichen für den Connector aktiviert ist. Der Standardwert ist "false".gcp.spanner.low-watermark.update-period.ms
: Das Intervall, in dem das niedrige Wasserzeichen aktualisiert wird. Der Standardwert ist 1.000 ms.heartbeat.interval.ms
: Das Spanner-Heartbeat-Intervall. Der Standardwert ist 300.000 (fünf Minuten).gcp.spanner.start.time
: Die Startzeit des Connectors. Die Standardeinstellung ist die aktuelle Uhrzeit.gcp.spanner.end.time
: Die Endzeit des Connectors. Die Standardeinstellung ist unendlich.tables.exclude.list
: Die Tabellen, bei denen Änderungsereignisse ausgeschlossen werden sollen. Die Standardeinstellung ist leer.tables.include.list
: Die Tabellen, für die Änderungsereignisse enthalten sein sollen. Wenn das Feld nicht ausgefüllt ist, werden alle Tabellen einbezogen. Die Standardeinstellung ist leer.gcp.spanner.stream.event.queue.capacity
: Die Kapazität der Spanner-Ereigniswarteschlange. Die Standardeinstellung ist 10.000.connector.spanner.task.state.change.event.queue.capacity
: Die Kapazität der Ereigniswarteschlange für die Aufgabenstatusänderung. Die Standardeinstellung ist 1.000.connector.spanner.max.missed.heartbeats
: Die maximale Anzahl verpasster Herzschläge für eine Änderungsstreamabfrage, bevor eine Ausnahme ausgelöst wird. Der Standardwert ist 10.scaler.monitor.enabled
: Gibt an, ob das Aufgaben-Autoscaling aktiviert ist. Die Standardeinstellung ist "false".tasks.desired.partitions
: Die bevorzugte Anzahl von Änderungsstreampartitionen pro Task. Dieser Parameter wird für das Autoscaling von Aufgaben benötigt. Die Standardeinstellung ist 2.tasks.min
: Die Mindestanzahl von Tasks. Dieser Parameter wird für das Autoscaling von Aufgaben benötigt. Der Standardfaktor ist 1.connector.spanner.sync.topic
: Der Name des Synchronisierungsthemas. Dies ist ein internes Connector-Thema, das zum Speichern der Kommunikation zwischen Aufgaben verwendet wird. Wenn der Nutzer keinen Namen angegeben hat, wird die Standardeinstellung_sync_topic_spanner_connector_connectorname
verwendet.connector.spanner.sync.poll.duration
: Die Abfragedauer für das Synchronisierungsthema. Die Standardeinstellung ist 500 ms.connector.spanner.sync.request.timeout.ms
: Das Zeitlimit für Anfragen an das Synchronisierungsthema. Die Standardeinstellung ist 5.000 ms.connector.spanner.sync.delivery.timeout.ms
: Das Zeitlimit für die Veröffentlichung im Synchronisierungsthema. Die Standardeinstellung ist 15.000 ms.connector.spanner.sync.commit.offsets.interval.ms
: Das Intervall, in dem für das Synchronisierungsthema ein Commit für Offsets durchgeführt wird. Die Standardeinstellung ist 60.000 ms.connector.spanner.sync.publisher.wait.timeout
: Das Intervall, in dem Nachrichten im Synchronisierungsthema veröffentlicht werden. Die Standardeinstellung ist 5 ms.connector.spanner.rebalancing.topic
: Der Name des Themas für den Ausgleich. Das Thema für den Ausgleich ist ein internes Connector-Thema, mit dem die Aktivität von Aufgaben ermittelt wird. Wenn der Nutzer keinen Namen angegeben hat, wird die Standardeinstellung_rebalancing_topic_spanner_connector_connectorname
verwendet.connector.spanner.rebalancing.poll.duration
: Die Abfragedauer für das Thema des Ausgleichs. Die Standardeinstellung ist 5.000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: Das Zeitlimit für das Commit von Offsets für das Thema für den Ausgleich. Die Standardeinstellung ist 5.000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: Das Intervall, in dem für das Synchronisierungsthema ein Commit für Offsets durchgeführt wird. Die Standardeinstellung ist 60.000 ms.connector.spanner.rebalancing.task.waiting.timeout
: Die Zeit, die eine Aufgabe wartet, bevor ein Ausgleichsereignis verarbeitet wird. Die Standardeinstellung ist 1.000 ms.
Eine noch detailliertere Liste der konfigurierbaren Connector-Attribute finden Sie im GitHub-Repository.
Beschränkungen
Das Streaming von Snapshot-Ereignissen wird vom Connector nicht unterstützt.
Wenn Wasserzeichen im Connector aktiviert ist, können Sie keine Routing-Transformationen für Debezium-Themen konfigurieren.