Questa pagina spiega come utilizzare il connettore Kafka per utilizzare e inoltrare i flussi di modifiche di Spanner.
Concetti principali
Di seguito sono descritti i concetti principali del connettore Kafka.
Debezium
Debezium è un progetto open source che fornisce una piattaforma per flussi di dati a bassa latenza per l'acquisizione dei dati dei cambiamenti.
Connettore Kafka
Il connettore Kafka fornisce un'astrazione sull'API Spanner per pubblicare modifiche in tempo reale Spanner in Kafka. Con questo connettore, non devi gestire il ciclo di vita della partizione delle modifiche in tempo reale, che è necessario quando utilizzi direttamente l'API Spanner.
Il connettore Kafka produce un evento di modifica per ogni mod di record delle modifiche dei dati e invia record di eventi di modifica a valle in un argomento Kafka separato per ogni tabella monitorata con flussi di modifiche. Una mod del record di modifiche dei dati rappresenta una singola modifica (inserimento, aggiornamento o eliminazione) acquisita. Un singolo record di modifiche dati può contenere più di una mod.
Output connettore Kafka
Il connettore Kafka inoltra i record modifiche in tempo realee direttamente a un argomento Kafka separato. Il nome dell'argomento di output deve essere connector_name
.table_name
.
Se l'argomento non esiste, il connettore Kafka crea automaticamente un argomento con quel nome.
Puoi anche configurare trasformazioni del routing degli argomenti per reindirizzare i record in argomenti da te specificati. Se vuoi utilizzare il routing degli argomenti, disattiva la funzionalità della filigrana bassa.
Ordine di dischi
I record sono ordinati per timestamp di commit per chiave primaria
negli argomenti Kafka. I record appartenenti a chiavi primarie diverse non hanno
garanzie di ordine. I record con la stessa chiave primaria sono archiviati
nella stessa partizione di argomento Kafka. Se vuoi elaborare intere transazioni, puoi
anche utilizzare i campi server_transaction_id
e number_of_records_in_transaction
del record delle modifiche dei dati
per combinare una transazione Spanner.
Eventi di modifica
Il connettore Kafka genera un evento di modifica dei dati per ogni operazione INSERT
, UPDATE
e DELETE
. Ogni evento contiene una chiave e i valori per la riga modificata.
Puoi utilizzare gli utenti che hanno completato una conversione di Kafka Connect per produrre eventi di modifica dei dati nei formati Protobuf
, AVRO
, JSON
o JSON Schemaless
. Se utilizzi un
convertitore Kafka Connect che produce schemi, l'evento contiene
schemi separati per le chiavi e i valori. In caso contrario, l'evento contiene
solo la chiave e i valori.
Lo schema della chiave non cambia mai. Lo schema dei valori è una combinazione di tutte le colonne monitorate dalla modifica in tempo reale dall'ora di inizio del connettore.
Se configuri il connettore per produrre eventi JSON, l'evento di modifica dell'output contiene cinque campi:
Il primo campo
schema
specifica uno schema Kafka Connect che descrive lo schema chiave di Spanner.Il primo campo
payload
ha la struttura descritta dal camposchema
precedente e contiene la chiave per la riga modificata.Il secondo campo
schema
specifica lo schema Kafka Connect che descrive lo schema per la riga modificata.Il secondo campo
payload
ha la struttura descritta dal camposchema
precedente e contiene i dati effettivi per la riga modificata.Il campo
source
è un campo obbligatorio che descrive i metadati di origine dell'evento.
Di seguito è riportato un esempio di evento di modifica dei dati:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Watermark basso
La filigrana bassa descrive il momento T in cui viene garantito che il connettore Kafka sia trasmesso in streaming e pubblicato in un argomento Kafka per tutti gli eventi con timestamp < T.
Puoi abilitare la filigrana bassa nel connettore Kafka utilizzando il parametro gcp.spanner.low-watermark.enabled
. Questo parametro è disattivato per impostazione predefinita. Se è abilitata la filigrana bassa, il campo low_watermark
nel record delle modifiche ai dati delle modifiche in tempo reale viene compilato con l'attuale timestamp della filigrana bassa del connettore Kafka.
Se non viene generato alcun record, il connettore Kafka invia periodicamente "heartbeat" della filigrana agli argomenti di output Kafka rilevati dal connettore.
Questi heartbeat della filigrana sono record vuoti, ad eccezione del campo low_watermark
. Puoi quindi utilizzare la filigrana bassa per eseguire aggregazioni basate sul tempo.
Ad esempio, puoi utilizzare la filigrana bassa per ordinare gli eventi in base al timestamp di commit nelle chiavi primarie.
Argomenti dei metadati
Il connettore Kafka e il framework Kafka Connect creano diversi argomenti di metadati per archiviare le informazioni relative al connettore. Non è consigliabile modificare la configurazione o il contenuto di questi argomenti dei metadati.
Di seguito sono riportati gli argomenti dei metadati:
_consumer_offsets
: un argomento creato automaticamente da Kafka. Archivia le compensazioni dei consumer per i consumatori creati nel connettore Kafka._kafka-connect-offsets
: un argomento creato automaticamente da Kafka Connect. Archivia gli offset dei connettori._sync_topic_spanner_connector_connectorname
: un argomento creato automaticamente dal connettore. Archivia i metadati relativi alle partizioni delle modifiche in tempo reale._rebalancing_topic_spanner_connector_connectorname
: un argomento creato automaticamente dal connettore. Utilizzato per determinare l'attività del connettore._debezium-heartbeat.connectorname
: un argomento utilizzato per elaborare gli heartbeat di modifiche in tempo reale di Spanner.
Runtime connettore Kafka
Di seguito viene descritto il runtime del connettore Kafka.
Scalabilità
Il connettore Kafka è scalabile orizzontalmente ed è eseguito su una o più attività distribuite tra più worker Kafka Connect.
Garanzie di consegna dei messaggi
Il connettore Kafka supporta la garanzia di consegna "at-least-once".
Tolleranza di errore
Il connettore Kafka è tollerante agli errori. Quando il connettore Kafka legge le modifiche e produce eventi, registra il timestamp dell'ultimo commit elaborato per ogni partizione del flusso di modifiche. Se il connettore Kafka si arresta per qualsiasi motivo (compresi errori di comunicazione, problemi di rete o errori del software), al riavvio il connettore Kafka continua a trasferire i record dal punto in cui era stato interrotto.
Il connettore Kafka legge lo schema di informazioni in corrispondenza del timestamp di inizio del connettore Kafka per recuperare le informazioni dello schema. Per impostazione predefinita, Spanner non può leggere lo schema delle informazioni ai timestamp di lettura precedenti al periodo di conservazione della versione, che per impostazione predefinita è di un'ora. Se vuoi avviare il connettore da meno di un'ora nel passato, devi incrementare il periodo di conservazione delle versioni del database.
configura il connettore Kafka
Crea un flusso di modifiche
Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche. Per continuare con i passaggi successivi, è necessaria un'istanza Spanner con un flusso di modifiche configurato.
Tieni presente che se vuoi che per ogni evento di modifica dei dati vengano restituite colonne modificate e non modificate, utilizza il tipo di acquisizione valore NEW_ROW
. Per ulteriori informazioni, consulta la sezione Tipo di acquisizione valore.
Installa il JAR del connettore Kafka
Dopo avere installato Zookeeper, Kafka e Kafka Connect, le restanti attività per eseguire il deployment di un connettore Kafka sono scaricare l'archivio dei plug-in del connettore, estrarre i file JAR nel tuo ambiente Kafka Connect e aggiungere la directory con i file JAR alla plugin.path
di Kafka Connect.
Devi quindi riavviare il processo Kafka Connect per recuperare i nuovi file JAR.
Se utilizzi container immutabili, puoi eseguire il pull delle immagini dalle immagini container di Debezium per Zookeeper, Kafka e Kafka Connect. L'immagine Kafka Connect ha il connettore Spanner preinstallato.
Per maggiori informazioni su come installare i JAR del connettore Kafka basati su Debezium, consulta Installazione di Debezium.
configura il connettore Kafka
Di seguito è riportato un esempio della configurazione di un connettore Kafka che si connette a un flusso di modifiche denominato changeStreamAll
nel database users
nell'istanza test-instance
e nel progetto test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Questa configurazione contiene:
Il nome del connettore quando registrato con un servizio Kafka Connect.
Il nome di questa classe del connettore Spanner.
L'ID del progetto.
L'ID istanza Spanner.
L'ID database di Spanner.
Il nome della modifica in tempo reale.
L'oggetto JSON per la chiave dell'account di servizio.
(Facoltativo) Il ruolo di database Spanner da utilizzare.
Il numero massimo di attività.
Per un elenco completo delle proprietà del connettore, consulta Proprietà di configurazione del connettore Kafka.
aggiungi la configurazione del connettore a Kafka Connect
Per avviare un connettore Spanner:
Creare una configurazione per il connettore Spanner.
Utilizza l'API REST Kafka Connect per aggiungere la configurazione del connettore al cluster Kafka Connect.
Puoi inviare questa configurazione con un comando POST
a un servizio Kafka Connect in esecuzione. Per impostazione predefinita, il servizio Kafka Connect viene eseguito sulla porta 8083
.
Il servizio registra la configurazione e avvia l'attività del connettore che si connette al database Spanner e trasmette i flussi di record di eventi di modifica agli argomenti Kafka.
Di seguito è riportato un esempio di comando POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Esempio di risposta corretta:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Aggiorna la configurazione del connettore Kafka
Per aggiornare la configurazione del connettore, invia un comando PUT
al servizio Kafka Connect in esecuzione con lo stesso nome connettore.
Supponi di avere un connettore in esecuzione con la configurazione della sezione precedente. Di seguito è riportato un esempio di comando PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Esempio di risposta corretta:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Arresta il connettore Kafka
Per arrestare il connettore, invia un comando DELETE
al servizio Kafka Connect in esecuzione con lo stesso nome connettore.
Supponi di avere un connettore in esecuzione con la configurazione della sezione precedente. Di seguito è riportato un esempio di comando DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Esempio di risposta corretta:
HTTP/1.1 204 No Content
Monitora il connettore Kafka
Oltre alle metriche standard Kafka Connect e Debezium, il connettore Kafka esporta le proprie metriche:
MilliSecondsLowWatermark
: l'attuale filigrana bassa dell'attività del connettore in millisecondi. La filigrane bassa descrive il momento T in cui è garantito che il connettore esegua lo streaming di tutti gli eventi con timestamp < TMilliSecondsLowWatermarkLag
: il ritardo della filigrana in basso rispetto all'ora attuale in millisecondi. Tutti gli eventi sono stati trasmessi con timestamp < TLatencyLowWatermark<Variant>MilliSeconds
: il ritardo della filigrana in basso rispetto all'ora attuale in millisecondi. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.LatencySpanner<Variant>MilliSeconds
: latenza Spanner-commit-timestamp-to-connector-read. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.LatencyReadToEmit<Variant>MilliSeconds
: la latenza Spanner-read-timestamp-to-connector-emit. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.LatencyCommitToEmit<Variant>tMilliSeconds
: latenza di Spanner-commit-timestamp-to-connector-emit. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.LatencyCommitToPublish<Variant>MilliSeconds
: la latenza Spanner-commit-timestamp-to Kafka-publish-timestamp. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.NumberOfChangeStreamPartitionsDetected
: il numero totale di partizioni rilevate dall'attività del connettore attuale.NumberOfChangeStreamQueriesIssued
: il numero totale di query di modifiche in tempo reale emesse dall'attività corrente.NumberOfActiveChangeStreamQueries
: il numero attivo di query di modifiche in tempo reale rilevate dall'attività del connettore attuale.SpannerEventQueueCapacity
: la capacità totale diStreamEventQueue
, una coda che archivia gli elementi ricevuti dalle query di modifiche in tempo reale.SpannerEventQueueCapacity
: la capacità rimanente diStreamEventQueue
.TaskStateChangeEventQueueCapacity
: la capacità totale diTaskStateChangeEventQueue
, una coda che archivia gli eventi che si verificano nel connettore.RemainingTaskStateChangeEventQueueCapacity
: la capacità rimanente diTaskStateChangeEventQueue
.NumberOfActiveChangeStreamQueries
: il numero attivo di query di modifiche in tempo reale rilevate dall'attività del connettore attuale.
Proprietà di configurazione del connettore Kafka
Di seguito sono riportate le proprietà di configurazione obbligatorie per il connettore:
name
: nome univoco del connettore. Il tentativo di registrazione di nuovo con lo stesso nome causa un errore. Questa proprietà è obbligatoria per tutti i connettori Kafka Connect.connector.class
: il nome della classe Java per il connettore. Usa sempre il valoreio.debezium.connector.spanner.SpannerConnector
per il connettore Kafka.tasks.max
: il numero massimo di attività che devono essere create per questo connettore.gcp.spanner.project.id
: l'ID progettogcp.spanner.instance.id
: l'ID istanza Spannergcp.spanner.database.id
: l'ID database di Spannergcp.spanner.change.stream
: il nome delle modifiche in tempo reale di Spannergcp.spanner.credentials.json
: l'oggetto JSON della chiave dell'account di servizio.gcp.spanner.credentials.path
: il percorso del file dell'oggetto JSON chiave dell'account di servizio. Obbligatorio se il campo sopra non è stato fornito.gcp.spanner.database.role
: il ruolo di database Spanner da utilizzare. Questa operazione è necessaria solo se il flusso di modifiche è protetto con controllo dell'accesso granulare. Il ruolo del database deve avere il privilegioSELECT
per il flusso di modifiche e il privilegioEXECUTE
per la funzione di lettura della modifica in tempo reale. Per ulteriori informazioni, consulta Controllo dell'controllo dell'accesso granulare per i flussi di modifiche.
Le seguenti proprietà di configurazione avanzata hanno valori predefiniti che funzionano nella maggior parte delle situazioni e pertanto raramente devono essere specificati nella configurazione del connettore:
gcp.spanner.low-watermark.enabled
: indica se la filigrana bassa è attiva per il connettore. Il valore predefinito è false.gcp.spanner.low-watermark.update-period.ms
: l'intervallo di aggiornamento della filigrana bassa. Il valore predefinito è 1000 ms.heartbeat.interval.ms
: l'intervallo heartbeat di Spanner. Il valore predefinito è 300000 (cinque minuti).gcp.spanner.start.time
: l'ora di inizio del connettore. Il valore predefinito è l'ora attuale.gcp.spanner.end.time
: l'ora di fine del connettore. Il valore predefinito è infinito.tables.exclude.list
: le tabelle per cui escludere gli eventi di modifica. Il campo predefinito è vuoto.tables.include.list
: le tabelle per cui includere gli eventi di modifica. Se non viene compilato, vengono incluse tutte le tabelle. Il campo predefinito è vuoto.gcp.spanner.stream.event.queue.capacity
: capacità della coda degli eventi di Spanner. Il valore predefinito è 10.000.connector.spanner.task.state.change.event.queue.capacity
: la capacità della coda degli eventi di modifica dello stato dell'attività. Il valore predefinito è 1000.connector.spanner.max.missed.heartbeats
: il numero massimo di heartbeat mancanti per una query di modifiche in tempo reale prima che venga generata un'eccezione. Il valore predefinito è 10.scaler.monitor.enabled
: indica se la scalabilità automatica delle attività è abilitata. Il valore predefinito è false.tasks.desired.partitions
: il numero preferito di partizioni di modifiche in tempo reale per attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 2.tasks.min
: il numero minimo di attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 1.connector.spanner.sync.topic
: il nome dell'argomento sincronizzazione, un argomento del connettore interno utilizzato per archiviare le comunicazioni tra le attività. Il valore predefinito è_sync_topic_spanner_connector_connectorname
se l'utente non ha fornito un nome.connector.spanner.sync.poll.duration
: la durata del sondaggio per l'argomento di sincronizzazione. Il valore predefinito è 500 ms.connector.spanner.sync.request.timeout.ms
: il timeout per le richieste all'argomento di sincronizzazione. Il valore predefinito è 5000 ms.connector.spanner.sync.delivery.timeout.ms
: il timeout per la pubblicazione nell'argomento di sincronizzazione. Il valore predefinito è 15.000 ms.connector.spanner.sync.commit.offsets.interval.ms
: l'intervallo in cui viene eseguito il commit degli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.connector.spanner.sync.publisher.wait.timeout
: l'intervallo con cui i messaggi vengono pubblicati nell'argomento di sincronizzazione. Il valore predefinito è 5 ms.connector.spanner.rebalancing.topic
: il nome dell'argomento di ribilanciamento. L'argomento di ribilanciamento è un argomento del connettore interno utilizzato per determinare l'attività. Il valore predefinito è_rebalancing_topic_spanner_connector_connectorname
se l'utente non ha fornito un nome.connector.spanner.rebalancing.poll.duration
: la durata del sondaggio per l'argomento di ribilanciamento. Il valore predefinito è 5000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: il timeout per il commit degli offset per l'argomento di ribilanciamento. Il valore predefinito è 5000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: l'intervallo in cui viene eseguito il commit degli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.connector.spanner.rebalancing.task.waiting.timeout
: il tempo di attesa di un'attività prima di elaborare un evento di ribilanciamento. Il valore predefinito è 1000 ms.
Per un elenco ancora più dettagliato delle proprietà del connettore configurabili, consulta il repository GitHub.
Limitazioni
Il connettore non supporta lo streaming di eventi snapshot.
Se la filigrana è abilitata nel connettore, non puoi configurare le trasformazioni di routing degli argomenti Debezium.