Crea connessioni di modifiche in tempo reale a Kafka

Questa pagina spiega come utilizzare il connettore Kafka per utilizzare e inoltrare i dati dei flussi di modifiche di Spanner.

Concetti principali

Di seguito sono descritti i concetti principali del connettore Kafka.

Debezium

Debezium è un progetto open source che offre una piattaforma per flussi di dati a bassa latenza per l'acquisizione dei dati delle modifiche.

Connettore Kafka

Il connettore Kafka fornisce un'astrazione sull'API Spanner per e pubblicare le modifiche in tempo reale di Spanner in Kafka. Con questo connettore, non devi gestire il ciclo di vita della partizione delle modifiche in tempo reale, che è necessario se utilizzi direttamente l'API Spanner.

Il connettore Kafka produce un evento di modifica per ogni mod del record di modifiche dei dati e invia record di eventi di modifica downstream in un argomento Kafka separato per ogni tabella monitorata con flussi di modifiche. Una mod del record di modifiche dei dati rappresenta una singola modifica (inserimento, aggiornamento o eliminazione) acquisita. Un singolo record di modifiche dati può contenere più di una mod.

Output connettore Kafka

Il connettore Kafka inoltra direttamente i record modifiche in tempo reale in un argomento Kafka separato. Il nome dell'argomento di output deve essere connector_name.table_name. Se l'argomento non esiste, il connettore Kafka crea automaticamente un argomento con quel nome.

Puoi anche configurare trasformazioni del routing degli argomenti per reindirizzare i record in argomenti da te specificati. Se vuoi utilizzare il routing degli argomenti, disattiva la funzionalità della filigrana bassa.

Ordine di dischi

I record sono ordinati per timestamp di commit per chiave primaria in gli argomenti Kafka. I record appartenenti a chiavi primarie diverse non hanno garanzie di ordinazione. I record con la stessa chiave primaria vengono archiviati la stessa partizione di un argomento Kafka. Se vuoi elaborare intere transazioni, puoi utilizza anche i dati di gestione delle modifiche dei dati server_transaction_id e number_of_records_in_transaction campi per di creare una transazione Spanner.

Eventi di modifica

Il connettore Kafka genera un evento di modifica dei dati per ogni INSERT, UPDATE, e l'operazione DELETE. Ogni evento contiene una chiave e i valori per la riga modificata.

Puoi utilizzare i convertitori di Kafka Connect per generare eventi di modifica dei dati in nei formati Protobuf, AVRO, JSON o JSON Schemaless. Se utilizzi un Convertitore Kafka Connect che produce schemi, l'evento contiene schemi separati per la chiave e i valori. In caso contrario, l'evento contiene solo la chiave e i valori.

Lo schema della chiave non cambia mai. Lo schema per i valori è un amalgamazione di tutte le colonne tracciate dal flusso di modifiche a partire e l'ora di inizio del connettore.

Se configuri il connettore per produrre eventi JSON, l'evento di modifica di output contiene cinque campi:

  • Il primo campo schema specifica uno schema Kafka Connect che descrive lo schema chiave di Spanner.

  • Il primo campo payload ha la struttura descritta dal campo schema precedente e contiene la chiave per la riga modificata.

  • Il secondo campo schema specifica lo schema Kafka Connect che descrive lo schema per la riga modificata.

  • Il secondo campo payload ha la struttura descritta dal campo schema precedente e contiene i dati effettivi per la riga modificata.

  • Il campo source è un campo obbligatorio che descrive i metadati di origine dell'evento.

Di seguito è riportato un esempio di evento di modifica dei dati:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

Watermark basso

La filigrana bassa descrive il momento T in cui viene garantito che il connettore Kafka sia trasmesso in streaming e pubblicato in un argomento Kafka di tutti gli eventi con timestamp < T.

Puoi abilitare la filigrana bassa nel connettore Kafka utilizzando Parametro gcp.spanner.low-watermark.enabled. Questo parametro è disattivato per impostazione predefinita. Se è attivata la filigrana bassa, il campo low_watermark nei dati delle modifiche in tempo reale il record di modifica viene compilato con la filigrana bassa attuale del connettore Kafka timestamp.

Se non viene generato alcun record, il connettore Kafka invia periodicamente "battiti cardiaci" con filigrana agli argomenti di output Kafka rilevati dal connettore.

Questi heartbeat della filigrana sono record vuoti, ad eccezione dei campo low_watermark. Puoi quindi utilizzare la filigrana bassa per eseguire aggregazioni basate sul tempo. Ad esempio, puoi utilizzare la filigrana bassa per ordinare gli eventi in base al commit le chiavi primarie.

