Verbindung zu Kafka für Änderungsstreams erstellen

Auf dieser Seite wird erläutert, wie Sie mit dem Kafka-Connector und leiten Spanner-Änderungsstreams-Daten weiter.

Wichtige Konzepte

Im Folgenden werden die Kernkonzepte des Kafka-Connectors beschrieben.

Debezium

Debezium ist ein Open-Source-Projekt, eine Datenstreamingplattform mit niedriger Latenz zur Erfassung von Change Data Capture.

Kafka-Connector

Der Kafka-Connector bietet eine Abstraktion über die Spanner API, Spanner-Änderungsstreams in Kafka veröffentlichen. Bei diesem Connector müssen Sie den Lebenszyklus der Änderungsstreams nicht verwalten, Dies ist erforderlich, wenn Sie die Spanner API direkt verwenden.

Der Kafka-Connector erstellt ein Änderungsereignis für jeden Modifikator in Form von Datenänderungseinträgen und sendet Änderungsereigniseinträge nachgelagert in ein separates Kafka-Thema für jede Tabelle, die den Änderungsstream verfolgt. Ein Mod für den Datensatz für eine Änderung steht für eine einzelne erfasste Änderung (Einfügen, Aktualisieren oder Löschen). Ein einzelner Datensatz für die Datenänderung kann mehr als einen Mod enthalten.

Ausgabe des Kafka-Connectors

Der Kafka-Connector leitet Änderungsstreams direkt weiter. separaten Kafka-Thema. Der Name des Ausgabethemas sollte connector_name.table_name lauten. Wenn das Thema nicht vorhanden ist, erstellt der Kafka-Connector automatisch ein Thema unter diesem Namen.

Sie können auch Themen-Routing-Transformationen so konfigurieren, dass Einträge an die von Ihnen angegebenen Themen weitergeleitet werden. Wenn Sie die Weiterleitung nach Themen verwenden möchten, deaktivieren Sie die Funktion Niedriges Wasserzeichen.

Datensatzsortierung

Datensätze werden nach dem Commit-Zeitstempel pro Primärschlüssel in die Kafka-Themen. Datensätze, die zu verschiedenen Primärschlüsseln gehören, enthalten keine Bestellgarantien. Datensätze mit demselben Primärschlüssel werden im Kafka-Themenpartitionierung. Wenn Sie ganze Transaktionen verarbeiten möchten, können Sie auch die des Data Change Record server_transaction_id und number_of_records_in_transaction, um eine Spanner-Transaktion zusammenzustellen.

Änderungsereignisse

Der Kafka-Connector generiert ein Datenänderungsereignis für jede INSERT, UPDATE, und DELETE-Operation. Jedes Ereignis enthält einen Schlüssel und Werte für die geänderte Zeile.

Sie können Kafka Connect-Converter verwenden, um Datenänderungsereignisse in Protobuf, AVRO, JSON oder JSON Schemaless. Wenn Sie eine Kafka Connect-Konverter, der Schemas erstellt, enthält das Ereignis separate Schemas für Schlüssel und Werte. Andernfalls enthält der Termin nur Schlüssel und Werte.

Das Schema für den Schlüssel ändert sich nie. Das Schema für die Werte ist ein Zusammenführung aller Spalten, die der Änderungsstream seit dem letzten Connector-Startzeit.

Wenn Sie den Connector so konfigurieren, dass JSON-Ereignisse erzeugt werden, wird das Ausgabeänderungsereignis enthält fünf Felder:

  • Das erste Feld schema gibt ein Kafka Connect-Schema an, das das Spanner-Schlüsselschema beschreibt.

  • Das erste payload-Feld hat die im vorherigen schema-Feld beschriebene Struktur und enthält den Schlüssel für die geänderte Zeile.

  • Das zweite schema-Feld gibt das Kafka Connect-Schema an, das das Schema für die geänderte Zeile beschreibt.

  • Das zweite payload-Feld hat die im vorherigen schema-Feld beschriebene Struktur und enthält die tatsächlichen Daten für die geänderte Zeile.

  • Das Feld source ist ein Pflichtfeld, das die Quellmetadaten für das Ereignis beschreibt.

Hier ein Beispiel für ein Datenänderungsereignis:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

Niedriges Wasserzeichen

