Änderungsstream-Verbindungen zu Kafka erstellen

Auf dieser Seite wird erläutert, wie Sie mit dem Kafka-Connector Daten aus Spanner-Änderungsstreams verarbeiten und weiterleiten.

Wichtige Konzepte

Im Folgenden werden die wichtigsten Konzepte des Kafka-Connectors beschrieben.

Debezium

Debezium ist ein Open-Source-Projekt, das eine Streamingplattform mit niedriger Latenz für Change Data Capture bietet.

Kafka-Connector

Der Kafka-Connector bietet eine Abstraktion über die Spanner API, um Spanner-Änderungsstreams an Kafka zu veröffentlichen. Mit diesem Connector müssen Sie den Partitionslebenszyklus des Änderungsstreams nicht verwalten. Das ist notwendig, wenn Sie die Spanner API direkt verwenden.

Der Kafka-Connector erzeugt ein Änderungsereignis für jede Änderung von Datenänderungseinträgen und sendet Änderungsereigniseinträge im Downstream an ein separates Kafka-Thema für jede Tabelle, die durch einen Änderungsstream verfolgt wird. Ein Mod eines Datenänderungseintrags stellt eine einzelne erfasste Änderung (Einfügen, Aktualisieren oder Löschen) dar. Ein einzelner Datenänderungseintrag kann mehr als einen Mod enthalten.

Ausgabe des Kafka-Connectors

Der Kafka-Connector leitet Änderungsstreameinträge direkt an ein separates Kafka-Thema weiter. Der Name des Ausgabethemas sollte connector_name.table_name lauten. Wenn das Thema nicht vorhanden ist, erstellt der Kafka-Connector automatisch ein Thema unter diesem Namen.

Sie können auch Themen-Routing-Transformationen konfigurieren, um Einträge in von Ihnen angegebene Themen umzuleiten. Wenn Sie die Themenweiterleitung verwenden möchten, deaktivieren Sie die Funktion Niedriges Wasserzeichen.

Datensatzsortierung

Datensätze werden in den Kafka-Themen nach dem Commit-Zeitstempel pro Primärschlüssel sortiert. Datensätze, die zu verschiedenen Primärschlüsseln gehören, haben keine Reihenfolgegarantien. Datensätze mit demselben Primärschlüssel werden in derselben Kafka-Themenpartition gespeichert. Wenn Sie ganze Transaktionen verarbeiten möchten, können Sie auch die Felder server_transaction_id und number_of_records_in_transaction des Datenänderungseintrags verwenden, um eine Spanner-Transaktion zusammenzustellen.

Ereignisse ändern

Der Kafka-Connector generiert für jeden INSERT-, UPDATE- und DELETE-Vorgang ein Datenänderungsereignis. Jedes Ereignis enthält einen Schlüssel und Werte für die geänderte Zeile.

Sie können Kafka Connect-Konverter verwenden, um Datenänderungsereignisse im Format Protobuf, AVRO, JSON oder JSON Schemaless zu generieren. Wenn Sie einen Kafka Connect-Konverter verwenden, der Schemas erstellt, enthält das Ereignis separate Schemas für den Schlüssel und die Werte. Andernfalls enthält das Ereignis nur den Schlüssel und die Werte.

Das Schema für den Schlüssel ändert sich nie. Das Schema für die Werte ist eine Zusammenfassung aller Spalten, die der Änderungsstream seit der Startzeit des Connectors verfolgt hat.

Wenn Sie den Connector zum Erstellen von JSON-Ereignissen konfigurieren, umfasst das Ausgabeänderungsereignis fünf Felder:

  • Das erste Feld schema gibt ein Kafka Connect-Schema an, das das Spanner-Schlüsselschema beschreibt.

  • Das erste payload-Feld hat die im vorherigen schema-Feld beschriebene Struktur und enthält den Schlüssel für die geänderte Zeile.

  • Im zweiten Feld schema wird das Kafka Connect-Schema angegeben, das das Schema für die geänderte Zeile beschreibt.

  • Das zweite payload-Feld hat die im vorherigen schema-Feld beschriebene Struktur und enthält die tatsächlichen Daten für die geänderte Zeile.

  • Das Feld source ist ein Pflichtfeld, das die Quellmetadaten für das Ereignis beschreibt.

Hier ein Beispiel für ein Datenänderungsereignis:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": {
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": {
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c",
  "ts_ms": 1559033904863 //
}

