Verbindungen von Änderungsstreams zu Kafka herstellen

Auf dieser Seite wird erläutert, wie Sie mit dem Kafka-Connector Spanner-Änderungsstreams-Daten abrufen und weiterleiten.

Wichtige Konzepte

Im Folgenden werden die Hauptkonzepte des Kafka-Connectors beschrieben.

Debezium

Debezium ist ein Open-Source-Projekt, das eine Datenstreamingplattform mit niedriger Latenz für die Erfassung von Änderungsdaten bietet.

Kafka-Connector

Der Kafka-Connector bietet eine Abstraktion über die Spanner API, um Spanner-Änderungsstreams in Kafka zu veröffentlichen. Mit diesem Connector müssen Sie den Lebenszyklus der Partitionen von Änderungsstreams nicht verwalten. Das ist bei der direkten Verwendung der Spanner API erforderlich.

Der Kafka-Connector generiert für jeden Datenänderungsdatensatz ein Änderungsereignis und sendet Änderungsereignisdatensätze für jede Tabelle, die über den Änderungsstream erfasst wird, an ein separates Kafka-Thema. Ein Datenänderungsdatensatz-mod stellt eine einzelne Änderung (Einfügen, Aktualisieren oder Löschen) dar, die erfasst wurde. Ein einzelner Datenänderungseintrag kann mehrere Modifikationen enthalten.

Kafka-Connector-Ausgabe

Der Kafka-Connector leitet Änderungsstreams direkt an ein separates Kafka-Thema weiter. Der Name des Ausgabethemas sollte connector_name.table_name sein. Wenn das Thema nicht vorhanden ist, erstellt der Kafka-Connector automatisch ein Thema mit diesem Namen.

Sie können auch Transformationen für die Themenweiterleitung konfigurieren, um Einträge an von Ihnen angegebene Themen weiterzuleiten. Wenn du die Themenweiterleitung verwenden möchtest, deaktiviere die Funktion Niedriger Wasserstand.

Reihenfolge der Einträge

Die Datensätze werden in den Kafka-Topics nach Commit-Zeitstempel pro Primärschlüssel sortiert. Für Einträge mit unterschiedlichen Primärschlüsseln gibt es keine Garantie für die Reihenfolge. Datensätze mit demselben Primärschlüssel werden in derselben Kafka-Themenpartition gespeichert. Wenn Sie ganze Transaktionen verarbeiten möchten, können Sie auch die Felder server_transaction_id und number_of_records_in_transaction des Datenänderungsdatensatzes verwenden, um eine Spanner-Transaktion zusammenzustellen.

Änderungsereignisse

Der Kafka-Connector generiert für jeden INSERT-, UPDATE- und DELETE-Vorgang ein Datenänderungsereignis. Jedes Ereignis enthält einen Schlüssel und Werte für die geänderte Zeile.

Mit Kafka Connect-Konvertern können Sie Datenänderungsereignisse im Protobuf-, AVRO-, JSON- oder JSON Schemaless-Format generieren. Wenn Sie einen Kafka Connect-Konverter verwenden, der Schemas generiert, enthält das Ereignis separate Schemas für den Schlüssel und die Werte. Andernfalls enthält das Ereignis nur den Schlüssel und die Werte.

Das Schema für den Schlüssel ändert sich nie. Das Schema für die Werte ist eine Zusammenführung aller Spalten, die seit dem Start des Connectors im Änderungsstream erfasst wurden.

Wenn Sie den Connector so konfigurieren, dass JSON-Ereignisse generiert werden, enthält das Ausgabeänderungsereignis fünf Felder:

  • Das erste schema-Feld gibt ein Kafka Connect-Schema an, das das Spanner-Schlüsselschema beschreibt.

  • Das erste payload-Feld hat die Struktur, die durch das vorherige schema-Feld beschrieben wird, und enthält den Schlüssel für die geänderte Zeile.

  • Das zweite schema-Feld gibt das Kafka Connect-Schema an, das das Schema für die geänderte Zeile beschreibt.

  • Das zweite payload-Feld hat die Struktur, die im vorherigen schema-Feld beschrieben wurde, und enthält die tatsächlichen Daten für die geänderte Zeile.

  • Das Feld source ist ein Pflichtfeld, das die Quellmetadaten für das Ereignis beschreibt.

Das folgende Beispiel zeigt ein Datenänderungsereignis:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

Niedriges Wasserzeichen

Das untere Wasserzeichen beschreibt die Zeit T, zu der der Kafka-Connector garantiert alle Ereignisse mit einem Zeitstempel < T gestreamt und in einem Kafka-Thema veröffentlicht hat.

