Creare connessioni con Kafka modifiche in tempo reale

Questa pagina spiega come utilizzare il connettore Kafka per consumare e inoltrare i dati dei stream di modifiche di Spanner.

Concetti principali

Di seguito vengono descritti i concetti fondamentali del connettore Kafka.

Debesio

Debezium è un progetto open source che fornisce una piattaforma di flussi di dati a bassa latenza per Change Data Capture (CDC).

Connettore Kafka

Il connettore Kafka fornisce un'astrazione sull'API Spanner per pubblicare modifiche in tempo reale di Spanner in Kafka. Con questo connettore, non devi gestire il ciclo di vita della partizione modifiche in tempo reale, che è necessario quando utilizzi direttamente l'API Spanner.

Il connettore Kafka produce un evento di modifica per ogni mod del record di modifiche dei dati e invia i record degli eventi di modifica a valle in un argomento Kafka separato per ogni tabella monitorata in tempo reale. Una mod del record di modifica dei dati rappresenta una singola modifica (inserimento, aggiornamento o eliminazione) acquisita. Un singolo record di modifica dei dati può contenere più di una mod.

Output del connettore Kafka

Il connettore Kafka inoltra i record modifiche in tempo reale direttamente in un argomento Kafka separato. Il nome dell'argomento di output deve essere connector_name.table_name. Se l'argomento non esiste, il connettore Kafka crea automaticamente un argomento con quel nome.

Puoi anche configurare le trasformazioni di routing degli argomenti per reindirizzare i record in argomenti da te specificati. Se vuoi utilizzare il routing degli argomenti, disattiva la funzionalità filigrana bassa.

Registra ordine

I record sono ordinati in base al timestamp di commit per chiave primaria negli argomenti Kafka. I record appartenenti a chiavi primarie diverse non hanno garanzie di ordine. I record con la stessa chiave primaria sono archiviati nella stessa partizione dell'argomento Kafka. Se vuoi elaborare intere transazioni, puoi utilizzare anche i campi del record di modifica dei dati server_transaction_id e number_of_records_in_transaction per assemblare una transazione Spanner.

Modifica eventi

Il connettore Kafka genera un evento di modifica dei dati per ogni operazione INSERT, UPDATE e DELETE. Ogni evento contiene una chiave e i valori per la riga modificata.

Puoi utilizzare i convertitori di Kafka Connect per generare eventi di modifica dei dati nei formati Protobuf, AVRO, JSON o JSON Schemaless. Se utilizzi un convertitore Kafka Connect che produce schemi, l'evento contiene schemi separati per la chiave e i valori. In caso contrario, l'evento contiene solo la chiave e i valori.

Lo schema della chiave non cambia mai. Lo schema per i valori è un'unione di tutte le colonne monitorate dalla modifica in tempo reale dall'ora di inizio del connettore.

Se configuri il connettore in modo che produca eventi JSON, l'evento di modifica dell'output contiene cinque campi:

  • Il primo campo schema specifica uno schema di Kafka Connect che descrive lo schema della chiave Spanner.

  • Il primo campo payload ha la struttura descritta dal campo schema precedente e contiene la chiave della riga che è stata modificata.

  • Il secondo campo schema specifica lo schema di Kafka Connect che descrive lo schema per la riga modificata.

  • Il secondo campo payload ha la struttura descritta dal campo schema precedente e contiene i dati effettivi per la riga che è stata modificata.

  • Il campo source è un campo obbligatorio che descrive i metadati di origine dell'evento.

Di seguito è riportato un esempio di evento di modifica dei dati:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": {
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": {
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c",
  "ts_ms": 1559033904863 //
}

Filigrana bassa

La filigrana bassa descrive il momento T in cui è garantito che il connettore Kafka abbia trasmesso in streaming e pubblicato in un argomento Kafka tutti gli eventi con timestamp < T.

Puoi attivare il livello di filigrana basso nel connettore Kafka utilizzando il parametro gcp.spanner.low-watermark.enabled. Questo parametro è disattivato per impostazione predefinita. Se il livello di filigrana basso è attivato, il campo low_watermark nel record delle modifiche ai dati delle modifiche in tempo reale viene compilato con il timestamp attuale della filigrana bassa del connettore Kafka.

Se non viene generato alcun record, il connettore Kafka invia "heartbeat" periodici della filigrana agli argomenti di output Kafka rilevati dal connettore.

Questi heartbeat della filigrana sono record vuoti, ad eccezione del campo low_watermark. Puoi quindi utilizzare la filigrana bassa per eseguire aggregazioni basate sul tempo. Ad esempio, puoi utilizzare la filigrana bassa per ordinare gli eventi in base al timestamp di commit nelle chiavi primarie.

