Crea connessioni di modifiche in tempo reale a Kafka

Questa pagina spiega come utilizzare il connettore Kafka per utilizzare e inoltrare i flussi di modifiche di Spanner.

Concetti principali

Di seguito sono descritti i concetti principali del connettore Kafka.

Debezium

Debezium è un progetto open source che fornisce una piattaforma per flussi di dati a bassa latenza per l'acquisizione dei dati dei cambiamenti.

Connettore Kafka

Il connettore Kafka fornisce un'astrazione sull'API Spanner per pubblicare modifiche in tempo reale Spanner in Kafka. Con questo connettore, non devi gestire il ciclo di vita della partizione delle modifiche in tempo reale, che è necessario quando utilizzi direttamente l'API Spanner.

Il connettore Kafka produce un evento di modifica per ogni mod di record delle modifiche dei dati e invia record di eventi di modifica a valle in un argomento Kafka separato per ogni tabella monitorata con flussi di modifiche. Una mod del record di modifiche dei dati rappresenta una singola modifica (inserimento, aggiornamento o eliminazione) acquisita. Un singolo record di modifiche dati può contenere più di una mod.

Output connettore Kafka

Il connettore Kafka inoltra i record modifiche in tempo realee direttamente a un argomento Kafka separato. Il nome dell'argomento di output deve essere connector_name.table_name. Se l'argomento non esiste, il connettore Kafka crea automaticamente un argomento con quel nome.

Puoi anche configurare trasformazioni del routing degli argomenti per reindirizzare i record in argomenti da te specificati. Se vuoi utilizzare il routing degli argomenti, disattiva la funzionalità della filigrana bassa.

Ordine di dischi

I record sono ordinati per timestamp di commit per chiave primaria negli argomenti Kafka. I record appartenenti a chiavi primarie diverse non hanno garanzie di ordine. I record con la stessa chiave primaria sono archiviati nella stessa partizione di argomento Kafka. Se vuoi elaborare intere transazioni, puoi anche utilizzare i campi server_transaction_id e number_of_records_in_transaction del record delle modifiche dei dati per combinare una transazione Spanner.

Eventi di modifica

Il connettore Kafka genera un evento di modifica dei dati per ogni operazione INSERT, UPDATE e DELETE. Ogni evento contiene una chiave e i valori per la riga modificata.

Puoi utilizzare gli utenti che hanno completato una conversione di Kafka Connect per produrre eventi di modifica dei dati nei formati Protobuf, AVRO, JSON o JSON Schemaless. Se utilizzi un convertitore Kafka Connect che produce schemi, l'evento contiene schemi separati per le chiavi e i valori. In caso contrario, l'evento contiene solo la chiave e i valori.

Lo schema della chiave non cambia mai. Lo schema dei valori è una combinazione di tutte le colonne monitorate dalla modifica in tempo reale dall'ora di inizio del connettore.

Se configuri il connettore per produrre eventi JSON, l'evento di modifica dell'output contiene cinque campi:

  • Il primo campo schema specifica uno schema Kafka Connect che descrive lo schema chiave di Spanner.

  • Il primo campo payload ha la struttura descritta dal campo schema precedente e contiene la chiave per la riga modificata.

  • Il secondo campo schema specifica lo schema Kafka Connect che descrive lo schema per la riga modificata.

  • Il secondo campo payload ha la struttura descritta dal campo schema precedente e contiene i dati effettivi per la riga modificata.

  • Il campo source è un campo obbligatorio che descrive i metadati di origine dell'evento.

Di seguito è riportato un esempio di evento di modifica dei dati:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

Watermark basso

La filigrana bassa descrive il momento T in cui viene garantito che il connettore Kafka sia trasmesso in streaming e pubblicato in un argomento Kafka per tutti gli eventi con timestamp < T.

Puoi abilitare la filigrana bassa nel connettore Kafka utilizzando il parametro gcp.spanner.low-watermark.enabled. Questo parametro è disattivato per impostazione predefinita. Se è abilitata la filigrana bassa, il campo low_watermark nel record delle modifiche ai dati delle modifiche in tempo reale viene compilato con l'attuale timestamp della filigrana bassa del connettore Kafka.

Se non viene generato alcun record, il connettore Kafka invia periodicamente "heartbeat" della filigrana agli argomenti di output Kafka rilevati dal connettore.

Questi heartbeat della filigrana sono record vuoti, ad eccezione del campo low_watermark. Puoi quindi utilizzare la filigrana bassa per eseguire aggregazioni basate sul tempo. Ad esempio, puoi utilizzare la filigrana bassa per ordinare gli eventi in base al timestamp di commit nelle chiavi primarie.