Niedriges Wasserzeichen

Das niedrige Wasserzeichen beschreibt den Zeitpunkt T, zu dem der Kafka-Connector alle Ereignisse mit dem Zeitstempel < T garantiert in ein Kafka-Thema gestreamt und veröffentlicht hat.

Sie können das niedrige Wasserzeichen im Kafka-Connector mithilfe des Parameters gcp.spanner.low-watermark.enabled aktivieren. Dieser Parameter ist standardmäßig deaktiviert. Wenn das Low-Wasserzeichen aktiviert ist, wird das Feld low_watermark im Änderungsstream-Datenänderungseintrag mit dem aktuellen Wasserzeichen-Zeitstempel des Kafka-Connectors gefüllt.

Wenn keine Datensätze erstellt werden, sendet der Kafka-Connector regelmäßig Wasserzeichen „Heartbeats“ an die vom Connector erkannten Kafka-Ausgabethemen.

Diese Wasserzeichen-Heartbeats sind Datensätze, die mit Ausnahme des Felds low_watermark leer sind. Anschließend können Sie das Low-Wasserzeichen verwenden, um zeitbasierte Aggregationen durchzuführen. Sie können das Low-Wasserzeichen beispielsweise verwenden, um Ereignisse nach dem Commit-Zeitstempel über Primärschlüssel hinweg zu sortieren.

Metadatenthemen

Der Kafka-Connector und das Kafka Connect-Framework erstellen mehrere Metadatenthemen, um Connector-bezogene Informationen zu speichern. Es ist nicht empfehlenswert, entweder die Konfiguration oder den Inhalt dieser Metadatenthemen zu ändern.

Im Folgenden sind die Metadatenthemen aufgeführt:

  • _consumer_offsets: Ein Thema, das automatisch von Kafka erstellt wird. Speichert Nutzer-Offsets für Nutzer, die im Kafka-Connector erstellt wurden.
  • _kafka-connect-offsets: Ein Thema, das automatisch von Kafka Connect erstellt wird. Speichert die Connector-Offsets.
  • _sync_topic_spanner_connector_connectorname: Ein Thema, das automatisch vom Connector erstellt wird. Speichert Metadaten zu Änderungsstreampartitionen.
  • _rebalancing_topic_spanner_connector_connectorname: Ein Thema, das automatisch vom Connector erstellt wird. Wird verwendet, um die Aktivität von Connector-Aufgaben zu ermitteln.
  • _debezium-heartbeat.connectorname: Ein Thema, mit dem Spanner-Änderungsstream-Heartbeats verarbeitet werden.

Laufzeit des Kafka-Connectors

Im Folgenden wird die Laufzeit des Kafka-Connectors beschrieben.

Skalierbarkeit

Der Kafka-Connector ist horizontal skalierbar und wird für eine oder mehrere Aufgaben ausgeführt, die auf mehrere Kafka Connect-Worker verteilt sind.

Garantien für Nachrichtenübermittlung

Der Kafka-Connector unterstützt eine mindestens einmalige Zustellung.

Fehlertoleranz

Der Kafka-Connector ist fehlertolerant. Während der Kafka-Connector Änderungen liest und Ereignisse erstellt, wird der letzte verarbeitete Commit-Zeitstempel für jede Änderungsstreampartition aufgezeichnet. Wenn der Kafka-Connector aus irgendeinem Grund beendet wird (z. B. bei Kommunikationsfehlern, Netzwerkproblemen oder Softwarefehlern), fährt der Kafka-Connector beim Neustart die Datensätze dort weiter, wo er zuletzt angehalten wurde.

Der Kafka-Connector liest das Informationsschema am Startzeitstempel des Kafka-Connectors, um Schemainformationen abzurufen. Standardmäßig kann Spanner das Informationsschema nicht bei Lesezeitstempeln vor der Versionsaufbewahrungsdauer lesen, die standardmäßig eine Stunde beträgt. Wenn Sie den Connector vor mehr als einer Stunde starten möchten, müssen Sie die Aufbewahrungsdauer der Datenbankversion erhöhen.

Kafka-Connector einrichten

Änderungsstream erstellen

Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen. Damit Sie mit den nächsten Schritten fortfahren können, ist eine Spanner-Instanz mit konfiguriertem Änderungsstream erforderlich.