Das niedrige Wasserzeichen beschreibt den Zeitpunkt T, zu dem der Kafka-Connector garantiert alle Ereignisse mit dem Zeitstempel < gestreamt und an ein Kafka-Thema veröffentlicht hat. D.

Sie können das niedrige Wasserzeichen im Kafka-Connector über die gcp.spanner.low-watermark.enabled-Parameter. Dieser Parameter ist deaktiviert ist standardmäßig aktiviert. Wenn das niedrige Wasserzeichen aktiviert ist, wird das Feld low_watermark in den Änderungsstreamdaten verwendet Der Änderungseintrag wird mit dem aktuellen niedrigen Wasserzeichen des Kafka-Connectors gefüllt. Zeitstempel.

Wenn keine Datensätze erstellt werden, sendet der Kafka-Connector regelmäßig Wasserzeichen „Herzschläge“ zu den vom Connector erkannten Kafka-Ausgabethemen.

Diese Wasserzeichen-Heartbeats sind Datensätze, die mit Ausnahme der low_watermark. Sie können dann das niedrige Wasserzeichen verwenden, um zeitbasierte Aggregationen durchzuführen. Sie können z. B. das niedrige Wasserzeichen verwenden, um Ereignisse nach Commit zu sortieren. Zeitstempel für die Primärschlüssel.

Metadatenthemen

Mit dem Kafka-Connector und dem Kafka Connect-Framework werden Metadatenthemen, um Connector-bezogene Informationen zu speichern. Es ist nicht ratsam, die Konfiguration oder den Inhalt dieser Metadatenthemen ändern.

Im Folgenden sind die Metadatenthemen aufgeführt:

  • _consumer_offsets: Ein automatisch von Kafka erstelltes Thema. Speichert Nutzer-Offsets für Nutzer, die im Kafka-Connector erstellt wurden.
  • _kafka-connect-offsets: Ein automatisch von Kafka Connect erstelltes Thema. Speichert die Connector-Offsets.
  • _sync_topic_spanner_connector_connectorname: Ein vom Connector automatisch erstelltes Thema. Speichert Metadaten zu Änderungsstreampartitionen.
  • _rebalancing_topic_spanner_connector_connectorname: Ein vom Connector automatisch erstelltes Thema. Wird verwendet, um die Aktualität von Connector-Aufgaben zu bestimmen.
  • _debezium-heartbeat.connectorname: Ein Thema, das zum Verarbeiten von Spanner-Heartbeats des Änderungsstreams verwendet wird.

Laufzeit des Kafka-Connectors

Im Folgenden wird die Laufzeit des Kafka-Connectors beschrieben.

Skalierbarkeit

Der Kafka-Connector ist horizontal skalierbar und wird für eine oder mehrere Aufgaben ausgeführt, die auf mehrere Kafka Connect-Worker verteilt sind.

Garantien für Nachrichtenübermittlung

Der Kafka-Connector unterstützt die Garantie für mindestens einmalige Zustellung.

Fehlertoleranz

Der Kafka-Connector ist fehlertolerant. Da der Kafka-Connector Änderungen und Erzeugt Ereignisse, zeichnet er den letzten verarbeiteten Commit-Zeitstempel für jede Änderung auf. Streampartition. Wenn der Kafka-Connector aus irgendeinem Grund (z. B. Kommunikationsfehler, Netzwerkprobleme oder Softwarefehler), beim Neustart Der Kafka-Connector setzt das Streaming von Datensätzen dort fort, wo er zuletzt unterbrochen wurde.

Der Kafka-Connector liest das Informationsschema zu Beginn des Kafka-Connectors. Zeitstempel, um Schemainformationen abzurufen. Standardmäßig kann Spanner keine das Informationsschema bei Lesezeitstempeln vor dem Aufbewahrungsdauer, der standardmäßig auf eine Stunde eingestellt ist. Wenn Sie den Connector von vorn beginnen möchten, eine Stunde zurückliegen, müssen Sie die Versionsaufbewahrung der Datenbank erhöhen Punkt.

Kafka-Connector einrichten

Änderungsstream erstellen

Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen. Für die nächsten Schritte ist eine Spanner-Instanz mit einem konfigurierten Änderungsstream erforderlich.

Wenn Sie sowohl geänderte als auch unveränderte Spalten für jede Spalte zurückgeben möchten, verwenden Sie den Werterfassungstyp NEW_ROW. Weitere Informationen finden Sie unter Werterfassungstyp.