Argomenti dei metadati

Il connettore Kafka e il framework Kafka Connect creano diversi argomenti di metadati per archiviare le informazioni relative al connettore. Non è consigliabile modificare la configurazione o il contenuto di questi argomenti dei metadati.

Di seguito sono riportati gli argomenti dei metadati:

  • _consumer_offsets: un argomento creato automaticamente da Kafka. Archivia le compensazioni dei consumer per i consumatori creati nel connettore Kafka.
  • _kafka-connect-offsets: un argomento creato automaticamente da Kafka Connect. Archivia gli offset dei connettori.
  • _sync_topic_spanner_connector_connectorname: un argomento creato automaticamente dal connettore. Archivia i metadati relativi alle partizioni delle modifiche in tempo reale.
  • _rebalancing_topic_spanner_connector_connectorname: un argomento creato automaticamente dal connettore. Utilizzato per determinare l'attività del connettore.
  • _debezium-heartbeat.connectorname: un argomento utilizzato per elaborare gli heartbeat di modifiche in tempo reale di Spanner.

Runtime connettore Kafka

Di seguito viene descritto il runtime del connettore Kafka.

Scalabilità

Il connettore Kafka è scalabile orizzontalmente ed è eseguito su una o più attività distribuite tra più worker Kafka Connect.

Garanzie di consegna dei messaggi

Il connettore Kafka supporta la garanzia di consegna "at-least-once".

Tolleranza di errore

Il connettore Kafka è tollerante agli errori. Quando il connettore Kafka legge le modifiche e produce eventi, registra il timestamp dell'ultimo commit elaborato per ogni partizione del flusso di modifiche. Se il connettore Kafka si arresta per qualsiasi motivo (compresi errori di comunicazione, problemi di rete o errori del software), al riavvio il connettore Kafka continua a trasferire i record dal punto in cui era stato interrotto.

Il connettore Kafka legge lo schema di informazioni in corrispondenza del timestamp di inizio del connettore Kafka per recuperare le informazioni dello schema. Per impostazione predefinita, Spanner non può leggere lo schema delle informazioni ai timestamp di lettura precedenti al periodo di conservazione della versione, che per impostazione predefinita è di un'ora. Se vuoi avviare il connettore da meno di un'ora nel passato, devi incrementare il periodo di conservazione delle versioni del database.

configura il connettore Kafka

Crea un flusso di modifiche

Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche. Per continuare con i passaggi successivi, è necessaria un'istanza Spanner con un flusso di modifiche configurato.

Tieni presente che se vuoi che per ogni evento di modifica dei dati vengano restituite colonne modificate e non modificate, utilizza il tipo di acquisizione valore NEW_ROW. Per ulteriori informazioni, consulta la sezione Tipo di acquisizione valore.

Installa il JAR del connettore Kafka

Dopo avere installato Zookeeper, Kafka e Kafka Connect, le restanti attività per eseguire il deployment di un connettore Kafka sono scaricare l'archivio dei plug-in del connettore, estrarre i file JAR nel tuo ambiente Kafka Connect e aggiungere la directory con i file JAR alla plugin.path di Kafka Connect. Devi quindi riavviare il processo Kafka Connect per recuperare i nuovi file JAR.

Se utilizzi container immutabili, puoi eseguire il pull delle immagini dalle immagini container di Debezium per Zookeeper, Kafka e Kafka Connect. L'immagine Kafka Connect ha il connettore Spanner preinstallato.

Per maggiori informazioni su come installare i JAR del connettore Kafka basati su Debezium, consulta Installazione di Debezium.

configura il connettore Kafka

Di seguito è riportato un esempio della configurazione di un connettore Kafka che si connette a un flusso di modifiche denominato changeStreamAll nel database users nell'istanza test-instance e nel progetto test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Questa configurazione contiene:

  • Il nome del connettore quando registrato con un servizio Kafka Connect.

  • Il nome di questa classe del connettore Spanner.

  • L'ID del progetto.

  • L'ID istanza Spanner.

  • L'ID database di Spanner.

  • Il nome della modifica in tempo reale.

  • L'oggetto JSON per la chiave dell'account di servizio.

  • (Facoltativo) Il ruolo di database Spanner da utilizzare.

  • Il numero massimo di attività.

Per un elenco completo delle proprietà del connettore, consulta Proprietà di configurazione del connettore Kafka.