Wenn Sie möchten, dass bei jedem Datenänderungsereignis sowohl geänderte als auch unveränderte Spalten zurückgegeben werden, müssen Sie den Werterfassungstyp NEW_ROW verwenden. Weitere Informationen finden Sie unter Werterfassungstyp.

JAR-Datei für den Kafka-Connector installieren

Wenn Zookeeper, Kafka und Kafka Connect installiert sind, müssen Sie noch das Plug-in-Archiv des Connectors herunterladen, die JAR-Dateien in Ihre Kafka Connect-Umgebung extrahieren und das Verzeichnis mit den JAR-Dateien in Kafka Connect plugin.path einfügen. Anschließend müssen Sie den Kafka Connect-Prozess neu starten, um die neuen JAR-Dateien abzurufen.

Wenn Sie mit unveränderlichen Containern arbeiten, können Sie Images aus den Container-Images von Debezium für Zookeeper, Kafka und Kafka Connect abrufen. Auf dem Kafka Connect-Image ist der Spanner-Connector vorinstalliert.

Weitere Informationen zum Installieren von Debezium-basierten Kafka-Connector-JARs finden Sie unter Debezium installieren.

Kafka-Connector konfigurieren

Im Folgenden finden Sie ein Konfigurationsbeispiel für einen Kafka-Connector, der eine Verbindung zu einem Änderungsstream namens changeStreamAll in der Datenbank users in der Instanz test-instance und dem Projekt test-project herstellt.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Diese Konfiguration enthält Folgendes:

  • Der Name des Connectors bei der Registrierung bei einem Kafka Connect-Dienst.

  • Der Name dieser Spanner-Connector-Klasse.

  • Die Projekt-ID.

  • Die Spanner-Instanz-ID.

  • Die Spanner-Datenbank-ID.

  • Der Name des Änderungsstreams.

  • Das JSON-Objekt für den Dienstkontoschlüssel.

  • (Optional) Die zu verwendende Spanner-Datenbankrolle.

  • Die maximale Anzahl von Tasks.

Eine vollständige Liste der Connector-Eigenschaften finden Sie hier.

Connector-Konfiguration zu Kafka Connect hinzufügen

So starten Sie einen Spanner-Connector:

  1. Erstellen Sie eine Konfiguration für den Spanner-Connector.

  2. Verwenden Sie die Kafka Connect REST API, um diese Connector-Konfiguration Ihrem Kafka Connect-Cluster hinzuzufügen.

Sie können diese Konfiguration mit einem POST-Befehl an einen laufenden Kafka Connect-Dienst senden. Standardmäßig wird der Kafka Connect-Dienst auf Port 8083 ausgeführt. Der Dienst zeichnet die Konfiguration auf und startet die Connector-Aufgabe, die eine Verbindung zur Spanner-Datenbank herstellt und Änderungsereigniseinträge an Kafka-Themen streamt.

Hier ein Beispiel für einen POST-Befehl:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Beispiel für eine erfolgreiche Antwort:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Konfiguration des Kafka-Connectors aktualisieren

Zum Aktualisieren der Connector-Konfiguration senden Sie einen PUT-Befehl an den ausgeführten Kafka Connect-Dienst mit demselben Connector-Namen.

Angenommen, wir haben einen Connector mit der Konfiguration aus dem vorherigen Abschnitt, die ausgeführt wird. Hier ein Beispiel für einen PUT-Befehl:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Beispiel für eine erfolgreiche Antwort:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Kafka-Connector beenden

Zum Beenden des Connectors senden Sie einen DELETE-Befehl an den ausgeführten Kafka Connect-Dienst mit demselben Connector-Namen.

Angenommen, wir haben einen Connector mit der Konfiguration aus dem vorherigen Abschnitt, die ausgeführt wird. Hier ein Beispiel für einen DELETE-Befehl:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Beispiel für eine erfolgreiche Antwort:

HTTP/1.1 204 No Content

Kafka-Connector überwachen