Installieren Sie die JAR-Datei für den Kafka-Connector

Wenn Zookeeper, Kafka und Kafka Connect installiert sind, müssen die restlichen Aufgaben zum Bereitstellen eines Kafka-Connectors heruntergeladen werden. Connector-Plug-in-Archiv aus, extrahieren Sie die JAR-Dateien in Ihre Kafka Connect-Umgebung und fügen Sie den mit den JAR-Dateien in die Kafka Connect-Datei plugin.path. Anschließend müssen Sie Ihren Kafka Connect-Prozess neu starten, um die neuen JAR-Dateien abzurufen.

Wenn Sie mit unveränderlichen Containern arbeiten, können Sie Images aus Container-Images von Debezium für Zookeeper, Kafka und Kafka Connect. Das Kafka Connect-Image hat Spanner-Connector ist vorinstalliert.

Weitere Informationen zum Installieren von Debezium-basierten Kafka-Connector-JARs finden Sie unter Debezium installieren

Kafka-Connector konfigurieren

Das folgende Beispiel zeigt die Konfiguration eines Kafka-Connectors: der mit einem Änderungsstream namens changeStreamAll im Datenbank users in Instanz test-instance und Projekt test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Diese Konfiguration enthält Folgendes:

  • Der Name des Connectors, wenn er bei einem Kafka Connect-Dienst registriert ist.

  • Der Name dieser Spanner-Connector-Klasse.

  • Die Projekt-ID.

  • Die Spanner-Instanz-ID.

  • Die Spanner-Datenbank-ID.

  • Der Name des Änderungsstreams.

  • Das JSON-Objekt für den Dienstkontoschlüssel.

  • (Optional) Die zu verwendende Spanner-Datenbankrolle.

  • Die maximale Anzahl von Tasks.

Eine vollständige Liste der Connector-Eigenschaften finden Sie unter Konfigurationseigenschaften des Kafka-Connectors.

Connector-Konfiguration zu Kafka Connect hinzufügen

So starten Sie einen Spanner-Connector:

  1. Erstellen Sie eine Konfiguration für den Spanner-Connector.

  2. Verwenden Sie dazu die Kafka Connect REST API. Connector-Konfiguration zu Ihrem Kafka Connect-Cluster.

Sie können diese Konfiguration mit einem POST-Befehl an ein laufendes Kafka Connect senden Service. Standardmäßig wird der Kafka Connect-Dienst auf Port 8083 ausgeführt. Der Dienst zeichnet die Konfiguration auf und startet die Connector-Aufgabe eine Verbindung zur Spanner-Datenbank herstellt und die Änderungsereignisse streamt. Kafka-Themen.

Hier ein Beispiel für einen POST-Befehl:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Beispiel für eine erfolgreiche Antwort:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Konfiguration des Kafka-Connectors aktualisieren

Senden Sie zum Aktualisieren der Connector-Konfiguration einen PUT-Befehl an die laufende Kafka Connect-Dienst mit demselben Connector-Namen.

Angenommen, Sie haben einen Connector mit der Konfiguration von im vorherigen Abschnitt. Hier ein Beispiel für einen PUT-Befehl:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Beispiel für eine erfolgreiche Antwort:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Kafka-Connector beenden

Um den Connector zu beenden, senden Sie einen DELETE-Befehl an den ausgeführten Kafka Connect-Dienst mit demselben Connector-Namen.

Angenommen, Sie haben einen Connector mit der Konfiguration von im vorherigen Abschnitt. Hier ein Beispiel für einen DELETE-Befehl:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Beispiel für eine erfolgreiche Antwort:

HTTP/1.1 204 No Content

Kafka-Connector überwachen