Argomenti dei metadati

Il connettore Kafka e il framework Kafka Connect creano diversi degli argomenti dei metadati per archiviare le informazioni relative al connettore. È sconsigliato modificare la configurazione o i contenuti di questi argomenti dei metadati.

Di seguito sono riportati gli argomenti dei metadati:

  • _consumer_offsets: un argomento creato automaticamente da Kafka. Archivia le compensazioni dei consumer per i consumatori creati nel connettore Kafka.
  • _kafka-connect-offsets: un argomento creato automaticamente da Kafka Connect. Archivia gli offset dei connettori.
  • _sync_topic_spanner_connector_connectorname: un argomento creato automaticamente dal connettore. Archivia i metadati relativi alle partizioni delle modifiche in tempo reale.
  • _rebalancing_topic_spanner_connector_connectorname: un argomento creato automaticamente dal connettore. Utilizzato per determinare l'attività del connettore.
  • _debezium-heartbeat.connectorname: un argomento utilizzato per elaborare gli heartbeat di modifiche in tempo reale di Spanner.

Runtime connettore Kafka

Di seguito viene descritto il runtime del connettore Kafka.

Scalabilità

Il connettore Kafka è scalabile orizzontalmente ed è eseguito su una o più attività distribuite tra più worker Kafka Connect.

Garanzie di consegna dei messaggi

Il connettore Kafka supporta la garanzia di consegna "at-least-once".

Tolleranza di errore

Il connettore Kafka è tollerante agli errori. Quando il connettore Kafka legge le modifiche produce eventi, registra l'ultimo timestamp di commit elaborato per ogni modifica la partizione del flusso di dati. Se il connettore Kafka si arresta per qualsiasi motivo (inclusi errori di comunicazione, problemi di rete o errori del software), al riavvio il connettore Kafka continua a trasferire flussi di record dal punto in cui era stato interrotto.

Il connettore Kafka legge lo schema di informazioni all'inizio del connettore Kafka per recuperare le informazioni sullo schema. Per impostazione predefinita, Spanner non può lo schema di informazioni viene letto nei timestamp di lettura prima periodo di conservazione della versione, che per impostazione predefinita è di un'ora. Se vuoi avviare il connettore da prima della data corrente un'ora prima, devi aumentare la conservazione delle versioni del database punto.

configura il connettore Kafka

Crea un flusso di modifiche

Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche. Per continuare con i passaggi successivi, è necessaria un'istanza Spanner con un flusso di modifiche configurato.

Tieni presente che se desideri che vengano restituite sia le colonne modificate sia quelle non modificate evento di modifica dei dati, utilizza il tipo di acquisizione del valore NEW_ROW. Per ulteriori informazioni, consulta la sezione Tipo di acquisizione valore.

Installa il JAR del connettore Kafka

Dopo avere installato Zookeeper, Kafka e Kafka Connect, le attività rimanenti per il deployment di un connettore Kafka sono il download l'archivio dei plug-in del connettore, estrarre i file JAR nel tuo ambiente Kafka Connect e aggiungere con i file JAR nell'istanza plugin.path di Kafka Connect. Devi quindi riavviare il processo Kafka Connect per recuperare i nuovi file JAR.

Se lavori con container immutabili, puoi eseguire il pull delle immagini Immagini container di Debezium per Zookeeper, Kafka e Kafka Connect. L'immagine Kafka Connect ha Connettore Spanner preinstallato.

Per ulteriori informazioni su come installare i JAR del connettore Kafka basati su Debezium, vedi Installazione di Debezium.

configura il connettore Kafka

Di seguito è riportato un esempio della configurazione di un connettore Kafka che si collega a un flusso di modifiche denominato changeStreamAll nel il database users nell'istanza test-instance e nel progetto test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Questa configurazione contiene:

  • Il nome del connettore quando registrato con un servizio Kafka Connect.

  • Il nome di questa classe del connettore Spanner.

  • L'ID del progetto.

  • L'ID istanza Spanner.

  • L'ID database di Spanner.

  • Il nome della modifica in tempo reale.

  • L'oggetto JSON per la chiave dell'account di servizio.

  • (Facoltativo) Il ruolo di database Spanner da utilizzare.

  • Il numero massimo di attività.

Per un elenco completo delle proprietà del connettore, consulta Proprietà di configurazione del connettore Kafka.

aggiungi la configurazione del connettore a Kafka Connect