Sie können den niedrigen Wasserstand im Kafka-Connector mit dem Parameter gcp.spanner.low-watermark.enabled aktivieren. Dieser Parameter ist standardmäßig deaktiviert. Wenn das Low-Watermark aktiviert ist, wird das Feld low_watermark im Datenänderungseintrag des Änderungsstreams mit dem aktuellen Low-Watermark-Zeitstempel des Kafka-Connectors ausgefüllt.

Wenn keine Datensätze generiert werden, sendet der Kafka-Connector regelmäßig „Herzschläge“ in Form von Wasserzeichen an die vom Connector erkannten Kafka-Ausgabethemen.

Diese Herztöne für Wasserzeichen sind Einträge, die mit Ausnahme des Felds low_watermark leer sind. Sie können dann den niedrigen Grenzwert verwenden, um zeitbasierte Aggregationen durchzuführen. Sie können beispielsweise den niedrigen Grenzwert verwenden, um Ereignisse nach Commit-Zeitstempeln über Primärschlüssel zu sortieren.

Metadatenthemen

Sowohl der Kafka-Connector als auch das Kafka Connect-Framework erstellen mehrere Metadatenthemen zum Speichern von Connector-bezogenen Informationen. Es wird nicht empfohlen, die Konfiguration oder den Inhalt dieser Metadatenthemen zu ändern.

Die folgenden Themen beziehen sich auf Metadaten:

  • _consumer_offsets: Ein Thema, das automatisch von Kafka erstellt wird. Hier werden die Consumer-Offsets für im Kafka-Connector erstellte Consumer gespeichert.
  • _kafka-connect-offsets: Ein Thema, das automatisch von Kafka Connect erstellt wird. Hier werden die Anschlüsse gespeichert.
  • _sync_topic_spanner_connector_connectorname: Ein Thema, das vom Connector automatisch erstellt wird. Hier werden Metadaten zu Änderungsstream-Partitionen gespeichert.
  • _rebalancing_topic_spanner_connector_connectorname: Ein Thema, das vom Connector automatisch erstellt wird. Wird verwendet, um die Aktivität der Connector-Aufgabe zu bestimmen.
  • _debezium-heartbeat.connectorname: Ein Thema, das zum Verarbeiten von Spanner-Änderungsstream-Heartbeats verwendet wird.

Kafka-Connector-Laufzeit

Im Folgenden wird die Laufzeit des Kafka-Connectors beschrieben.

Skalierbarkeit

Der Kafka-Connector ist horizontal skalierbar und wird auf einer oder mehreren Aufgaben ausgeführt, die auf mehrere Kafka Connect-Worker verteilt sind.

Garantien für Nachrichtenübermittlung

Der Kafka-Connector unterstützt die Garantie für eine mindestens einmalige Zustellung.

Fehlertoleranz

Der Kafka-Connector ist fehlertolerant. Wenn der Kafka-Connector Änderungen liest und Ereignisse generiert, wird für jede Partition des Änderungsstreams der letzte verarbeitete Commit-Zeitstempel aufgezeichnet. Wenn der Kafka-Connector aus irgendeinem Grund beendet wird (z. B. aufgrund von Kommunikationsfehlern, Netzwerkproblemen oder Softwarefehlern), wird nach dem Neustart der Streaming von Datensätzen an der Stelle fortgesetzt, an der er unterbrochen wurde.

Der Kafka-Connector liest das Informationsschema zum Startzeitstempel des Kafka-Connectors, um Schemainformationen abzurufen. Standardmäßig kann Spanner das Informationsschema nicht mit Lesezeitstempeln vor der Aufbewahrungsdauer der Version lesen, die standardmäßig eine Stunde beträgt. Wenn Sie den Connector früher als vor einer Stunde starten möchten, müssen Sie die Versionsaufbewahrungsdauer der Datenbank verlängern.

Kafka-Connector einrichten

Änderungsstream erstellen

Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen. Damit Sie mit den nächsten Schritten fortfahren können, ist eine Spanner-Instanz mit einem konfigurierten Änderungsstream erforderlich.

Wenn Sie bei jedem Datenänderungsereignis sowohl geänderte als auch unveränderte Spalten zurückgeben möchten, verwenden Sie den Werterfassungstyp NEW_ROW. Weitere Informationen finden Sie unter Art der Datenerhebung.

JAR-Datei des Kafka-Connectors installieren