aggiungi la configurazione del connettore a Kafka Connect

Per avviare un connettore Spanner:

  1. Creare una configurazione per il connettore Spanner.

  2. Utilizza l'API REST Kafka Connect per aggiungere la configurazione del connettore al cluster Kafka Connect.

Puoi inviare questa configurazione con un comando POST a un servizio Kafka Connect in esecuzione. Per impostazione predefinita, il servizio Kafka Connect viene eseguito sulla porta 8083. Il servizio registra la configurazione e avvia l'attività del connettore che si connette al database Spanner e trasmette i flussi di record di eventi di modifica agli argomenti Kafka.

Di seguito è riportato un esempio di comando POST:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Esempio di risposta corretta:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Aggiorna la configurazione del connettore Kafka

Per aggiornare la configurazione del connettore, invia un comando PUT al servizio Kafka Connect in esecuzione con lo stesso nome connettore.

Supponi di avere un connettore in esecuzione con la configurazione della sezione precedente. Di seguito è riportato un esempio di comando PUT:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Esempio di risposta corretta:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Arresta il connettore Kafka

Per arrestare il connettore, invia un comando DELETE al servizio Kafka Connect in esecuzione con lo stesso nome connettore.

Supponi di avere un connettore in esecuzione con la configurazione della sezione precedente. Di seguito è riportato un esempio di comando DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Esempio di risposta corretta:

HTTP/1.1 204 No Content

Monitora il connettore Kafka

Oltre alle metriche standard Kafka Connect e Debezium, il connettore Kafka esporta le proprie metriche:

  • MilliSecondsLowWatermark: l'attuale filigrana bassa dell'attività del connettore in millisecondi. La filigrane bassa descrive il momento T in cui è garantito che il connettore esegua lo streaming di tutti gli eventi con timestamp < T

  • MilliSecondsLowWatermarkLag: il ritardo della filigrana in basso rispetto all'ora attuale in millisecondi. Tutti gli eventi sono stati trasmessi con timestamp < T

  • LatencyLowWatermark<Variant>MilliSeconds: il ritardo della filigrana in basso rispetto all'ora attuale in millisecondi. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.

  • LatencySpanner<Variant>MilliSeconds: latenza Spanner-commit-timestamp-to-connector-read. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.

  • LatencyReadToEmit<Variant>MilliSeconds: la latenza Spanner-read-timestamp-to-connector-emit. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.

  • LatencyCommitToEmit<Variant>tMilliSeconds: latenza di Spanner-commit-timestamp-to-connector-emit. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.

  • LatencyCommitToPublish<Variant>MilliSeconds: la latenza Spanner-commit-timestamp-to Kafka-publish-timestamp. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.

  • NumberOfChangeStreamPartitionsDetected: il numero totale di partizioni rilevate dall'attività del connettore attuale.

  • NumberOfChangeStreamQueriesIssued: il numero totale di query di modifiche in tempo reale emesse dall'attività corrente.

  • NumberOfActiveChangeStreamQueries: il numero attivo di query di modifiche in tempo reale rilevate dall'attività del connettore attuale.

  • SpannerEventQueueCapacity: la capacità totale di StreamEventQueue, una coda che archivia gli elementi ricevuti dalle query di modifiche in tempo reale.

  • SpannerEventQueueCapacity: la capacità rimanente di StreamEventQueue.

  • TaskStateChangeEventQueueCapacity: la capacità totale di TaskStateChangeEventQueue, una coda che archivia gli eventi che si verificano nel connettore.

  • RemainingTaskStateChangeEventQueueCapacity: la capacità rimanente di TaskStateChangeEventQueue.

  • NumberOfActiveChangeStreamQueries: il numero attivo di query di modifiche in tempo reale rilevate dall'attività del connettore attuale.

Proprietà di configurazione del connettore Kafka

