Auf dieser Seite wird erläutert, wie Sie mit dem Kafka-Connector und leiten Spanner-Änderungsstreams-Daten weiter.
Wichtige Konzepte
Im Folgenden werden die Kernkonzepte des Kafka-Connectors beschrieben.
Debezium
Debezium ist ein Open-Source-Projekt, eine Datenstreamingplattform mit niedriger Latenz zur Erfassung von Change Data Capture.
Kafka-Connector
Der Kafka-Connector bietet eine Abstraktion über die Spanner API, Spanner-Änderungsstreams in Kafka veröffentlichen. Bei diesem Connector müssen Sie den Lebenszyklus der Änderungsstreams nicht verwalten, Dies ist erforderlich, wenn Sie die Spanner API direkt verwenden.
Der Kafka-Connector erstellt ein Änderungsereignis für jeden Modifikator in Form von Datenänderungseinträgen und sendet Änderungsereigniseinträge nachgelagert in ein separates Kafka-Thema für jede Tabelle, die den Änderungsstream verfolgt. Ein Mod für den Datensatz für eine Änderung steht für eine einzelne erfasste Änderung (Einfügen, Aktualisieren oder Löschen). Ein einzelner Datensatz für die Datenänderung kann mehr als einen Mod enthalten.
Ausgabe des Kafka-Connectors
Der Kafka-Connector leitet Änderungsstreams direkt weiter.
separaten Kafka-Thema. Der Name des Ausgabethemas sollte connector_name
.table_name
lauten.
Wenn das Thema nicht vorhanden ist, erstellt der Kafka-Connector automatisch ein Thema unter diesem Namen.
Sie können auch Themen-Routing-Transformationen so konfigurieren, dass Einträge an die von Ihnen angegebenen Themen weitergeleitet werden. Wenn Sie die Weiterleitung nach Themen verwenden möchten, deaktivieren Sie die Funktion Niedriges Wasserzeichen.
Datensatzsortierung
Datensätze werden nach dem Commit-Zeitstempel pro Primärschlüssel in
die Kafka-Themen. Datensätze, die zu verschiedenen Primärschlüsseln gehören, enthalten keine
Bestellgarantien. Datensätze mit demselben Primärschlüssel werden im
Kafka-Themenpartitionierung. Wenn Sie ganze Transaktionen verarbeiten möchten, können Sie
auch die des Data Change Record
server_transaction_id
und number_of_records_in_transaction
, um
eine Spanner-Transaktion zusammenzustellen.
Änderungsereignisse
Der Kafka-Connector generiert ein Datenänderungsereignis für jede INSERT
, UPDATE
,
und DELETE
-Operation. Jedes Ereignis enthält einen Schlüssel und Werte für die geänderte Zeile.
Sie können Kafka Connect-Converter verwenden, um Datenänderungsereignisse in
Protobuf
, AVRO
, JSON
oder JSON Schemaless
. Wenn Sie eine
Kafka Connect-Konverter, der Schemas erstellt, enthält das Ereignis
separate Schemas für Schlüssel und Werte. Andernfalls enthält der Termin nur
Schlüssel und Werte.
Das Schema für den Schlüssel ändert sich nie. Das Schema für die Werte ist ein Zusammenführung aller Spalten, die der Änderungsstream seit dem letzten Connector-Startzeit.
Wenn Sie den Connector so konfigurieren, dass JSON-Ereignisse erzeugt werden, wird das Ausgabeänderungsereignis enthält fünf Felder:
Das erste Feld
schema
gibt ein Kafka Connect-Schema an, das das Spanner-Schlüsselschema beschreibt.Das erste
payload
-Feld hat die im vorherigenschema
-Feld beschriebene Struktur und enthält den Schlüssel für die geänderte Zeile.Das zweite
schema
-Feld gibt das Kafka Connect-Schema an, das das Schema für die geänderte Zeile beschreibt.Das zweite
payload
-Feld hat die im vorherigenschema
-Feld beschriebene Struktur und enthält die tatsächlichen Daten für die geänderte Zeile.Das Feld
source
ist ein Pflichtfeld, das die Quellmetadaten für das Ereignis beschreibt.
Hier ein Beispiel für ein Datenänderungsereignis:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Niedriges Wasserzeichen
Das niedrige Wasserzeichen beschreibt den Zeitpunkt T, zu dem der Kafka-Connector garantiert alle Ereignisse mit dem Zeitstempel < gestreamt und an ein Kafka-Thema veröffentlicht hat. D.
Sie können das niedrige Wasserzeichen im Kafka-Connector über die
gcp.spanner.low-watermark.enabled
-Parameter. Dieser Parameter ist deaktiviert
ist standardmäßig aktiviert. Wenn das niedrige Wasserzeichen aktiviert ist, wird das Feld low_watermark
in den Änderungsstreamdaten verwendet
Der Änderungseintrag wird mit dem aktuellen niedrigen Wasserzeichen des Kafka-Connectors gefüllt.
Zeitstempel.
Wenn keine Datensätze erstellt werden, sendet der Kafka-Connector regelmäßig Wasserzeichen „Herzschläge“ zu den vom Connector erkannten Kafka-Ausgabethemen.
Diese Wasserzeichen-Heartbeats sind Datensätze, die mit Ausnahme der
low_watermark
. Sie können dann das niedrige Wasserzeichen verwenden, um zeitbasierte Aggregationen durchzuführen.
Sie können z. B. das niedrige Wasserzeichen verwenden, um Ereignisse nach Commit zu sortieren.
Zeitstempel für die Primärschlüssel.
Metadatenthemen
Mit dem Kafka-Connector und dem Kafka Connect-Framework werden Metadatenthemen, um Connector-bezogene Informationen zu speichern. Es ist nicht ratsam, die Konfiguration oder den Inhalt dieser Metadatenthemen ändern.
Im Folgenden sind die Metadatenthemen aufgeführt:
_consumer_offsets
: Ein automatisch von Kafka erstelltes Thema. Speichert Nutzer-Offsets für Nutzer, die im Kafka-Connector erstellt wurden._kafka-connect-offsets
: Ein automatisch von Kafka Connect erstelltes Thema. Speichert die Connector-Offsets._sync_topic_spanner_connector_connectorname
: Ein vom Connector automatisch erstelltes Thema. Speichert Metadaten zu Änderungsstreampartitionen._rebalancing_topic_spanner_connector_connectorname
: Ein vom Connector automatisch erstelltes Thema. Wird verwendet, um die Aktualität von Connector-Aufgaben zu bestimmen._debezium-heartbeat.connectorname
: Ein Thema, das zum Verarbeiten von Spanner-Heartbeats des Änderungsstreams verwendet wird.
Laufzeit des Kafka-Connectors
Im Folgenden wird die Laufzeit des Kafka-Connectors beschrieben.
Skalierbarkeit
Der Kafka-Connector ist horizontal skalierbar und wird für eine oder mehrere Aufgaben ausgeführt, die auf mehrere Kafka Connect-Worker verteilt sind.
Garantien für Nachrichtenübermittlung
Der Kafka-Connector unterstützt die Garantie für mindestens einmalige Zustellung.
Fehlertoleranz
Der Kafka-Connector ist fehlertolerant. Da der Kafka-Connector Änderungen und Erzeugt Ereignisse, zeichnet er den letzten verarbeiteten Commit-Zeitstempel für jede Änderung auf. Streampartition. Wenn der Kafka-Connector aus irgendeinem Grund (z. B. Kommunikationsfehler, Netzwerkprobleme oder Softwarefehler), beim Neustart Der Kafka-Connector setzt das Streaming von Datensätzen dort fort, wo er zuletzt unterbrochen wurde.
Der Kafka-Connector liest das Informationsschema zu Beginn des Kafka-Connectors. Zeitstempel, um Schemainformationen abzurufen. Standardmäßig kann Spanner keine das Informationsschema bei Lesezeitstempeln vor dem Aufbewahrungsdauer, der standardmäßig auf eine Stunde eingestellt ist. Wenn Sie den Connector von vorn beginnen möchten, eine Stunde zurückliegen, müssen Sie die Versionsaufbewahrung der Datenbank erhöhen Punkt.
Kafka-Connector einrichten
Änderungsstream erstellen
Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen. Für die nächsten Schritte ist eine Spanner-Instanz mit einem konfigurierten Änderungsstream erforderlich.
Wenn Sie sowohl geänderte als auch unveränderte Spalten für jede Spalte zurückgeben möchten,
verwenden Sie den Werterfassungstyp NEW_ROW
. Weitere Informationen finden Sie unter Werterfassungstyp.
Installieren Sie die JAR-Datei für den Kafka-Connector
Wenn Zookeeper, Kafka und Kafka Connect installiert sind, müssen die restlichen Aufgaben zum Bereitstellen eines Kafka-Connectors heruntergeladen werden.
Connector-Plug-in-Archiv aus, extrahieren Sie die JAR-Dateien in Ihre Kafka Connect-Umgebung und fügen Sie den
mit den JAR-Dateien in die Kafka Connect-Datei plugin.path
.
Anschließend müssen Sie Ihren Kafka Connect-Prozess neu starten, um die neuen JAR-Dateien abzurufen.
Wenn Sie mit unveränderlichen Containern arbeiten, können Sie Images aus Container-Images von Debezium für Zookeeper, Kafka und Kafka Connect. Das Kafka Connect-Image hat Spanner-Connector ist vorinstalliert.
Weitere Informationen zum Installieren von Debezium-basierten Kafka-Connector-JARs finden Sie unter Debezium installieren
Kafka-Connector konfigurieren
Das folgende Beispiel zeigt die Konfiguration eines Kafka-Connectors:
der mit einem Änderungsstream namens changeStreamAll
im
Datenbank users
in Instanz test-instance
und Projekt test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Diese Konfiguration enthält Folgendes:
Der Name des Connectors, wenn er bei einem Kafka Connect-Dienst registriert ist.
Der Name dieser Spanner-Connector-Klasse.
Die Projekt-ID.
Die Spanner-Instanz-ID.
Die Spanner-Datenbank-ID.
Der Name des Änderungsstreams.
Das JSON-Objekt für den Dienstkontoschlüssel.
(Optional) Die zu verwendende Spanner-Datenbankrolle.
Die maximale Anzahl von Tasks.
Eine vollständige Liste der Connector-Eigenschaften finden Sie unter Konfigurationseigenschaften des Kafka-Connectors.
Connector-Konfiguration zu Kafka Connect hinzufügen
So starten Sie einen Spanner-Connector:
Erstellen Sie eine Konfiguration für den Spanner-Connector.
Verwenden Sie dazu die Kafka Connect REST API. Connector-Konfiguration zu Ihrem Kafka Connect-Cluster.
Sie können diese Konfiguration mit einem POST
-Befehl an ein laufendes Kafka Connect senden
Service. Standardmäßig wird der Kafka Connect-Dienst auf Port 8083
ausgeführt.
Der Dienst zeichnet die Konfiguration auf und startet die Connector-Aufgabe
eine Verbindung zur Spanner-Datenbank herstellt und die Änderungsereignisse streamt.
Kafka-Themen.
Hier ein Beispiel für einen POST
-Befehl:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Beispiel für eine erfolgreiche Antwort:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Konfiguration des Kafka-Connectors aktualisieren
Senden Sie zum Aktualisieren der Connector-Konfiguration einen PUT
-Befehl an die laufende
Kafka Connect-Dienst mit demselben Connector-Namen.
Angenommen, Sie haben einen Connector mit der Konfiguration von
im vorherigen Abschnitt. Hier ein Beispiel für einen PUT
-Befehl:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Beispiel für eine erfolgreiche Antwort:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Kafka-Connector beenden
Um den Connector zu beenden, senden Sie einen DELETE
-Befehl an den ausgeführten
Kafka Connect-Dienst mit demselben Connector-Namen.
Angenommen, Sie haben einen Connector mit der Konfiguration von
im vorherigen Abschnitt. Hier ein Beispiel für einen DELETE
-Befehl:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Beispiel für eine erfolgreiche Antwort:
HTTP/1.1 204 No Content
Kafka-Connector überwachen
Neben den Standardmesswerten von Kafka Connect und Debezium exportiert der Kafka-Connector seine eigenen Messwerte:
MilliSecondsLowWatermark
: Das aktuelle niedrige Wasserzeichen der Connector-Aufgabe in Millisekunden. Die ein niedriges Wasserzeichen beschreibt die Zeit T, zu der der Connector garantiert Alle Ereignisse mit dem Zeitstempel < gestreamt DMilliSecondsLowWatermarkLag
: Die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Alle Ereignisse mit dem Zeitstempel < gestreamt DLatencyLowWatermark<Variant>MilliSeconds
: die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.LatencySpanner<Variant>MilliSeconds
: Die Latenzzeit für das Lesen von Spanner-Commit-Zeitstempel-an-Connector-Lesevorgängen. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.LatencyReadToEmit<Variant>MilliSeconds
: Die Latenz von Spanner-Lese-Zeitstempel-zu-Connector-Emit. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.LatencyCommitToEmit<Variant>tMilliSeconds
: Die Latenz von Spanner-Commit-Zeitstempel-zu-Connector-Emit. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.LatencyCommitToPublish<Variant>MilliSeconds
: Die Latenzzeit zwischen Spanner-Commit-Zeitstempel und Kafka-Veröffentlichungs-Zeitstempel. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.NumberOfChangeStreamPartitionsDetected
: die Gesamtzahl der Partitionen von der aktuellen Connector-Aufgabe erkannt.NumberOfChangeStreamQueriesIssued
: Die Gesamtzahl der Änderungsstreamabfragen die von der aktuellen Aufgabe ausgegeben wird.NumberOfActiveChangeStreamQueries
: Die Anzahl der aktiven Änderungsstreams Abfragen, die von der aktuellen Connector-Aufgabe erkannt wurden.SpannerEventQueueCapacity
: Die Gesamtkapazität vonStreamEventQueue
, einer Warteschlange, in der Elemente gespeichert werden, die über Änderungsstreamabfragen empfangen wurden.SpannerEventQueueCapacity
: Die verbleibendeStreamEventQueue
-Kapazität.TaskStateChangeEventQueueCapacity
: Die Gesamtkapazität vonTaskStateChangeEventQueue
, einer Warteschlange, in der Ereignisse im Connector gespeichert werden.RemainingTaskStateChangeEventQueueCapacity
: Die verbleibendeTaskStateChangeEventQueue
-Kapazität.NumberOfActiveChangeStreamQueries
: Die Anzahl der aktiven Änderungsstreams Abfragen, die von der aktuellen Connector-Aufgabe erkannt wurden.
Konfigurationseigenschaften des Kafka-Connectors
Für den Connector sind die folgenden erforderlichen Konfigurationseigenschaften erforderlich:
name
: Eindeutiger Name für den Connector. Ein erneuter Versuch mit demselben Namen führt zu einem Fehler. Dieses Attribut ist für alle Kafka Connect-Connectors erforderlich.connector.class
: Der Name der Java-Klasse für den Connector. Verwenden Sie für den Kafka-Connector immer den Wertio.debezium.connector.spanner.SpannerConnector
.tasks.max
: Die maximale Anzahl von Aufgaben, die für diesen Connector erstellt werden sollen.gcp.spanner.project.id
: die Projekt-IDgcp.spanner.instance.id
: die Spanner-Instanz-IDgcp.spanner.database.id
: Die Spanner-Datenbank-IDgcp.spanner.change.stream
: Der Name des Spanner-Änderungsstreamsgcp.spanner.credentials.json
: Das JSON-Objekt des Dienstkontoschlüssels.gcp.spanner.credentials.path
: Der Dateipfad zum JSON-Objekt des Dienstkontoschlüssels. Erforderlich, wenn das Feld oben nicht ausgefüllt ist.gcp.spanner.database.role
: Die Spanner-Datenbankrolle für verwenden. Dies ist nur erforderlich, wenn der Änderungsstream mit gesichert ist mit eine differenzierte Zugriffssteuerung. Die Datenbankrolle muss die BerechtigungSELECT
für das Änderungsstream und die BerechtigungEXECUTE
für den Lesevorgang des Änderungsstreams . Weitere Informationen finden Sie unter Detailgenaue Zugriffssteuerung für Änderungen Streams
Die folgenden erweiterten Konfigurationseigenschaften haben Standardeinstellungen, die in den meisten und müssen daher nur selten in der Konfiguration des Connectors angegeben werden:
gcp.spanner.low-watermark.enabled
: Gibt an, ob das niedrige Wasserzeichen für den Connector aktiviert ist. Der Standardwert ist "False".gcp.spanner.low-watermark.update-period.ms
: Das Intervall, in dem das niedrige Wasserzeichen aktualisiert wird. Der Standardwert ist 1.000 ms.heartbeat.interval.ms
: Das Spanner-Heartbeat-Intervall. Der Standardwert ist 300.000 (fünf Minuten).gcp.spanner.start.time
: Die Startzeit des Connectors. Die Standardeinstellung ist die aktuelle Uhrzeit.gcp.spanner.end.time
: Ende des Connectors. Die Standardeinstellung ist unendlich.tables.exclude.list
: Die Tabellen, für die Änderungsereignisse ausgeschlossen werden sollen. Die Standardeinstellung ist leer.tables.include.list
: Die Tabellen, für die Änderungsereignisse enthalten sein sollen. Wenn das Feld nicht ausgefüllt ist, werden alle Tabellen eingeschlossen. Die Standardeinstellung ist leer.gcp.spanner.stream.event.queue.capacity
: Die Kapazität der Spanner-Ereigniswarteschlange. Die Standardeinstellung ist 10.000.connector.spanner.task.state.change.event.queue.capacity
: Die Kapazität der Ereigniswarteschlange der Aufgabenstatusänderung. Die Standardeinstellung ist 1.000.connector.spanner.max.missed.heartbeats
: Die maximale Anzahl verpasster Heartbeats für eine Änderungsstreamabfrage, bevor eine Ausnahme ausgelöst wird. Der Standardwert ist 10.scaler.monitor.enabled
: Gibt an, ob das Aufgaben-Autoscaling aktiviert ist. Die Standardeinstellung ist "false".tasks.desired.partitions
: Die bevorzugte Anzahl von Partitionen für Änderungsstreams pro Aufgabe. Dieser Parameter wird für das Aufgaben-Autoscaling benötigt. Die Standardeinstellung ist 2.tasks.min
: Die Mindestanzahl von Aufgaben. Dieser Parameter wird für das Aufgaben-Autoscaling benötigt. Der Standardfaktor ist 1.connector.spanner.sync.topic
: Der Name des Synchronisierungsthemas. Dabei handelt es sich um ein internes Connector-Thema, das zum Speichern der Kommunikation zwischen Aufgaben verwendet wird. Die Standardeinstellung ist_sync_topic_spanner_connector_connectorname
, wenn der Nutzer keinen Namen angegeben hat.connector.spanner.sync.poll.duration
: Die Dauer der Abfrage für das Synchronisierungsthema. Die Standardeinstellung ist 500 ms.connector.spanner.sync.request.timeout.ms
: Das Zeitlimit für Anfragen an das Synchronisierungsthema. Die Standardeinstellung ist 5.000 ms.connector.spanner.sync.delivery.timeout.ms
: Das Zeitlimit für die Veröffentlichung im Synchronisierungsthema. Die Standardeinstellung ist 15.000 ms.connector.spanner.sync.commit.offsets.interval.ms
: Das Intervall, in dem Offsets für das Synchronisierungsthema per Commit übergeben werden. Die Standardeinstellung ist 60.000 ms.connector.spanner.sync.publisher.wait.timeout
: Das Intervall, in dem Nachrichten im Synchronisierungsthema veröffentlicht werden. Die Standardeinstellung ist 5 ms.connector.spanner.rebalancing.topic
: Der Name des Ausgleichsthemas. Das Ausgleichsthema ist ein internes Connector-Thema, mit dem die Aufgabenaktivität festgestellt wird. Die Standardeinstellung ist_rebalancing_topic_spanner_connector_connectorname
, wenn der Nutzer keinen Namen angegeben hat.connector.spanner.rebalancing.poll.duration
: Die Abfragedauer für das Ausgleichsthema. Die Standardeinstellung ist 5.000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: Das Zeitlimit für das Commit von Offsets für das Ausgleichsthema. Die Standardeinstellung ist 5.000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: Das Intervall, in dem Offsets für das Synchronisierungsthema per Commit übergeben werden. Die Standardeinstellung ist 60.000 ms.connector.spanner.rebalancing.task.waiting.timeout
: Die Zeit, die eine Aufgabe wartet, bevor ein Ausgleichsereignis verarbeitet wird. Die Standardeinstellung ist 1.000 ms.
Eine noch detailliertere Liste der Eigenschaften des konfigurierbaren Connectors finden Sie im GitHub-Repository.
Beschränkungen
Der Connector unterstützt das Streaming von Snapshot-Ereignissen nicht.
Wenn im Connector Wasserzeichen aktiviert sind, können Sie Debezium-Thema nicht konfigurieren Routingtransformationen.