Auf dieser Seite wird erläutert, wie Sie den Kafka-Connector zum Verarbeiten und Weiterleiten von Cloud Spanner-Änderungsstream-Daten verwenden.
Wichtige Konzepte
Im Folgenden werden die Kernkonzepte des Kafka-Connectors beschrieben.
Debezium
Debezium ist ein Open-Source-Projekt, das eine Datenstreamingplattform mit niedriger Latenz für die Erfassung von Änderungsdaten bietet.
Kafka-Connector
Der Kafka-Connector bietet eine Abstraktion über die Spanner API, um Spanner-Änderungsstreams in Kafka zu veröffentlichen. Mit diesem Connector müssen Sie den Lebenszyklus der Änderungsstreams nicht verwalten, was erforderlich ist, wenn Sie die Spanner API direkt verwenden.
Der Kafka-Connector generiert ein Änderungsereignis für jeden Datensatz mit Datenänderungsänderungen und sendet die Änderungsereignisdatensätze an jedes separate Kafka-Thema für jede mit dem Änderungsstream verfolgte Tabelle. Ein Mod-Datensatz-Mod-Datensatz stellt eine einzelne Änderung (einfügen, aktualisieren oder löschen) dar, die erfasst wurde. Ein einzelner Datensatz mit Datenänderungen kann mehr als einen Mod enthalten.
Ausgabe des Kafka-Connectors
Der Kafka-Connector leitet Änderungsstream-Einträge direkt in ein separates Kafka-Thema weiter. Der Name des Ausgabethemas sollte connector_name
.table_name
sein. Wenn das Thema nicht vorhanden ist, erstellt der Kafka-Connector automatisch ein Thema unter diesem Namen.
Sie können auch Themenrouting-Transformationen konfigurieren, um Einträge an von Ihnen angegebene Themen weiterzuleiten. Wenn Sie die Themenweiterleitung verwenden möchten, deaktivieren Sie die Funktion Niedriges Wasserzeichen.
Reihenfolge der Einträge
Datensätze werden in den Kafka-Themen nach Commit-Zeitstempel pro Primärschlüssel geordnet. Datensätze, die zu verschiedenen Primärschlüsseln gehören, haben keine Sortiergarantien. Datensätze mit demselben Primärschlüssel werden in derselben Kafka-Themenpartition gespeichert. Wenn Sie ganze Transaktionen verarbeiten möchten, können Sie auch die Felder server_transaction_id
und number_of_records_in_transaction
des Datenänderungseintrags verwenden, um eine Spanner-Transaktion zusammenzustellen.
Ereignisse ändern
Der Kafka-Connector generiert für jeden INSERT
-, UPDATE
- und DELETE
-Vorgang ein Datenänderungsereignis. Jedes Ereignis enthält einen Schlüssel und Werte für die geänderte Zeile.
Mit Kafka Connect-Convertern können Sie Datenänderungsereignisse im Format Protobuf
, AVRO
, JSON
oder JSON Schemaless
generieren. Wenn Sie einen Kafka Connect-Converter verwenden, der Schemas generiert, enthält das Ereignis separate Schemas für den Schlüssel und die Werte. Andernfalls enthält das Ereignis nur den Schlüssel und die Werte.
Das Schema für den Schlüssel ändert sich nie. Das Schema für die Werte ist eine Zusammenfassung aller Spalten, die vom Änderungsstream seit der Startzeit des Connectors erfasst wurden.
Wenn Sie den Connector so konfigurieren, dass er JSON-Ereignisse erzeugt, enthält das Ausgabeänderungsereignis fünf Felder:
Das erste
schema
-Feld gibt ein Kafka Connect-Schema an, das das Spanner-Schlüsselschema beschreibt.Das erste
payload
-Feld hat die im vorherigenschema
-Feld beschriebene Struktur und enthält den Schlüssel für die geänderte Zeile.Das zweite
schema
-Feld gibt das Kafka Connect-Schema an, das das Schema für die geänderte Zeile beschreibt.Das zweite
payload
-Feld hat die im vorherigenschema
-Feld beschriebene Struktur und enthält die tatsächlichen Daten für die geänderte Zeile.Das Feld
source
ist ein Pflichtfeld, das die Quellmetadaten für das Ereignis beschreibt.
Hier ein Beispiel für ein Datenänderungsereignis:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Niedriges Wasserzeichen
Das niedrige Wasserzeichen beschreibt die Zeit T, zu der der Kafka-Connector garantiert gestreamt und in einem Kafka-Thema veröffentlicht wurde. Dabei werden alle Ereignisse mit dem Zeitstempel < T berücksichtigt.
Sie können das niedrige Wasserzeichen im Kafka-Connector mit dem Parameter gcp.spanner.low-watermark.enabled
aktivieren. Dieser Parameter ist standardmäßig deaktiviert. Wenn das niedrige Wasserzeichen aktiviert ist, wird das Feld low_watermark
im Änderungsdatenstream-Datensatz mit dem aktuellen niedrigen Wasserzeichenzeitstempel des Kafka-Connectors gefüllt.
Wenn keine Datensätze erstellt werden, sendet der Kafka-Connector regelmäßige Wasserzeichen „Herzschläge“ an die Kafka-Ausgabethemen, die vom Connector erkannt werden.
Diese Wasserzeichen-Herzschläge sind Einträge, die mit Ausnahme des Felds low_watermark
leer sind. Sie können das Wasserzeichen dann für zeitbasierte Zusammenfassungen verwenden.
Beispielsweise können Sie mit dem niedrigen Wasserzeichen Ereignisse nach Commit-Zeitstempel für Primärschlüssel sortieren.
Metadatenthemen
Der Kafka-Connector sowie das Framework von Kafka Connect erstellen mehrere Metadatenthemen, um Connector-bezogene Informationen zu speichern. Es wird nicht empfohlen, entweder die Konfiguration oder den Inhalt dieser Metadatenthemen zu ändern.
Im Folgenden finden Sie die Themen:
_consumer_offsets
: Ein Thema, das automatisch von Kafka erstellt wird. Speichert Verbraucher-Offsets für Nutzer, die im Kafka-Connector erstellt wurden._kafka-connect-offsets
: Ein Thema, das automatisch von Kafka Connect erstellt wird. Speichert die Connector-Offsets._sync_topic_spanner_connector_connectorname
: Ein vom Connector automatisch erstelltes Thema. Speichert Metadaten zu Änderungsstream-Partitionen._rebalancing_topic_spanner_connector_connectorname
: Ein vom Connector automatisch erstelltes Thema. Wird verwendet, um die Aktivität der Connector-Aufgabe zu bestimmen._debezium-heartbeat.connectorname
: Ein Thema zum Verarbeiten von Heartbeats für Spanner-Änderungsstreams.
Kafka-Connector-Laufzeit
Im Folgenden wird die Laufzeit des Kafka-Connectors beschrieben.
Skalierbarkeit
Der Kafka-Connector ist horizontal skalierbar und wird für eine oder mehrere Aufgaben ausgeführt, die auf mehrere Kafka Connect-Worker verteilt sind.
Garantien für Nachrichtenübermittlung
Der Kafka-Connector unterstützt die mindestens einmalige Zustellungsgarantie.
Fehlertoleranz
Der Kafka-Connector ist fehlertolerant. Während der Kafka-Connector Änderungen liest und Ereignisse erstellt, wird der letzte verarbeitete Zeitstempel für jede Änderungsstream-Partition aufgezeichnet. Wenn der Kafka-Connector aus irgendeinem Grund beendet wird (einschließlich Kommunikationsfehlern, Netzwerk- oder Softwarefehlern), werden die Kafka-Connectors beim Neustart an der Stelle fortgesetzt, an der sie unterbrochen wurden.
Der Kafka-Connector liest das Informationsschema beim Startzeitstempel des Kafka-Connectors, um Schemainformationen abzurufen. Spanner kann das Informationsschema standardmäßig nicht zu Lesezeitstempeln vor der Aufbewahrungsdauer von Versionen lesen, die standardmäßig auf eine Stunde festgelegt ist. Wenn Sie den Connector früher als eine Stunde in der Vergangenheit starten möchten, müssen Sie die Aufbewahrungsdauer der Datenbank erhöhen.
Kafka-Connector einrichten
Änderungsstream erstellen
Weitere Informationen finden Sie im Hilfeartikel Änderungsstream erstellen. Um mit den nächsten Schritten fortzufahren, ist eine Spanner-Instanz mit konfiguriertem Änderungsstream erforderlich.
Wenn bei jedem Datenänderungsereignis sowohl geänderte als auch unveränderte Spalten zurückgegeben werden sollen, verwenden Sie den Werterfassungstyp NEW_ROW
. Weitere Informationen finden Sie unter Werterfassungstyp.
Kafka-Connector-JAR-Datei installieren
Wenn Zookeeper, Kafka und Kafka Connect installiert sind, müssen Sie zum Bereitstellen eines Kafka-Connectors die Plug-in-Archivdatei des Connectors herunterladen, die JAR-Dateien in Ihre Kafka Connect-Umgebung extrahieren und das Verzeichnis mit den JAR-Dateien dem von Kafka Connect hinzufügen. Anschließend müssen Sie Ihren Kafka Connect-Prozess neu starten, um die neuen JAR-Dateien abzurufen.
Wenn Sie mit unveränderlichen Containern arbeiten, können Sie Images aus den Debezium-Container-Images für Zookeeper, Kafka und Kafka Connect abrufen. Auf dem Kafka Connect-Image ist der Cloud Spanner-Connector vorinstalliert.
Weitere Informationen zur Installation von Debezium-basierten Kafka-Connector-JARs finden Sie unter Debezium installieren.
Kafka-Connector konfigurieren
Das folgende Beispiel zeigt die Konfiguration für einen Kafka-Connector, der mit einem Änderungsstream namens changeStreamAll
in der Datenbank users
in der Instanz test-instance
und dem Projekt test-project
verbunden ist.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "tasks.max": "10" }
Diese Konfiguration enthält Folgendes:
Der Name des Connectors bei der Registrierung für einen Kafka Connect-Dienst.
Der Name dieser Spanner-Connector-Klasse.
Die Projekt-ID.
Spanner-Instanz-ID
Spanner-Datenbank-ID
Name des Streams ändern
Das JSON-Objekt für den Dienstkontoschlüssel.
Die maximale Anzahl von Aufgaben.
Eine vollständige Liste der Connector-Eigenschaften finden Sie unter Eigenschaften der Kafka-Connector-Konfiguration.
Connector-Konfiguration zu Kafka Connect hinzufügen
So starten Sie einen Spanner-Connector:
Erstellen Sie eine Konfiguration für den Spanner-Connector.
Verwenden Sie die Kafka Connect REST API, um Ihrem Kafka Connect-Cluster diese Connector-Konfiguration hinzuzufügen.
Sie können diese Konfiguration mit dem Befehl POST
an einen laufenden Kafka Connect-Dienst senden. Standardmäßig wird der Kafka Connect-Dienst über Port 8083
ausgeführt.
Der Dienst zeichnet die Konfiguration auf und startet die Connector-Aufgabe, die eine Verbindung zur Spanner-Datenbank herstellt und Stream-Ereignisdatensätze in Kafka-Themen streamt.
Hier ein Beispiel für einen POST
-Befehl:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Beispiel für eine erfolgreiche Antwort:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Konfiguration des Kafka-Connectors aktualisieren
Senden Sie zum Aktualisieren der Connector-Konfiguration einen PUT
-Befehl an den ausgeführten Kafka Connect-Dienst mit demselben Connector-Namen.
Angenommen, ein Connector wird mit der Konfiguration aus dem vorherigen Abschnitt ausgeführt. Hier ein Beispiel für einen PUT
-Befehl:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Beispiel für eine erfolgreiche Antwort:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Kafka-Connector beenden
Senden Sie zum Beenden des Connectors einen DELETE
-Befehl an den ausgeführten Kafka Connect-Dienst mit demselben Connector-Namen.
Angenommen, ein Connector wird mit der Konfiguration aus dem vorherigen Abschnitt ausgeführt. Hier ein Beispiel für einen DELETE
-Befehl:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Beispiel für eine erfolgreiche Antwort:
HTTP/1.1 204 No Content
Kafka-Connector überwachen
Neben den standardmäßigen Kafka Connect- und Debezium-Messwerten exportiert der Kafka-Connector seine eigenen Messwerte:
MilliSecondsLowWatermark
: Das aktuelle niedrige Wasserzeichen der Connector-Aufgabe in Millisekunden. Das niedrige Wasserzeichen beschreibt die Zeit T, zu der der Connector garantiert alle Ereignisse mit dem Zeitstempel < T gestreamt hat.MilliSecondsLowWatermarkLag
: Die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Alle Ereignisse mit Zeitstempel < T wurden gestreamt.LatencyLowWatermark<Variant>MilliSeconds
: Die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Die Varianten P50, P95, P99, Durchschnitt, Min und Max sind verfügbar.LatencySpanner<Variant>MilliSeconds
: Die Latenz von Spanner-commit-timestamp-to-connector-read. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.LatencyReadToEmit<Variant>MilliSeconds
: Die Latenz von Spanner-read-timestamp-to-connector-emit. Die Varianten P50, P95, P99, Durchschnitt, Min und Max sind verfügbar.LatencyCommitToEmit<Variant>tMilliSeconds
: Die Latenz von Spanner-commit-timestamp-to-connector-emit. Die Varianten P50, P95, P99, Durchschnitt, Min und Max sind verfügbar.LatencyCommitToPublish<Variant>MilliSeconds
: Die Latenz von Spanner-commit-timestamp-to Kafka-publish-timestamp. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.NumberOfChangeStreamPartitionsDetected
: Die Gesamtzahl der Partitionen, die von der aktuellen Connector-Aufgabe erkannt wurden.NumberOfChangeStreamQueriesIssued
: Die Gesamtzahl der Abfragen des Änderungsstreams, die von der aktuellen Aufgabe ausgegeben werden.NumberOfActiveChangeStreamQueries
: Die aktive Anzahl von Abfragen für Änderungsstreams, die von der aktuellen Connector-Aufgabe erkannt wurden.SpannerEventQueueCapacity
: Die Gesamtkapazität vonStreamEventQueue
, einer Warteschlange, in der Elemente gespeichert werden, die von Abfragen des Änderungsstreams empfangen wurden.SpannerEventQueueCapacity
: die verbleibendeStreamEventQueue
-Kapazität.TaskStateChangeEventQueueCapacity
: Die Gesamtkapazität vonTaskStateChangeEventQueue
, einer Warteschlange, in der Ereignisse im Connector gespeichert werden.RemainingTaskStateChangeEventQueueCapacity
: die verbleibendeTaskStateChangeEventQueue
-Kapazität.NumberOfActiveChangeStreamQueries
: Die aktive Anzahl von Abfragen für Änderungsstreams, die von der aktuellen Connector-Aufgabe erkannt wurden.
Konfigurationseigenschaften für den Kafka-Connector
Die folgenden Konfigurationsattribute für den Connector sind erforderlich:
name
: Eindeutiger Name für den Connector. Ein erneuter Registrierungsversuch mit demselben Namen führt zu einem Fehler. Diese Eigenschaft ist für alle Kafka Connect-Connectors erforderlich.connector.class
: Der Name der Java-Klasse für den Connector. Verwenden Sie für den Kafka-Connector immer den Wertio.debezium.connector.spanner.SpannerConnector
.tasks.max
: Die maximale Anzahl von Aufgaben, die für diesen Connector erstellt werden sollen.gcp.spanner.project.id
: die Projekt-IDgcp.spanner.instance.id
: die Spanner-Instanz-IDgcp.spanner.database.id
: die Spanner-Datenbank-IDgcp.spanner.change.stream
: Der Name des Spanner-Änderungsstreamsgcp.spanner.credentials.json
: Das JSON-Objekt des Dienstkontoschlüssels.gcp.spanner.credentials.path
: Der Dateipfad zum JSON-Objekt des Dienstkontoschlüssels. Erforderlich, wenn das Feld oben nicht angegeben wurde.
Die folgenden erweiterten Konfigurationsattribute haben Standardwerte, die in den meisten Situationen funktionieren und daher nur selten in der Konfiguration des Connectors angegeben werden müssen:
gcp.spanner.low-watermark.enabled
: Gibt an, ob das niedrige Wasserzeichen für den Connector aktiviert ist. Der Standardwert ist "false".gcp.spanner.low-watermark.update-period.ms
: Das Intervall, in dem das niedrige Wasserzeichen aktualisiert wird. Der Standardwert ist 1.000 ms.heartbeat.interval.ms
: Das Spanner-Intervall. Der Standardwert ist 300.000 (fünf Minuten).gcp.spanner.start.time
: Die Startzeit des Connectors. Die Standardeinstellung ist die aktuelle Zeit.gcp.spanner.end.time
: Die Endzeit des Connectors. Die Standardeinstellung ist „unendlich“.tables.exclude.list
: Die Tabellen, für die Änderungsereignisse ausgeschlossen werden sollen. Die Standardeinstellung ist leer.tables.include.list
: Die Tabellen, für die Änderungsereignisse berücksichtigt werden sollen. Wenn das Feld nicht ausgefüllt ist, werden alle Tabellen eingeschlossen. Die Standardeinstellung ist leer.gcp.spanner.stream.event.queue.capacity
: die Kapazität der Spanner-Ereigniswarteschlange. Der Standardwert ist 10.000.connector.spanner.task.state.change.event.queue.capacity
: Die Kapazität der Ereigniswarteschlange zur Änderung des Aufgabenstatus. Der Standardwert ist 1.000.connector.spanner.max.missed.heartbeats
: Die maximale Anzahl verpasster Heartbeats für eine Änderungsstream-Abfrage, bevor eine Ausnahme ausgelöst wird. Der Standardwert ist 10.scaler.monitor.enabled
: Gibt an, ob Aufgaben-Autoscaling aktiviert ist. Die Standardeinstellung ist "false".tasks.desired.partitions
: Die bevorzugte Anzahl von Änderungsstreams-Partitionen pro Aufgabe. Dieser Parameter wird für das Autoscaling von Aufgaben benötigt. Der Standardwert ist 2.tasks.min
: Die Mindestanzahl von Aufgaben. Dieser Parameter wird für das Autoscaling von Aufgaben benötigt. Der Standardfaktor ist 1.connector.spanner.sync.topic
: Der Name für das Synchronisierungsthema, ein internes Connector-Thema zum Speichern der Kommunikation zwischen Aufgaben. Wenn der Nutzer keinen Namen angegeben hat, wird standardmäßig_sync_topic_spanner_connector_connectorname
verwendet.connector.spanner.sync.poll.duration
: Die Dauer der Umfrage für das Synchronisierungsthema. Der Standardwert ist 500 ms.connector.spanner.sync.request.timeout.ms
: Das Zeitlimit für Anfragen an das Synchronisierungsthema. Der Standardwert ist 5.000 ms.connector.spanner.sync.delivery.timeout.ms
: Zeitlimit für die Veröffentlichung im Synchronisierungsthema. Der Standardwert ist 15.000 ms.connector.spanner.sync.commit.offsets.interval.ms
: Das Intervall, in dem die Offsets für das Synchronisierungsthema angewendet werden. Der Standardwert ist 60.000 ms.connector.spanner.sync.publisher.wait.timeout
: Das Intervall, in dem Nachrichten im Synchronisierungsthema veröffentlicht werden. Die Standardeinstellung ist 5 ms.connector.spanner.rebalancing.topic
: Der Name des Ausgleichsthemas. Das Thema für den Ausgleich ist ein internes Connector-Thema, das die Aktivität eines Aufgabenbereichs bestimmt. Wenn der Nutzer keinen Namen angegeben hat, wird standardmäßig_rebalancing_topic_spanner_connector_connectorname
verwendet.connector.spanner.rebalancing.poll.duration
: Die Umfragedauer für das Ausgleichsthema. Der Standardwert ist 5.000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: Das Zeitlimit für das Commit von Offsets für das Ausgleichsthema. Der Standardwert ist 5.000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: Das Intervall, in dem die Offsets für das Synchronisierungsthema angewendet werden. Der Standardwert ist 60.000 ms.connector.spanner.rebalancing.task.waiting.timeout
: Die Zeit, die eine Aufgabe wartet, bevor ein Ausgleichsereignis verarbeitet wird. Der Standardwert ist 1.000 ms.
Eine detailliertere Liste der konfigurierbaren Connector-Attribute finden Sie im GitHub-Repository.
Beschränkungen
Das Streaming von Snapshot-Ereignissen wird vom Connector nicht unterstützt.
Wenn im Connector das Wasserzeichen aktiviert ist, können Sie Debezium-Themenrouting-Transformationen nicht konfigurieren.
Dieser Connector unterstützt derzeit nicht die PostgreSQL-Schnittstelle für Cloud Spanner.