Argomenti dei metadati

Il connettore Kafka, nonché il framework Kafka Connect, creano diversi argomenti di metadati per archiviare le informazioni relative al connettore. Non è consigliabile modificare la configurazione o i contenuti di questi argomenti dei metadati.

Di seguito sono riportati gli argomenti dei metadati:

  • _consumer_offsets: un argomento creato automaticamente da Kafka. Archivia le compensazioni per i consumatori creati nel connettore Kafka.
  • _kafka-connect-offsets: un argomento creato automaticamente da Kafka Connect. Archivia gli offset del connettore.
  • _sync_topic_spanner_connector_connectorname: un argomento creato automaticamente dal connettore. Archivia i metadati relativi alle partizioni delle modifiche in tempo reale.
  • _rebalancing_topic_spanner_connector_connectorname: un argomento creato automaticamente dal connettore. Utilizzato per determinare l'attività dell'attività del connettore.
  • _debezium-heartbeat.connectorname: un argomento utilizzato per elaborare gli heartbeat del flusso di modifiche di Spanner.

Runtime connettore Kafka

Di seguito viene descritto il runtime del connettore Kafka.

Scalabilità

Il connettore Kafka è scalabile orizzontalmente e viene eseguito su una o più attività distribuite tra più worker Kafka Connect.

Garanzie di consegna dei messaggi

Il connettore Kafka supporta la garanzia di consegna "atleast-once".

Tolleranza di errore

Il connettore Kafka tollera errori. Quando il connettore Kafka legge le modifiche e genera eventi, registra l'ultimo timestamp di commit elaborato per ogni partizione del flusso di modifiche. Se il connettore Kafka si arresta per qualsiasi motivo (inclusi errori di comunicazione, problemi di rete o guasti software), al riavvio del connettore Kafka continua il flusso di dati dei record dal punto in cui è stato interrotto.

Il connettore Kafka legge lo schema delle informazioni al timestamp di inizio del connettore Kafka per recuperare le informazioni sullo schema. Per impostazione predefinita, Spanner non può leggere lo schema di informazioni nei timestamp di lettura precedenti al periodo di conservazione della versione, che per impostazione predefinita è di un'ora. Se vuoi avviare il connettore da più di un'ora nel passato, devi aumentare il periodo di conservazione della versione del database.

Configurare il connettore Kafka

Crea un flusso di modifiche

Per informazioni dettagliate su come creare una modifica in tempo reale, consulta Creare una modifica in tempo reale. Per continuare con i passaggi successivi, è richiesta un'istanza Spanner con un flusso di modifiche configurato.

Tieni presente che se vuoi che le colonne modificate e non modificate vengano restituite per ogni evento di modifica dei dati, utilizza il tipo di acquisizione dei valori NEW_ROW. Per saperne di più, consulta la sezione Tipo di acquisizione del valore.

Installa il JAR del connettore Kafka

Con Zookeeper, Kafka e Kafka Connect installati, le attività rimanenti per implementare un connettore Kafka consistono nel scaricare l'archivio del plug-in del connettore, estrarre i file JAR nell'ambiente Kafka Connect e aggiungere la directory con i file JAR al plugin.path di Kafka Connect. Quindi, devi riavviare il processo di Kafka Connect per recuperare i nuovi file JAR.

Se utilizzi container immutabili, puoi eseguire il pull di immagini dalle immagini container di Debezium per Zookeeper, Kafka e Kafka Connect. Nell'immagine Kafka Connect ha il connettore Spanner preinstallato.

Per maggiori informazioni su come installare i JAR del connettore Kafka basati su Debezium, consulta Installazione di Debezium.

Configurare il connettore Kafka

Di seguito è riportato un esempio di configurazione per un connettore Kafka che si connette a un flusso di modifiche denominato changeStreamAll nel database users nell'istanza test-instance e nel progetto test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Questa configurazione contiene quanto segue:

  • Il nome del connettore quando è registrato con un servizio Kafka Connect.

  • Il nome di questa classe del connettore Spanner.

  • L'ID progetto.

  • L'ID istanza Spanner.

  • L'ID database Spanner.

  • Il nome della modifica in tempo reale.

  • L'oggetto JSON per la chiave dell'account di servizio.

  • (Facoltativo) Il ruolo database Spanner da utilizzare.

  • Il numero massimo di attività.

Per un elenco completo delle proprietà del connettore, vedi Proprietà di configurazione del connettore Kafka.

Aggiungere la configurazione del connettore a Kafka Connect