Neben den Standardmesswerten von Kafka Connect und Debezium exportiert der Kafka-Connector seine eigenen Messwerte:

  • MilliSecondsLowWatermark: Das aktuelle niedrige Wasserzeichen der Connector-Aufgabe in Millisekunden. Die ein niedriges Wasserzeichen beschreibt die Zeit T, zu der der Connector garantiert Alle Ereignisse mit dem Zeitstempel < gestreamt D

  • MilliSecondsLowWatermarkLag: Die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Alle Ereignisse mit dem Zeitstempel < gestreamt D

  • LatencyLowWatermark<Variant>MilliSeconds: die Verzögerung des niedrigen Wasserzeichens hinter der aktuellen Zeit in Millisekunden. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.

  • LatencySpanner<Variant>MilliSeconds: Die Latenzzeit für das Lesen von Spanner-Commit-Zeitstempel-an-Connector-Lesevorgängen. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.

  • LatencyReadToEmit<Variant>MilliSeconds: Die Latenz von Spanner-Lese-Zeitstempel-zu-Connector-Emit. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.

  • LatencyCommitToEmit<Variant>tMilliSeconds: Die Latenz von Spanner-Commit-Zeitstempel-zu-Connector-Emit. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.

  • LatencyCommitToPublish<Variant>MilliSeconds: Die Latenzzeit zwischen Spanner-Commit-Zeitstempel und Kafka-Veröffentlichungs-Zeitstempel. Die Varianten P50, P95, P99, Durchschnitt, Min und Max werden angegeben.

  • NumberOfChangeStreamPartitionsDetected: die Gesamtzahl der Partitionen von der aktuellen Connector-Aufgabe erkannt.

  • NumberOfChangeStreamQueriesIssued: Die Gesamtzahl der Änderungsstreamabfragen die von der aktuellen Aufgabe ausgegeben wird.

  • NumberOfActiveChangeStreamQueries: Die Anzahl der aktiven Änderungsstreams Abfragen, die von der aktuellen Connector-Aufgabe erkannt wurden.

  • SpannerEventQueueCapacity: Die Gesamtkapazität von StreamEventQueue, einer Warteschlange, in der Elemente gespeichert werden, die über Änderungsstreamabfragen empfangen wurden.

  • SpannerEventQueueCapacity: Die verbleibende StreamEventQueue-Kapazität.

  • TaskStateChangeEventQueueCapacity: Die Gesamtkapazität von TaskStateChangeEventQueue, einer Warteschlange, in der Ereignisse im Connector gespeichert werden.

  • RemainingTaskStateChangeEventQueueCapacity: Die verbleibende TaskStateChangeEventQueue-Kapazität.

  • NumberOfActiveChangeStreamQueries: Die Anzahl der aktiven Änderungsstreams Abfragen, die von der aktuellen Connector-Aufgabe erkannt wurden.

Konfigurationseigenschaften des Kafka-Connectors

Für den Connector sind die folgenden erforderlichen Konfigurationseigenschaften erforderlich:

  • name: Eindeutiger Name für den Connector. Ein erneuter Versuch mit demselben Namen führt zu einem Fehler. Dieses Attribut ist für alle Kafka Connect-Connectors erforderlich.

  • connector.class: Der Name der Java-Klasse für den Connector. Verwenden Sie für den Kafka-Connector immer den Wert io.debezium.connector.spanner.SpannerConnector.

  • tasks.max: Die maximale Anzahl von Aufgaben, die für diesen Connector erstellt werden sollen.

  • gcp.spanner.project.id: die Projekt-ID

  • gcp.spanner.instance.id: die Spanner-Instanz-ID

  • gcp.spanner.database.id: Die Spanner-Datenbank-ID

  • gcp.spanner.change.stream: Der Name des Spanner-Änderungsstreams

  • gcp.spanner.credentials.json: Das JSON-Objekt des Dienstkontoschlüssels.

  • gcp.spanner.credentials.path: Der Dateipfad zum JSON-Objekt des Dienstkontoschlüssels. Erforderlich, wenn das Feld oben nicht ausgefüllt ist.

  • gcp.spanner.database.role : Die Spanner-Datenbankrolle für verwenden. Dies ist nur erforderlich, wenn der Änderungsstream mit gesichert ist mit eine differenzierte Zugriffssteuerung. Die Datenbankrolle muss die Berechtigung SELECT für das Änderungsstream und die Berechtigung EXECUTE für den Lesevorgang des Änderungsstreams . Weitere Informationen finden Sie unter Detailgenaue Zugriffssteuerung für Änderungen Streams