Zusätzlich zu den standardmäßigen Kafka Connect- und Debezium-Messwerten exportiert der Kafka-Connector seine eigenen Messwerte:

  • MilliSecondsLowWatermark: Das aktuelle Wasserzeichen der Connector-Aufgabe in Millisekunden. Das untere Wasserzeichen beschreibt die Zeit T, zu der der Connector garantiert alle Ereignisse mit dem Zeitstempel < T gestreamt hat.

  • MilliSecondsLowWatermarkLag: Die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Alle Ereignisse mit Zeitstempel < T wurden gestreamt.

  • LatencyLowWatermark<Variant>MilliSeconds: Die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Es werden die Varianten P50, P95, P99, Durchschnitt, Min und Max angegeben.

  • LatencySpanner<Variant>MilliSeconds: Die Latenz von Spanner-commit-timestamp-to-connector-read. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.

  • LatencyReadToEmit<Variant>MilliSeconds: Die Spanner-Lese-Zeitstempel-zu-Connector-Emit-Latenz. Es werden die Varianten P50, P95, P99, Durchschnitt, Min und Max angegeben.

  • LatencyCommitToEmit<Variant>tMilliSeconds: Die Latenz von Spanner-commit-timestamp-to-connector-emit. Es werden die Varianten P50, P95, P99, Durchschnitt, Min und Max angegeben.

  • LatencyCommitToPublish<Variant>MilliSeconds: Die Latenz von Spanner-commit-timestamp-to Kafka-publish-timestamp. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.

  • NumberOfChangeStreamPartitionsDetected: Die Gesamtzahl der Partitionen, die von der aktuellen Connector-Task erkannt wurden.

  • NumberOfChangeStreamQueriesIssued: Die Gesamtzahl der Änderungsstreamabfragen, die von der aktuellen Aufgabe ausgegeben wurden.

  • NumberOfActiveChangeStreamQueries: Die aktive Anzahl der Änderungsstreamabfragen, die von der aktuellen Connector-Aufgabe erkannt werden.

  • SpannerEventQueueCapacity: Die Gesamtkapazität von StreamEventQueue, einer Warteschlange, in der Elemente gespeichert werden, die über Änderungsstreamabfragen empfangen wurden.

  • SpannerEventQueueCapacity: die verbleibende Kapazität von StreamEventQueue

  • TaskStateChangeEventQueueCapacity: Die Gesamtkapazität von TaskStateChangeEventQueue, einer Warteschlange, in der Ereignisse im Connector gespeichert werden.

  • RemainingTaskStateChangeEventQueueCapacity: die verbleibende Kapazität von TaskStateChangeEventQueue

  • NumberOfActiveChangeStreamQueries: Die aktive Anzahl der Änderungsstreamabfragen, die von der aktuellen Connector-Aufgabe erkannt werden.

Konfigurationsattribute des Kafka-Connectors

Folgende Konfigurationseigenschaften sind für den Connector erforderlich:

  • name: Eindeutiger Name für den Connector. Der Versuch, sich noch einmal mit demselben Namen zu registrieren, führt zu einem Fehler. Dieses Attribut ist für alle Kafka Connect-Connectors erforderlich.

  • connector.class: Der Name der Java-Klasse für den Connector. Verwenden Sie für den Kafka-Connector immer den Wert io.debezium.connector.spanner.SpannerConnector.

  • tasks.max: Die maximale Anzahl von Aufgaben, die für diesen Connector erstellt werden sollen.

  • gcp.spanner.project.id: die Projekt-ID

  • gcp.spanner.instance.id: die Spanner-Instanz-ID

  • gcp.spanner.database.id: die Spanner-Datenbank-ID

  • gcp.spanner.change.stream: Der Name des Spanner-Änderungsstreams

  • gcp.spanner.credentials.json: Das JSON-Objekt des Dienstkontoschlüssels.

  • gcp.spanner.credentials.path: Der Dateipfad zum JSON-Objekt des Dienstkontoschlüssels. Erforderlich, wenn das obige Feld nicht ausgefüllt ist.

  • gcp.spanner.database.role : Die zu verwendende Spanner-Datenbankrolle. Dies ist nur erforderlich, wenn der Änderungsstream durch eine detailgenaue Zugriffssteuerung gesichert ist. Die Datenbankrolle muss die Berechtigung SELECT für den Änderungsstream und die Berechtigung EXECUTE für die Lesefunktion des Änderungsstreams haben. Weitere Informationen finden Sie unter Detailgenaue Zugriffssteuerung für Änderungsstreams.