Per avviare un connettore Spanner:

  1. Creare una configurazione per il connettore Spanner.

  2. Per aggiungerlo, utilizza l'API REST Kaafka Connect. del connettore dati al cluster Kafka Connect.

Puoi inviare questa configurazione con un comando POST a Kafka Connect in esecuzione completamente gestito di Google Cloud. Per impostazione predefinita, il servizio Kafka Connect viene eseguito sulla porta 8083. Il servizio registra la configurazione e avvia l'attività del connettore. che si connette al database Spanner e trasmette i flussi di record di eventi di modifica Argomenti Kafka.

Di seguito è riportato un esempio di comando POST:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Esempio di risposta corretta:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Aggiorna la configurazione del connettore Kafka

Per aggiornare la configurazione del connettore, invia un comando PUT all'istanza in esecuzione Servizio Kafka Connect con lo stesso nome connettore.

Supponiamo di avere un connettore in esecuzione con la configurazione nella sezione precedente. Di seguito è riportato un esempio di comando PUT:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Esempio di risposta corretta:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Arresta il connettore Kafka

Per arrestare il connettore, invia un comando DELETE all'istanza in esecuzione Servizio Kafka Connect con lo stesso nome connettore.

Supponiamo di avere un connettore in esecuzione con la configurazione nella sezione precedente. Di seguito è riportato un esempio di comando DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Esempio di risposta corretta:

HTTP/1.1 204 No Content

Monitora il connettore Kafka

Oltre alle metriche standard Kafka Connect e Debezium, il connettore Kafka esporta le proprie metriche:

  • MilliSecondsLowWatermark: l'attuale filigrana bassa dell'attività del connettore in millisecondi. La la filigrana bassa indica l'ora T in cui è garantito che il connettore abbia hanno trasmesso tutti gli eventi con timestamp < T

  • MilliSecondsLowWatermarkLag: il ritardo della filigrana in basso rispetto all'ora corrente in millisecondi. hanno trasmesso tutti gli eventi con timestamp < T

  • LatencyLowWatermark<Variant>MilliSeconds: il ritardo della filigrana bassa rispetto all'ora attuale in millisecondi. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.

  • LatencySpanner<Variant>MilliSeconds: latenza Spanner-commit-timestamp-to-connector-read. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.

  • LatencyReadToEmit<Variant>MilliSeconds: latenza di Spanner-read-timestamp-to-connector-emit. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.

  • LatencyCommitToEmit<Variant>tMilliSeconds: latenza di Spanner-commit-timestamp-to-connector-emit. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.

  • LatencyCommitToPublish<Variant>MilliSeconds: la latenza Spanner-commit-timestamp-to Kafka-publish-timestamp. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.

  • NumberOfChangeStreamPartitionsDetected: il numero totale di partizioni rilevato dall'attività del connettore attuale.

  • NumberOfChangeStreamQueriesIssued: il numero totale di query modifiche in tempo reale emessi dall'attività corrente.

  • NumberOfActiveChangeStreamQueries: il numero attivo di modifiche in tempo reale rilevate dall'attività del connettore attuale.

  • SpannerEventQueueCapacity: la capacità totale di StreamEventQueue, una coda che archivia gli elementi ricevuti dalle query di modifiche in tempo reale.

  • SpannerEventQueueCapacity: la capacità rimanente di StreamEventQueue.

  • TaskStateChangeEventQueueCapacity: la capacità totale di TaskStateChangeEventQueue, una coda che archivia gli eventi che si verificano nel connettore.

  • RemainingTaskStateChangeEventQueueCapacity: la capacità rimanente di TaskStateChangeEventQueue.

  • NumberOfActiveChangeStreamQueries: il numero attivo di modifiche in tempo reale rilevate dall'attività del connettore attuale.

Proprietà di configurazione del connettore Kafka