Per iniziare a eseguire un connettore Spanner:

  1. Creare una configurazione per il connettore Spanner.

  2. Utilizza l'API REST Kafka Connect per aggiungere la configurazione del connettore al cluster Kafka Connect.

Puoi inviare questa configurazione con un comando POST a un servizio Kafka Connect in esecuzione. Per impostazione predefinita, il servizio Kafka Connect viene eseguito sulla porta 8083. Il servizio registra la configurazione e avvia l'attività del connettore che si connette al database Spanner e trasmette i record degli eventi di modifica agli argomenti Kafka.

Di seguito è riportato un esempio di comando POST:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Esempio di risposta corretta:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Aggiorna la configurazione del connettore Kafka

Per aggiornare la configurazione del connettore, invia un comando PUT al servizio Kafka Connect in esecuzione con lo stesso nome del connettore.

Supponiamo di avere un connettore in esecuzione con la configurazione della sezione precedente. Di seguito è riportato un esempio di comando PUT:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Esempio di risposta corretta:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Arresta il connettore Kafka

Per arrestare il connettore, invia un comando DELETE al servizio Kafka Connect in esecuzione con lo stesso nome del connettore.

Supponiamo di avere un connettore in esecuzione con la configurazione della sezione precedente. Di seguito è riportato un esempio di comando DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Esempio di risposta corretta:

HTTP/1.1 204 No Content

Monitorare il connettore Kafka

Oltre alle metriche standard di Kafka Connect e Debezium, il connettore Kafka esporta le proprie metriche:

  • MilliSecondsLowWatermark: la filigrana bassa attuale dell'attività del connettore in millisecondi. La filigrana bassa descrive il momento T in cui è garantito che il connettore abbia trasmesso in streaming tutti gli eventi con timestamp < T

  • MilliSecondsLowWatermarkLag: il ritardo del watermark basso rispetto al tempo attuale in millisecondi. Ha trasmesso in streaming tutti gli eventi con timestamp < T

  • LatencyLowWatermark<Variant>MilliSeconds: il ritardo in millisecondi del livello della filigrana basso rispetto al tempo attuale. Sono incluse le varianti P50, P95, P99, Media, Min e Max.

  • LatencySpanner<Variant>MilliSeconds: la latenza Spanner-commit-timestamp-to-connector-read. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.

  • LatencyReadToEmit<Variant>MilliSeconds: la latenza Spanner-read-timestamp-to-connector-emit. Sono incluse le varianti P50, P95, P99, Media, Min e Max.

  • LatencyCommitToEmit<Variant>tMilliSeconds: la latenza Spanner-commit-timestamp-to-connector-emit. Sono incluse le varianti P50, P95, P99, Media, Min e Max.

  • LatencyCommitToPublish<Variant>MilliSeconds: la latenza Spanner-commit-timestamp-to Kafka-publish-timestamp. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.

  • NumberOfChangeStreamPartitionsDetected: il numero totale di partizioni rilevate dall'attività del connettore corrente.

  • NumberOfChangeStreamQueriesIssued: il numero totale di query di modifiche in tempo reale emesse dall'attività corrente.

  • NumberOfActiveChangeStreamQueries: il numero attivo di query di modifiche in tempo reale rilevate dall'attività del connettore corrente.

  • SpannerEventQueueCapacity: la capacità totale di StreamEventQueue, una coda che archivia gli elementi ricevuti dalle query relative a modifiche in tempo reale.

  • SpannerEventQueueCapacity: la capacità rimanente di StreamEventQueue.

  • TaskStateChangeEventQueueCapacity: la capacità totale di TaskStateChangeEventQueue, una coda in cui sono archiviati gli eventi che si verificano nel connettore.

  • RemainingTaskStateChangeEventQueueCapacity: la capacità rimanente di TaskStateChangeEventQueue.

  • NumberOfActiveChangeStreamQueries: il numero attivo di query di modifiche in tempo reale rilevate dall'attività del connettore corrente.

Proprietà di configurazione del connettore Kafka