Die folgenden erweiterten Konfigurationseigenschaften enthalten Standardeinstellungen, die in den meisten Situationen funktionieren und daher nur selten in der Konfiguration des Connectors angegeben werden müssen:

  • gcp.spanner.low-watermark.enabled: Gibt an, ob das niedrige Wasserzeichen für den Connector aktiviert ist. Der Standardwert ist "false".

  • gcp.spanner.low-watermark.update-period.ms: Das Intervall, in dem das niedrige Wasserzeichen aktualisiert wird. Der Standardwert ist 1.000 ms.

  • heartbeat.interval.ms: Das Spanner-Heartbeat-Intervall. Der Standardwert ist 300.000 (fünf Minuten).

  • gcp.spanner.start.time: Die Startzeit des Connectors. Die Standardeinstellung ist die aktuelle Uhrzeit.

  • gcp.spanner.end.time: Die Endzeit des Connectors. Die Standardeinstellung ist unendlich.

  • tables.exclude.list: Die Tabellen, bei denen Änderungsereignisse ausgeschlossen werden sollen. Die Standardeinstellung ist leer.

  • tables.include.list: Die Tabellen, für die Änderungsereignisse enthalten sein sollen. Wenn das Feld nicht ausgefüllt ist, werden alle Tabellen einbezogen. Die Standardeinstellung ist leer.

  • gcp.spanner.stream.event.queue.capacity: Die Kapazität der Spanner-Ereigniswarteschlange. Die Standardeinstellung ist 10.000.

  • connector.spanner.task.state.change.event.queue.capacity: Die Kapazität der Ereigniswarteschlange für die Aufgabenstatusänderung. Die Standardeinstellung ist 1.000.

  • connector.spanner.max.missed.heartbeats: Die maximale Anzahl verpasster Herzschläge für eine Änderungsstreamabfrage, bevor eine Ausnahme ausgelöst wird. Der Standardwert ist 10.

  • scaler.monitor.enabled: Gibt an, ob das Aufgaben-Autoscaling aktiviert ist. Die Standardeinstellung ist "false".

  • tasks.desired.partitions: Die bevorzugte Anzahl von Änderungsstreampartitionen pro Task. Dieser Parameter wird für das Autoscaling von Aufgaben benötigt. Die Standardeinstellung ist 2.

  • tasks.min: Die Mindestanzahl von Tasks. Dieser Parameter wird für das Autoscaling von Aufgaben benötigt. Der Standardfaktor ist 1.

  • connector.spanner.sync.topic: Der Name des Synchronisierungsthemas. Dies ist ein internes Connector-Thema, das zum Speichern der Kommunikation zwischen Aufgaben verwendet wird. Wenn der Nutzer keinen Namen angegeben hat, wird die Standardeinstellung _sync_topic_spanner_connector_connectorname verwendet.

  • connector.spanner.sync.poll.duration: Die Abfragedauer für das Synchronisierungsthema. Die Standardeinstellung ist 500 ms.

  • connector.spanner.sync.request.timeout.ms: Das Zeitlimit für Anfragen an das Synchronisierungsthema. Die Standardeinstellung ist 5.000 ms.

  • connector.spanner.sync.delivery.timeout.ms: Das Zeitlimit für die Veröffentlichung im Synchronisierungsthema. Die Standardeinstellung ist 15.000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: Das Intervall, in dem für das Synchronisierungsthema ein Commit für Offsets durchgeführt wird. Die Standardeinstellung ist 60.000 ms.

  • connector.spanner.sync.publisher.wait.timeout: Das Intervall, in dem Nachrichten im Synchronisierungsthema veröffentlicht werden. Die Standardeinstellung ist 5 ms.

  • connector.spanner.rebalancing.topic: Der Name des Themas für den Ausgleich. Das Thema für den Ausgleich ist ein internes Connector-Thema, mit dem die Aktivität von Aufgaben ermittelt wird. Wenn der Nutzer keinen Namen angegeben hat, wird die Standardeinstellung _rebalancing_topic_spanner_connector_connectorname verwendet.

  • connector.spanner.rebalancing.poll.duration: Die Abfragedauer für das Thema des Ausgleichs. Die Standardeinstellung ist 5.000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: Das Zeitlimit für das Commit von Offsets für das Thema für den Ausgleich. Die Standardeinstellung ist 5.000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: Das Intervall, in dem für das Synchronisierungsthema ein Commit für Offsets durchgeführt wird. Die Standardeinstellung ist 60.000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: Die Zeit, die eine Aufgabe wartet, bevor ein Ausgleichsereignis verarbeitet wird. Die Standardeinstellung ist 1.000 ms.

Eine noch detailliertere Liste der konfigurierbaren Connector-Attribute finden Sie im GitHub-Repository.

Beschränkungen