Di seguito sono riportate le proprietà di configurazione obbligatorie per il connettore:

  • name: nome univoco del connettore. Il tentativo di registrazione di nuovo con lo stesso nome causa un errore. Questa proprietà è obbligatoria per tutti i connettori Kafka Connect.

  • connector.class: il nome della classe Java per il connettore. Usa sempre il valore io.debezium.connector.spanner.SpannerConnector per il connettore Kafka.

  • tasks.max: il numero massimo di attività che devono essere create per questo connettore.

  • gcp.spanner.project.id: l'ID progetto

  • gcp.spanner.instance.id: l'ID istanza Spanner

  • gcp.spanner.database.id: l'ID database di Spanner

  • gcp.spanner.change.stream: il nome delle modifiche in tempo reale di Spanner

  • gcp.spanner.credentials.json: l'oggetto JSON della chiave dell'account di servizio.

  • gcp.spanner.credentials.path: il percorso del file dell'oggetto JSON chiave dell'account di servizio. Obbligatorio se il campo sopra non è stato fornito.

  • gcp.spanner.database.role : il ruolo del database Spanner per per gli utilizzi odierni. Questa operazione è necessaria solo se il flusso di modifiche è protetto con controllo dell'accesso granulare. Il ruolo del database deve avere il privilegio SELECT nella modifiche in tempo reale e il privilegio EXECUTE sulla lettura personalizzata. Per ulteriori informazioni, consulta Controllo granulare dell'controllo dell'accesso per le di streaming.

Le seguenti proprietà di configurazione avanzata hanno valori predefiniti utilizzabili nella maggior parte situazioni e pertanto raramente devono essere specificati nella configurazione del connettore:

  • gcp.spanner.low-watermark.enabled: indica se la filigrana bassa è attiva per il connettore. Il valore predefinito è false.

  • gcp.spanner.low-watermark.update-period.ms: l'intervallo di aggiornamento della filigrana bassa. Il valore predefinito è 1000 ms.

  • heartbeat.interval.ms: l'intervallo heartbeat di Spanner. Il valore predefinito è 300000 (cinque minuti).

  • gcp.spanner.start.time: l'ora di inizio del connettore. Il valore predefinito è l'ora corrente.

  • gcp.spanner.end.time: l'ora di fine del connettore. Il valore predefinito è infinito.

  • tables.exclude.list: le tabelle per cui escludere gli eventi di modifica. Il campo predefinito è vuoto.

  • tables.include.list: le tabelle per cui includere gli eventi di modifica. Se non viene compilato, vengono incluse tutte le tabelle. Il campo predefinito è vuoto.

  • gcp.spanner.stream.event.queue.capacity: capacità della coda degli eventi di Spanner. Il valore predefinito è 10.000.

  • connector.spanner.task.state.change.event.queue.capacity: la capacità della coda degli eventi di modifica dello stato dell'attività. Il valore predefinito è 1000.

  • connector.spanner.max.missed.heartbeats: il numero massimo di heartbeat mancanti per una query di modifiche in tempo reale prima che venga generata un'eccezione. Il valore predefinito è 10.

  • scaler.monitor.enabled: indica se la scalabilità automatica delle attività è abilitata. Il valore predefinito è false.

  • tasks.desired.partitions: il numero preferito di partizioni di modifiche in tempo reale per attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 2.

  • tasks.min: il numero minimo di attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 1.

  • connector.spanner.sync.topic: il nome dell'argomento sincronizzazione, un argomento del connettore interno utilizzato per archiviare le comunicazioni tra le attività. Il valore predefinito è _sync_topic_spanner_connector_connectorname se l'utente non ha fornito un nome.

  • connector.spanner.sync.poll.duration: la durata del sondaggio per l'argomento di sincronizzazione. Il valore predefinito è 500 ms.

  • connector.spanner.sync.request.timeout.ms: il timeout per le richieste all'argomento di sincronizzazione. Il valore predefinito è 5000 ms.

  • connector.spanner.sync.delivery.timeout.ms: il timeout per la pubblicazione nell'argomento di sincronizzazione. Il valore predefinito è 15.000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: l'intervallo in cui viene eseguito il commit degli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.

  • connector.spanner.sync.publisher.wait.timeout: l'intervallo con cui i messaggi vengono pubblicati nell'argomento di sincronizzazione. Il valore predefinito è 5 ms.

  • connector.spanner.rebalancing.topic: il nome dell'argomento di ribilanciamento. L'argomento di ribilanciamento è un argomento del connettore interno utilizzato per determinare l'attività. Il valore predefinito è _rebalancing_topic_spanner_connector_connectorname se l'utente non ha fornito un nome.

  • connector.spanner.rebalancing.poll.duration: la durata del sondaggio per l'argomento di ribilanciamento. Il valore predefinito è 5000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: il timeout per il commit degli offset per l'argomento di ribilanciamento. Il valore predefinito è 5000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: l'intervallo in cui viene eseguito il commit degli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: il tempo di attesa di un'attività prima di elaborare un evento di ribilanciamento. Il valore predefinito è 1000 ms.

Per un elenco ancora più dettagliato delle proprietà del connettore configurabili, consulta il repository GitHub.

Limitazioni