Di seguito sono riportate le proprietà di configurazione obbligatorie per il connettore:

  • name: nome univoco del connettore. Il tentativo di registrazione di nuovo con lo stesso nome causa un errore. Questa proprietà è obbligatoria per tutti i connettori Kafka Connect.

  • connector.class: il nome della classe Java per il connettore. Usa sempre il valore io.debezium.connector.spanner.SpannerConnector per il connettore Kafka.

  • tasks.max: il numero massimo di attività che devono essere create per questo connettore.

  • gcp.spanner.project.id: l'ID progetto

  • gcp.spanner.instance.id: l'ID istanza Spanner

  • gcp.spanner.database.id: l'ID database di Spanner

  • gcp.spanner.change.stream: il nome delle modifiche in tempo reale di Spanner

  • gcp.spanner.credentials.json: l'oggetto JSON della chiave dell'account di servizio.

  • gcp.spanner.credentials.path: il percorso del file dell'oggetto JSON chiave dell'account di servizio. Obbligatorio se il campo sopra non è stato fornito.

  • gcp.spanner.database.role : il ruolo di database Spanner da utilizzare. Questa operazione è necessaria solo se il flusso di modifiche è protetto con controllo dell'accesso granulare. Il ruolo del database deve avere il privilegio SELECT per il flusso di modifiche e il privilegio EXECUTE per la funzione di lettura della modifica in tempo reale. Per ulteriori informazioni, consulta Controllo dell'controllo dell'accesso granulare per i flussi di modifiche.

Le seguenti proprietà di configurazione avanzata hanno valori predefiniti che funzionano nella maggior parte delle situazioni e pertanto raramente devono essere specificati nella configurazione del connettore:

  • gcp.spanner.low-watermark.enabled: indica se la filigrana bassa è attiva per il connettore. Il valore predefinito è false.

  • gcp.spanner.low-watermark.update-period.ms: l'intervallo di aggiornamento della filigrana bassa. Il valore predefinito è 1000 ms.

  • heartbeat.interval.ms: l'intervallo heartbeat di Spanner. Il valore predefinito è 300000 (cinque minuti).

  • gcp.spanner.start.time: l'ora di inizio del connettore. Il valore predefinito è l'ora attuale.

  • gcp.spanner.end.time: l'ora di fine del connettore. Il valore predefinito è infinito.

  • tables.exclude.list: le tabelle per cui escludere gli eventi di modifica. Il campo predefinito è vuoto.

  • tables.include.list: le tabelle per cui includere gli eventi di modifica. Se non viene compilato, vengono incluse tutte le tabelle. Il campo predefinito è vuoto.

  • gcp.spanner.stream.event.queue.capacity: capacità della coda degli eventi di Spanner. Il valore predefinito è 10.000.

  • connector.spanner.task.state.change.event.queue.capacity: la capacità della coda degli eventi di modifica dello stato dell'attività. Il valore predefinito è 1000.

  • connector.spanner.max.missed.heartbeats: il numero massimo di heartbeat mancanti per una query di modifiche in tempo reale prima che venga generata un'eccezione. Il valore predefinito è 10.

  • scaler.monitor.enabled: indica se la scalabilità automatica delle attività è abilitata. Il valore predefinito è false.

  • tasks.desired.partitions: il numero preferito di partizioni di modifiche in tempo reale per attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 2.

  • tasks.min: il numero minimo di attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 1.

  • connector.spanner.sync.topic: il nome dell'argomento sincronizzazione, un argomento del connettore interno utilizzato per archiviare le comunicazioni tra le attività. Il valore predefinito è _sync_topic_spanner_connector_connectorname se l'utente non ha fornito un nome.

  • connector.spanner.sync.poll.duration: la durata del sondaggio per l'argomento di sincronizzazione. Il valore predefinito è 500 ms.

  • connector.spanner.sync.request.timeout.ms: il timeout per le richieste all'argomento di sincronizzazione. Il valore predefinito è 5000 ms.

  • connector.spanner.sync.delivery.timeout.ms: il timeout per la pubblicazione nell'argomento di sincronizzazione. Il valore predefinito è 15.000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: l'intervallo in cui viene eseguito il commit degli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.

  • connector.spanner.sync.publisher.wait.timeout: l'intervallo con cui i messaggi vengono pubblicati nell'argomento di sincronizzazione. Il valore predefinito è 5 ms.

  • connector.spanner.rebalancing.topic: il nome dell'argomento di ribilanciamento. L'argomento di ribilanciamento è un argomento del connettore interno utilizzato per determinare l'attività. Il valore predefinito è _rebalancing_topic_spanner_connector_connectorname se l'utente non ha fornito un nome.

  • connector.spanner.rebalancing.poll.duration: la durata del sondaggio per l'argomento di ribilanciamento. Il valore predefinito è 5000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: il timeout per il commit degli offset per l'argomento di ribilanciamento. Il valore predefinito è 5000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: l'intervallo in cui viene eseguito il commit degli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: il tempo di attesa di un'attività prima di elaborare un evento di ribilanciamento. Il valore predefinito è 1000 ms.

Per un elenco ancora più dettagliato delle proprietà del connettore configurabili, consulta il repository GitHub.

Limitazioni