Die folgenden erweiterten Konfigurationseigenschaften haben Standardeinstellungen, die in den meisten und müssen daher nur selten in der Konfiguration des Connectors angegeben werden:

  • gcp.spanner.low-watermark.enabled: Gibt an, ob das niedrige Wasserzeichen für den Connector aktiviert ist. Der Standardwert ist "False".

  • gcp.spanner.low-watermark.update-period.ms: Das Intervall, in dem das niedrige Wasserzeichen aktualisiert wird. Der Standardwert ist 1.000 ms.

  • heartbeat.interval.ms: Das Spanner-Heartbeat-Intervall. Der Standardwert ist 300.000 (fünf Minuten).

  • gcp.spanner.start.time: Die Startzeit des Connectors. Die Standardeinstellung ist die aktuelle Uhrzeit.

  • gcp.spanner.end.time: Ende des Connectors. Die Standardeinstellung ist unendlich.

  • tables.exclude.list: Die Tabellen, für die Änderungsereignisse ausgeschlossen werden sollen. Die Standardeinstellung ist leer.

  • tables.include.list: Die Tabellen, für die Änderungsereignisse enthalten sein sollen. Wenn das Feld nicht ausgefüllt ist, werden alle Tabellen eingeschlossen. Die Standardeinstellung ist leer.

  • gcp.spanner.stream.event.queue.capacity: Die Kapazität der Spanner-Ereigniswarteschlange. Die Standardeinstellung ist 10.000.

  • connector.spanner.task.state.change.event.queue.capacity: Die Kapazität der Ereigniswarteschlange der Aufgabenstatusänderung. Die Standardeinstellung ist 1.000.

  • connector.spanner.max.missed.heartbeats: Die maximale Anzahl verpasster Heartbeats für eine Änderungsstreamabfrage, bevor eine Ausnahme ausgelöst wird. Der Standardwert ist 10.

  • scaler.monitor.enabled: Gibt an, ob das Aufgaben-Autoscaling aktiviert ist. Die Standardeinstellung ist "false".

  • tasks.desired.partitions: Die bevorzugte Anzahl von Partitionen für Änderungsstreams pro Aufgabe. Dieser Parameter wird für das Aufgaben-Autoscaling benötigt. Die Standardeinstellung ist 2.

  • tasks.min: Die Mindestanzahl von Aufgaben. Dieser Parameter wird für das Aufgaben-Autoscaling benötigt. Der Standardfaktor ist 1.

  • connector.spanner.sync.topic: Der Name des Synchronisierungsthemas. Dabei handelt es sich um ein internes Connector-Thema, das zum Speichern der Kommunikation zwischen Aufgaben verwendet wird. Die Standardeinstellung ist _sync_topic_spanner_connector_connectorname, wenn der Nutzer keinen Namen angegeben hat.

  • connector.spanner.sync.poll.duration: Die Dauer der Abfrage für das Synchronisierungsthema. Die Standardeinstellung ist 500 ms.

  • connector.spanner.sync.request.timeout.ms: Das Zeitlimit für Anfragen an das Synchronisierungsthema. Die Standardeinstellung ist 5.000 ms.

  • connector.spanner.sync.delivery.timeout.ms: Das Zeitlimit für die Veröffentlichung im Synchronisierungsthema. Die Standardeinstellung ist 15.000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: Das Intervall, in dem Offsets für das Synchronisierungsthema per Commit übergeben werden. Die Standardeinstellung ist 60.000 ms.

  • connector.spanner.sync.publisher.wait.timeout: Das Intervall, in dem Nachrichten im Synchronisierungsthema veröffentlicht werden. Die Standardeinstellung ist 5 ms.

  • connector.spanner.rebalancing.topic: Der Name des Ausgleichsthemas. Das Ausgleichsthema ist ein internes Connector-Thema, mit dem die Aufgabenaktivität festgestellt wird. Die Standardeinstellung ist _rebalancing_topic_spanner_connector_connectorname, wenn der Nutzer keinen Namen angegeben hat.

  • connector.spanner.rebalancing.poll.duration: Die Abfragedauer für das Ausgleichsthema. Die Standardeinstellung ist 5.000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: Das Zeitlimit für das Commit von Offsets für das Ausgleichsthema. Die Standardeinstellung ist 5.000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: Das Intervall, in dem Offsets für das Synchronisierungsthema per Commit übergeben werden. Die Standardeinstellung ist 60.000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: Die Zeit, die eine Aufgabe wartet, bevor ein Ausgleichsereignis verarbeitet wird. Die Standardeinstellung ist 1.000 ms.

Eine noch detailliertere Liste der Eigenschaften des konfigurierbaren Connectors finden Sie im GitHub-Repository.

Beschränkungen