Nachdem Zookeeper, Kafka und Kafka Connect installiert sind, müssen Sie nur noch das Plug-in-Archiv des Connectors herunterladen, die JAR-Dateien in Ihre Kafka Connect-Umgebung extrahieren und das Verzeichnis mit den JAR-Dateien zu plugin.path von Kafka Connect hinzufügen. Sie müssen dann den Kafka Connect-Prozess neu starten, damit die neuen JAR-Dateien übernommen werden.

Wenn Sie mit unveränderlichen Containern arbeiten, können Sie Images aus den Container-Images von Debezium für Zookeeper, Kafka und Kafka Connect abrufen. Auf dem Kafka Connect-Image ist der Spanner-Connector vorinstalliert.

Weitere Informationen zum Installieren von Debezium-basierten Kafka-Connector-JARs finden Sie unter Debezium installieren.

Kafka-Connector konfigurieren

Im Folgenden finden Sie ein Beispiel für die Konfiguration eines Kafka-Connectors, der eine Verbindung zu einem Änderungsstream namens changeStreamAll in der Datenbank users in der Instanz test-instance und dem Projekt test-project herstellt.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Diese Konfiguration enthält Folgendes:

  • Der Name des Connectors, wenn er bei einem Kafka Connect-Dienst registriert ist.

  • Der Name dieser Spanner-Connector-Klasse.

  • Die Projekt-ID.

  • Die Spanner-Instanz-ID.

  • Die Spanner-Datenbank-ID.

  • Der Name des Änderungsstreams.

  • Das JSON-Objekt für den Dienstkontoschlüssel.

  • Optional: Die zu verwendende Spanner-Datenbankrolle.

  • Die maximale Anzahl von Aufgaben.

Eine vollständige Liste der Connector-Eigenschaften finden Sie unter Konfigurationseigenschaften des Kafka-Connectors.

Connectorkonfiguration zu Kafka Connect hinzufügen

So starten Sie einen Spanner-Connector:

  1. Erstellen Sie eine Konfiguration für den Spanner-Connector.

  2. Verwenden Sie die Kafka Connect REST API, um diese Connectorkonfiguration Ihrem Kafka Connect-Cluster hinzuzufügen.

Sie können diese Konfiguration mit einem POST-Befehl an einen laufenden Kafka Connect-Dienst senden. Standardmäßig wird der Kafka Connect-Dienst an Port 8083 ausgeführt. Der Dienst zeichnet die Konfiguration auf und startet die Connector-Aufgabe, die eine Verbindung zur Spanner-Datenbank herstellt und Änderungsereignisdatensätze an Kafka-Themen streamt.

Hier ist ein Beispiel für einen POST-Befehl:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Beispiel für eine erfolgreiche Antwort:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Konfiguration des Kafka-Connectors aktualisieren

Um die Connectorkonfiguration zu aktualisieren, senden Sie einen PUT-Befehl mit demselben Connectornamen an den laufenden Kafka Connect-Dienst.

Angenommen, wir haben einen Connector mit der Konfiguration aus dem vorherigen Abschnitt ausgeführt. Hier ist ein Beispiel für einen PUT-Befehl:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Beispiel für eine erfolgreiche Antwort:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Kafka-Connector beenden

Wenn Sie den Connector beenden möchten, senden Sie einen DELETE-Befehl an den laufenden Kafka Connect-Dienst mit demselben Connectornamen.

Angenommen, wir haben einen Connector mit der Konfiguration aus dem vorherigen Abschnitt ausgeführt. Hier ist ein Beispiel für einen DELETE-Befehl:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Beispiel für eine erfolgreiche Antwort:

HTTP/1.1 204 No Content

Kafka-Connector überwachen