Di seguito sono riportate le proprietà di configurazione obbligatorie per il connettore:

  • name: nome univoco del connettore. Un nuovo tentativo di registrazione con lo stesso nome provoca un errore. Questa proprietà è obbligatoria per tutti i connettori Kafka Connect.

  • connector.class: il nome della classe Java del connettore. Utilizza sempre un valore di io.debezium.connector.spanner.SpannerConnector per il connettore Kafka.

  • tasks.max: il numero massimo di attività da creare per questo connettore.

  • gcp.spanner.project.id: l'ID progetto

  • gcp.spanner.instance.id: l'ID istanza Spanner

  • gcp.spanner.database.id: l'ID database Spanner

  • gcp.spanner.change.stream: nome del flusso di modifiche di Spanner

  • gcp.spanner.credentials.json: l'oggetto JSON della chiave dell'account di servizio.

  • gcp.spanner.credentials.path: il percorso del file dell'oggetto JSON della chiave dell'account di servizio. Obbligatorio se il campo riportato sopra non viene fornito.

  • gcp.spanner.database.role : il ruolo database Spanner da utilizzare. Questa operazione è necessaria solo se il flusso di modifiche è protetto con controllo dell'accesso granulare. Il ruolo del database deve disporre del privilegio SELECT per il flusso di modifiche e del privilegio EXECUTE per la funzione di lettura del flusso di modifiche. Per maggiori informazioni, consulta Controllo dell'controllo dell'accesso granulare per i flussi di modifiche.

Le seguenti proprietà di configurazione avanzate hanno valori predefiniti validi nella maggior parte delle situazioni e pertanto raramente devono essere specificati nella configurazione del connettore:

  • gcp.spanner.low-watermark.enabled: indica se la filigrana bassa è attiva per il connettore. Il valore predefinito è false.

  • gcp.spanner.low-watermark.update-period.ms: l'intervallo con cui viene aggiornato il livello basso della filigrana. Il valore predefinito è 1000 ms.

  • heartbeat.interval.ms: l'intervallo Heartbeat di Spanner. Il valore predefinito è 300.000 (cinque minuti).

  • gcp.spanner.start.time: l'ora di inizio del connettore. Il valore predefinito è l'ora corrente.

  • gcp.spanner.end.time: l'ora di fine del connettore. Il valore predefinito è infinito.

  • tables.exclude.list: le tabelle per cui escludere gli eventi di modifica. Il valore predefinito è vuoto.

  • tables.include.list: le tabelle per cui includere gli eventi di modifica. Se non viene compilata, vengono incluse tutte le tabelle. Il valore predefinito è vuoto.

  • gcp.spanner.stream.event.queue.capacity: la capacità della coda eventi di Spanner. Il valore predefinito è 10.000.

  • connector.spanner.task.state.change.event.queue.capacity: la capacità della coda di eventi di modifica dello stato dell'attività. Il valore predefinito è 1000.

  • connector.spanner.max.missed.heartbeats: il numero massimo di heartbeat mancanti per una query di modifiche in tempo reale prima che venga generata un'eccezione. Il valore predefinito è 10.

  • scaler.monitor.enabled: indica se la scalabilità automatica delle attività è abilitata. Il valore predefinito è false.

  • tasks.desired.partitions: il numero preferito di partizioni di modifiche in tempo reale per attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 2.

  • tasks.min: il numero minimo di attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 1.

  • connector.spanner.sync.topic: il nome dell'argomento di sincronizzazione, un argomento del connettore interno utilizzato per archiviare le comunicazioni tra le attività. Se l'utente non ha fornito un nome, il valore predefinito è _sync_topic_spanner_connector_connectorname.

  • connector.spanner.sync.poll.duration: la durata del sondaggio per l'argomento di sincronizzazione. Il valore predefinito è 500 ms.

  • connector.spanner.sync.request.timeout.ms: il timeout per le richieste all'argomento di sincronizzazione. Il valore predefinito è 5000 ms.

  • connector.spanner.sync.delivery.timeout.ms: il timeout per la pubblicazione nell'argomento di sincronizzazione. Il valore predefinito è 15.000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: l'intervallo in cui viene eseguito il commit degli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.

  • connector.spanner.sync.publisher.wait.timeout: l'intervallo con cui i messaggi vengono pubblicati nell'argomento di sincronizzazione. Il valore predefinito è 5 ms.

  • connector.spanner.rebalancing.topic: il nome dell'argomento in fase di riequilibrio. L'argomento del ribilanciamento è un argomento del connettore interno utilizzato per determinare l'attività dell'attività. Se l'utente non ha fornito un nome, il valore predefinito è _rebalancing_topic_spanner_connector_connectorname.

  • connector.spanner.rebalancing.poll.duration: la durata del sondaggio per l'argomento in fase di riequilibrio. Il valore predefinito è 5000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: il timeout per l'esecuzione del commit degli offset per l'argomento in fase di ribilanciamento. Il valore predefinito è 5000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: l'intervallo in cui viene eseguito il commit degli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: il tempo di attesa di un'attività prima di elaborare un evento di ribilanciamento. Il valore predefinito è 1000 ms.

Per un elenco ancora più dettagliato delle proprietà configurabili del connettore, consulta il repository di GitHub.

Limitazioni