Zusätzlich zu den standardmäßigen Kafka Connect- und Debezium-Messwerten exportiert der Kafka-Connector eigene Messwerte:

  • MilliSecondsLowWatermark: Der aktuelle niedrigere Grenzwert der Connector-Aufgabe in Millisekunden. Das untere Wasserzeichen beschreibt die Zeit T, zu der der Connector garantiert alle Ereignisse mit einem Zeitstempel < T gestreamt hat.

  • MilliSecondsLowWatermarkLag: Die Verzögerung des Low-Watermarks hinter der aktuellen Zeit in Millisekunden. Alle Ereignisse mit Zeitstempel < T wurden gestreamt.

  • LatencyLowWatermark<Variant>MilliSeconds: Die Verzögerung des niedrigen Grenzwerts gegenüber der aktuellen Zeit in Millisekunden. Es werden die Varianten „P50“, „P95“, „P99“, „Durchschnitt“, „Minimum“ und „Maximum“ bereitgestellt.

  • LatencySpanner<Variant>MilliSeconds: Die Latenz zwischen dem Spanner-Commit-Zeitstempel und dem Connector-Lesevorgang. Es werden die Varianten „P50“, „P95“, „P99“, „Durchschnitt“, „Minimum“ und „Maximum“ bereitgestellt.

  • LatencyReadToEmit<Variant>MilliSeconds: Die Latenz zwischen Spanner-Lesezeitstempel und Connector-Emitting. Es werden die Varianten „P50“, „P95“, „P99“, „Durchschnitt“, „Minimum“ und „Maximum“ bereitgestellt.

  • LatencyCommitToEmit<Variant>tMilliSeconds: Die Latenz zwischen Spanner-Commit-Zeitstempel und Connector-Emitting. Es werden die Varianten „P50“, „P95“, „P99“, „Durchschnitt“, „Minimum“ und „Maximum“ bereitgestellt.

  • LatencyCommitToPublish<Variant>MilliSeconds: Die Latenz zwischen dem Spanner-Commit-Zeitstempel und dem Kafka-Veröffentlichungszeitstempel. Es werden die Varianten „P50“, „P95“, „P99“, „Durchschnitt“, „Minimum“ und „Maximum“ bereitgestellt.

  • NumberOfChangeStreamPartitionsDetected: Die Gesamtzahl der Partitionen, die vom aktuellen Connector-Task erkannt wurden.

  • NumberOfChangeStreamQueriesIssued: Die Gesamtzahl der Änderungsstream-Abfragen, die von der aktuellen Aufgabe gestellt wurden.

  • NumberOfActiveChangeStreamQueries: Die aktive Anzahl von Änderungsstream-Abfragen, die von der aktuellen Connector-Aufgabe erkannt wurden.

  • SpannerEventQueueCapacity: Die Gesamtkapazität von StreamEventQueue, einer Warteschlange, in der Elemente gespeichert werden, die aus Änderungsstreamabfragen empfangen wurden.

  • SpannerEventQueueCapacity: Die verbleibende StreamEventQueue-Kapazität.

  • TaskStateChangeEventQueueCapacity: Die Gesamtkapazität von TaskStateChangeEventQueue, einer Warteschlange, in der Ereignisse gespeichert werden, die im Connector auftreten.

  • RemainingTaskStateChangeEventQueueCapacity: Die verbleibende TaskStateChangeEventQueue-Kapazität.

  • NumberOfActiveChangeStreamQueries: Die aktive Anzahl von Änderungsstream-Abfragen, die von der aktuellen Connector-Aufgabe erkannt wurden.

Konfigurationseigenschaften des Kafka-Connectors

Die folgenden Konfigurationseigenschaften sind für den Connector erforderlich:

  • name: Eindeutiger Name für den Connector. Ein erneuter Versuch, sich mit demselben Namen zu registrieren, schlägt fehl. Diese Property ist für alle Kafka Connect-Connectors erforderlich.

  • connector.class: Der Name der Java-Klasse für den Connector. Verwenden Sie für den Kafka-Connector immer den Wert io.debezium.connector.spanner.SpannerConnector.

  • tasks.max: Die maximale Anzahl von Aufgaben, die für diesen Connector erstellt werden sollen.

  • gcp.spanner.project.id: die Projekt-ID

  • gcp.spanner.instance.id: Die Spanner-Instanz-ID

  • gcp.spanner.database.id: Die Spanner-Datenbank-ID

  • gcp.spanner.change.stream: Der Name des Spanner-Änderungsstreams

  • gcp.spanner.credentials.json: Das JSON-Objekt des Dienstkontoschlüssels.

  • gcp.spanner.credentials.path: Der Dateipfad zum JSON-Objekt des Dienstkontoschlüssels. Erforderlich, wenn das obige Feld nicht angegeben ist.

  • gcp.spanner.database.role : Die zu verwendende Spanner-Datenbankrolle. Das ist nur erforderlich, wenn der Änderungsstream mit einer detaillierten Zugriffssteuerung gesichert ist. Die Datenbankrolle muss die Berechtigung SELECT für den Änderungsstream und die Berechtigung EXECUTE für die Lesefunktion des Änderungsstreams haben. Weitere Informationen finden Sie unter Detaillierte Zugriffssteuerung für Änderungsstreams.

Die folgenden erweiterten Konfigurationseigenschaften haben Standardwerte, die in den meisten Fällen funktionieren und daher selten in der Konfiguration des Connectors angegeben werden müssen:

  • gcp.spanner.low-watermark.enabled: Gibt an, ob das niedrige Wasserzeichen für den Connector aktiviert ist. Der Standardwert ist "False".

  • gcp.spanner.low-watermark.update-period.ms: Das Intervall, in dem das Low-Watermark aktualisiert wird. Der Standardwert ist 1.000 ms.

  • heartbeat.interval.ms: Das Spanner-Pulsintervall. Der Standardwert ist 300000 (fünf Minuten).

  • gcp.spanner.start.time: Die Startzeit des Connectors. Standardmäßig ist die aktuelle Uhrzeit festgelegt.

  • gcp.spanner.end.time: Die Endzeit des Connectors. Die Standardeinstellung ist „Unendlich“.

  • tables.exclude.list: Die Tabellen, für die Änderungsereignisse ausgeschlossen werden sollen. Die Standardeinstellung ist leer.

  • tables.include.list: Die Tabellen, für die Änderungsereignisse eingeschlossen werden sollen. Wenn das Feld nicht ausgefüllt ist, werden alle Tabellen einbezogen. Die Standardeinstellung ist leer.

  • gcp.spanner.stream.event.queue.capacity: Die Kapazität der Spanner-Ereigniswarteschlange. Die Standardeinstellung ist 10.000.

  • connector.spanner.task.state.change.event.queue.capacity: Die Kapazität der Warteschlange für Ereignisse zur Änderung des Aufgabenstatus. Standardeinstellung ist 1.000.

  • connector.spanner.max.missed.heartbeats: Die maximale Anzahl von verpassten Heartbeats für eine Änderungsstreamabfrage, bevor eine Ausnahme ausgelöst wird. Der Standardwert ist 10.

  • scaler.monitor.enabled: Gibt an, ob die Aufgaben-Autoscaling-Funktion aktiviert ist. Die Standardeinstellung ist "false".

  • tasks.desired.partitions: Die bevorzugte Anzahl von Änderungsstream-Partitionen pro Aufgabe. Dieser Parameter ist für die automatische Skalierung von Aufgaben erforderlich. Die Standardeinstellung ist 2.

  • tasks.min: Die Mindestanzahl von Aufgaben. Dieser Parameter ist für die automatische Skalierung von Aufgaben erforderlich. Der Standardfaktor ist 1.

  • connector.spanner.sync.topic: Der Name des Synchronisierungs-Themas, ein internes Connector-Thema, das zum Speichern der Kommunikation zwischen Aufgaben verwendet wird. Wenn der Nutzer keinen Namen angegeben hat, wird standardmäßig _sync_topic_spanner_connector_connectorname verwendet.

  • connector.spanner.sync.poll.duration: Die Abfragedauer für das Synchronisierungsthema. Die Standardeinstellung ist 500 ms.

  • connector.spanner.sync.request.timeout.ms: Das Zeitlimit für Anfragen an das Synchronisierungsthema. Die Standardeinstellung ist 5.000 ms.

  • connector.spanner.sync.delivery.timeout.ms: Das Zeitlimit für die Veröffentlichung im Synchronisierungsthema. Die Standardeinstellung ist 15.000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: Das Intervall, in dem Offsets für das Synchronisierungsthema verbindlich festgelegt werden. Die Standardeinstellung ist 60.000 ms.

  • connector.spanner.sync.publisher.wait.timeout: Das Intervall, in dem Nachrichten im Synchronisierungsthema veröffentlicht werden. Die Standardeinstellung ist 5 ms.

  • connector.spanner.rebalancing.topic: Der Name des Themas für die Neuausrichtung. Das Thema „Rebalancing“ ist ein internes Connector-Thema, mit dem die Aktivität von Aufgaben ermittelt wird. Wenn der Nutzer keinen Namen angegeben hat, wird standardmäßig _rebalancing_topic_spanner_connector_connectorname verwendet.

  • connector.spanner.rebalancing.poll.duration: Die Umfragedauer für das Thema „Neuausrichtung“. Die Standardeinstellung ist 5.000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: Das Zeitlimit für das Festschreiben von Abweichungen für das Thema zur Neuausrichtung. Die Standardeinstellung ist 5.000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: Das Intervall, in dem Offsets für das Synchronisierungsthema verbindlich festgelegt werden. Die Standardeinstellung ist 60.000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: Die Dauer, die eine Aufgabe wartet, bevor ein Ereignis zur Neuausrichtung verarbeitet wird. Die Standardeinstellung ist 1.000 ms.

Eine noch detailliertere Liste der konfigurierbaren Connector-Eigenschaften finden Sie im GitHub-Repository.

Beschränkungen