Questa pagina spiega come utilizzare il connettore Kafka per consumare e inoltrare i dati delle modifiche in tempo reale di Spanner.
Concetti principali
Di seguito sono descritti i concetti fondamentali del connettore Kafka.
Debezium
Debezium è un progetto open source che fornisce una piattaforma di streaming di dati a bassa latenza per il rilevamento dei dati modificati.
Connettore Kafka
Il connettore Kafka fornisce un'astrazione dell'API Spanner per pubblicare le modifiche in tempo reale di Spanner in Kafka. Con questo connettore, non devi gestire il ciclo di vita della partizione degli stream di modifiche, che è necessario quando utilizzi direttamente l'API Spanner.
Il connettore Kafka genera un evento di modifica per ogni modifica del record dei dati e invia i record degli eventi di modifica a valle in un argomento Kafka separato per ogni tabella monitorata dal flusso di modifiche. Un mod record di modifica dei dati rappresenta una singola modifica (inserimento, aggiornamento o eliminazione) acquisita. Un singolo record di modifica dei dati può contenere più di una modifica.
Output del connettore Kafka
Il connettore Kafka inoltra i record dei flussi di modifiche direttamente
in un argomento Kafka separato. Il nome dell'argomento di output deve essere connector_name
.table_name
.
Se l'argomento non esiste, il connettore Kafka ne crea automaticamente uno con questo nome.
Puoi anche configurare le trasformazioni di routing degli argomenti per reindirizzare i record agli argomenti specificati. Se vuoi utilizzare il routing degli argomenti, disattiva la funzionalità di watermark basso.
Ordinamento delle registrazioni
I record sono ordinati in base al timestamp del commit per chiave primaria negli argomenti Kafka. I record appartenenti a chiavi principali diverse non hanno garanzie di ordinamento. I record con la stessa chiave primaria vengono archiviati nella stessa partizione dell'argomento Kafka. Se vuoi elaborare intere transazioni, puoi anche utilizzare i campi server_transaction_id
e number_of_records_in_transaction
del record di modifica dei dati per assemblare una transazione Spanner.
Eventi di modifica
Il connettore Kafka genera un evento di modifica dei dati per ogni operazione INSERT
, UPDATE
e DELETE
. Ogni evento contiene una chiave e i valori per la riga modificata.
Puoi utilizzare i convertitori Kafka Connect per produrre eventi di modifica dei dati nei formati Protobuf
, AVRO
, JSON
o JSON Schemaless
. Se utilizzi un
convertitore Kafka Connect che produce schemi, l'evento contiene
schemi separati per la chiave e i valori. In caso contrario, l'evento contiene solo la chiave e i valori.
Lo schema della chiave non cambia mai. Lo schema dei valori è un'aggregazione di tutte le colonne monitorate dallo stream di modifiche dall'ora di inizio del connettore.
Se configuri il connettore per produrre eventi JSON, l'evento di modifica dell'output contiene cinque campi:
Il primo campo
schema
specifica uno schema Kafka Connect che descrive lo schema della chiave Spanner.Il primo campo
payload
ha la struttura descritta dal camposchema
precedente e contiene la chiave per la riga modificata.Il secondo campo
schema
specifica lo schema Kafka Connect che descrive lo schema della riga modificata.Il secondo campo
payload
ha la struttura descritta dal campopayload
precedente e contiene i dati effettivi della riga modificata.schema
Il campo
source
è un campo obbligatorio che descrive i metadati dell'origine per l'evento.
Di seguito è riportato un esempio di evento di modifica dei dati:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Filigrana bassa
Il watermark basso descrive l'ora T in cui è garantito che il connettore Kafka abbia eseguito lo streaming e pubblicato in un argomento Kafka tutti gli eventi con timestamp < T.
Puoi attivare la marcatura temporale minima nel connettore Kafka utilizzando il parametro
gcp.spanner.low-watermark.enabled
. Questo parametro è disattivato per impostazione predefinita. Se la marcatura temporale minima è attivata, il campo low_watermark
nel record di variazione dei dati del flusso di modifiche viene compilato con il timestamp della marcatura temporale minima corrente del connettore Kafka.
Se non vengono prodotti record, il connettore Kafka invia periodicamente "heartbeat" con marcatura temporale agli argomenti di output Kafka rilevati dal connettore.
Questi battiti della filigrana sono record vuoti, ad eccezione del campolow_watermark
. Puoi quindi utilizzare il watermark basso per eseguire aggregazioni in base al tempo.
Ad esempio, puoi utilizzare la marcatura temporale minima per ordinare gli eventi in base al timestamp del commit per le chiavi principali.
Argomenti dei metadati
Il connettore Kafka, così come il framework Kafka Connect, crea diversi argomenti di metadati per memorizzare le informazioni relative al connettore. Sconsigliamo di modificare la configurazione o i contenuti di questi argomenti dei metadati.
Di seguito sono riportati gli argomenti dei metadati:
_consumer_offsets
: un argomento creato automaticamente da Kafka. Memorizza gli offset dei consumatori per i consumatori creati nel connettore Kafka._kafka-connect-offsets
: un argomento creato automaticamente da Kafka Connect. Memorizza gli offset del connettore._sync_topic_spanner_connector_connectorname
: un argomento creato automaticamente dal connettore. Memorizza i metadati relativi alle partizioni del flusso di modifiche._rebalancing_topic_spanner_connector_connectorname
: un argomento creato automaticamente dal connettore. Utilizzato per determinare l'attività del connettore._debezium-heartbeat.connectorname
: un argomento utilizzato per elaborare gli heartbeat delle modifiche in tempo reale di Spanner.
Runtime del connettore Kafka
Di seguito è descritto il runtime del connettore Kafka.
Scalabilità
Il connettore Kafka è scalabile orizzontalmente ed è eseguito su una o più attività distribuite su più worker Kafka Connect.
Garanzie di recapito dei messaggi
Il connettore Kafka supporta la garanzia di consegna almeno una volta.
Tolleranza di errore
Il connettore Kafka è tollerante ai guasti. Quando il connettore Kafka legge le modifiche e produce eventi, registra l'ultimo timestamp del commit elaborato per ogni partizione dello stream delle modifiche. Se il connettore Kafka si arresta per qualsiasi motivo (inclusi errori di comunicazione, problemi di rete o errori di software), al riavvio il connettore Kafka continua a trasmettere i record dal punto in cui si era interrotto.
Il connettore Kafka legge lo schema delle informazioni al timestamp di inizio del connettore Kafka per recuperare le informazioni dello schema. Per impostazione predefinita, Spanner non può leggere lo schema delle informazioni in base ai timestamp di lettura precedenti al periodo di conservazione della versione, che per impostazione predefinita è pari a un'ora. Se vuoi avviare il connettore da un periodo precedente a un'ora fa, devi aumentare il periodo di conservazione della versione del database.
Configura il connettore Kafka
Crea un flusso di modifiche
Per informazioni dettagliate su come creare un flusso di modifiche, consulta Creare un flusso di modifiche. Per continuare con i passaggi successivi, è necessaria un'istanza Spanner con uno stream di modifiche configurato.
Tieni presente che se vuoi che in ogni evento di modifica dei dati vengano restituite sia le colonne modificate sia quelle non modificate, utilizza il tipo di acquisizione dei valori NEW_ROW
. Per ulteriori informazioni, consulta Tipo di acquisizione del valore.
Installa il file JAR del connettore Kafka
Dopo aver installato Zookeeper, Kafka e Kafka Connect, le attività rimanenti per eseguire il deployment di un connettore Kafka sono scaricare
l'archivio del plug-in del connettore, estrarre i file JAR nell'ambiente Kafka Connect e aggiungere la
directory con i file JAR a plugin.path
di Kafka Connect.
Devi quindi riavviare il processo Kafka Connect per rilevare i nuovi file JAR.
Se utilizzi container immutabili, puoi estrarre le immagini dalle immagini container di Debezium per Zookeeper, Kafka e Kafka Connect. L'immagine Kafka Connect ha il connettore Spanner preinstallato.
Per saperne di più su come installare i file JAR del connettore Kafka basato su Debezium, consulta Installazione di Debezium.
Configura il connettore Kafka
Di seguito è riportato un esempio di configurazione di un connettore Kafka che si connette a uno stream di variazioni denominato changeStreamAll
nel database users
nell'istanza test-instance
e nel progetto test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Questa configurazione contiene quanto segue:
Il nome del connettore quando è registrato con un servizio Kafka Connect.
Il nome di questa classe di connettore Spanner.
L'ID progetto.
L'ID istanza Spanner.
L'ID database Spanner.
Il nome del flusso di modifiche.
L'oggetto JSON per la chiave dell'account di servizio.
(Facoltativo) Il ruolo database Spanner da utilizzare.
Il numero massimo di attività.
Per un elenco completo delle proprietà del connettore, consulta Proprietà di configurazione del connettore Kafka.
Aggiungi la configurazione del connettore a Kafka Connect
Per iniziare a eseguire un connettore Spanner:
Crea una configurazione per il connettore Spanner.
Utilizza l'API REST di Kafka Connect per aggiungere la configurazione del connettore al cluster Kafka Connect.
Puoi inviare questa configurazione con un comando POST
a un servizio Kafka Connect in esecuzione. Per impostazione predefinita, il servizio Kafka Connect viene eseguito sulla porta 8083
.
Il servizio registra la configurazione e avvia l'attività del connettore che si connette al database Spanner e trasmette in streaming i record degli eventi di modifica agli argomenti Kafka.
Di seguito è riportato un esempio di comando POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Esempio di risposta corretta:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Aggiorna la configurazione del connettore Kafka
Per aggiornare la configurazione del connettore, invia un comando PUT
al servizio Kafka Connect in esecuzione con lo stesso nome del connettore.
Supponiamo di avere un connettore in esecuzione con la configurazione della sezione precedente. Di seguito è riportato un esempio di comando PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Esempio di risposta corretta:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Interrompi il connettore Kafka
Per interrompere il connettore, invia un comando DELETE
al servizio Kafka Connect in esecuzione con lo stesso nome del connettore.
Supponiamo di avere un connettore in esecuzione con la configurazione della sezione precedente. Di seguito è riportato un esempio di comando DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Esempio di risposta corretta:
HTTP/1.1 204 No Content
Monitora il connettore Kafka
Oltre alle metriche standard di Kafka Connect e Debezium, il connettore Kafka esporta le proprie metriche:
MilliSecondsLowWatermark
: la filigrana bassa corrente dell'attività del connettore in millisecondi. La marcatura temporale minima descrive l'ora T in cui è garantito che il connettore abbia eseguito lo streaming di tutti gli eventi con timestamp < TMilliSecondsLowWatermarkLag
: il ritardo del watermark basso rispetto all'ora corrente in millisecondi. è stato eseguito lo streaming di tutti gli eventi con timestamp < TLatencyLowWatermark<Variant>MilliSeconds
: il ritardo del watermark basso rispetto all'ora corrente in millisecondi. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.LatencySpanner<Variant>MilliSeconds
: la latenza dal timestamp di commit di Spanner alla lettura del connettore. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.LatencyReadToEmit<Variant>MilliSeconds
: la latenza da timestamp-lettura-Spanner a emissione-connector. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.LatencyCommitToEmit<Variant>tMilliSeconds
: la latenza di emissione del timestamp del commit di Spanner al connettore. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.LatencyCommitToPublish<Variant>MilliSeconds
: la latenza dal timestamp del commit di Spanner al timestamp della pubblicazione di Kafka. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.NumberOfChangeStreamPartitionsDetected
: il numero totale di partizioni rilevate dall'attività del connettore corrente.NumberOfChangeStreamQueriesIssued
: il numero totale di query sul flusso di modifiche emesse dall'attività corrente.NumberOfActiveChangeStreamQueries
: il numero attivo di query sul stream di modifiche rilevate dall'attività del connettore corrente.SpannerEventQueueCapacity
: la capacità totale diStreamEventQueue
, una coda che memorizza gli elementi ricevuti dalle query sugli stream di modifiche.SpannerEventQueueCapacity
: la capacitàStreamEventQueue
rimanente.TaskStateChangeEventQueueCapacity
: la capacità totale diTaskStateChangeEventQueue
, una coda che memorizza gli eventi che si verificano nel connettore.RemainingTaskStateChangeEventQueueCapacity
: la capacitàTaskStateChangeEventQueue
rimanente.NumberOfActiveChangeStreamQueries
: il numero attivo di query sul stream di modifiche rilevate dall'attività del connettore corrente.
Proprietà di configurazione del connettore Kafka
Di seguito sono riportate le proprietà di configurazione obbligatorie per il connettore:
name
: nome univoco del connettore. Il tentativo di registrarsi di nuovo con lo stesso nome causa un errore. Questa proprietà è obbligatoria per tutti i connettori Kafka Connect.connector.class
: il nome della classe Java per il connettore. Utilizza sempre un valoreio.debezium.connector.spanner.SpannerConnector
per il connettore Kafka.tasks.max
: il numero massimo di attività da creare per questo connettore.gcp.spanner.project.id
: l'ID progettogcp.spanner.instance.id
: l'ID istanza Spannergcp.spanner.database.id
: l'ID database Spannergcp.spanner.change.stream
: il nome dello stream di modifiche Spannergcp.spanner.credentials.json
: l'oggetto JSON della chiave dell'account di servizio.gcp.spanner.credentials.path
: il percorso del file dell'oggetto JSON della chiave dell'account di servizio. Obbligatorio se non viene fornito il campo precedente.gcp.spanner.database.role
: il ruolo database Spanner da utilizzare. Questo è necessario solo quando il flusso di modifiche è protetto con un controllo di accesso granulare. Il ruolo database deve disporre del privilegioSELECT
sul flusso di modifiche e del privilegioEXECUTE
sulla funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per i flussi di modifiche.
Le seguenti proprietà di configurazione avanzata hanno valori predefiniti che funzionano nella maggior parte delle situazioni e quindi raramente devono essere specificati nella configurazione del connettore:
gcp.spanner.low-watermark.enabled
: indica se la filigrana bassa è abilitata per il connettore. Il valore predefinito è false.gcp.spanner.low-watermark.update-period.ms
: l'intervallo di aggiornamento della soglia minima. Il valore predefinito è 1000 ms.heartbeat.interval.ms
: l'intervallo del battito cardiaco di Spanner. Il valore predefinito è 300000 (cinque minuti).gcp.spanner.start.time
: l'ora di inizio del connettore. Il valore predefinito è l'ora corrente.gcp.spanner.end.time
: l'ora di fine del connettore. Il valore predefinito è infinito.tables.exclude.list
: le tabelle per le quali escludere gli eventi di modifica. Il valore predefinito è vuoto.tables.include.list
: le tabelle per cui includere gli eventi di modifica. Se non è compilato, vengono incluse tutte le tabelle. Il valore predefinito è vuoto.gcp.spanner.stream.event.queue.capacity
: la capacità della coda di eventi Spanner. Il valore predefinito è 10000.connector.spanner.task.state.change.event.queue.capacity
: la capacità della coda di eventi di modifica dello stato dell'attività. Il valore predefinito è 1000.connector.spanner.max.missed.heartbeats
: il numero massimo di heartbeat mancanti per una query sul flusso di modifiche prima che venga lanciata un'eccezione. Il valore predefinito è 10.scaler.monitor.enabled
: indica se la scalabilità automatica delle attività è abilitata. Il valore predefinito è false.tasks.desired.partitions
: il numero preferito di partizioni dei flussi di modifiche per attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 2.tasks.min
: il numero minimo di attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 1.connector.spanner.sync.topic
: il nome dell'argomento di sincronizzazione, un argomento del connettore interno utilizzato per archiviare la comunicazione tra le attività. Il valore predefinito è_sync_topic_spanner_connector_connectorname
se l'utente non ha fornito un nome.connector.spanner.sync.poll.duration
: la durata del sondaggio per l'argomento di sincronizzazione. Il valore predefinito è 500 ms.connector.spanner.sync.request.timeout.ms
: il timeout per le richieste all'argomento di sincronizzazione. Il valore predefinito è 5000 ms.connector.spanner.sync.delivery.timeout.ms
: il timeout per la pubblicazione nell'argomento di sincronizzazione. Il valore predefinito è 15000 ms.connector.spanner.sync.commit.offsets.interval.ms
: l'intervallo a cui vengono committati gli offset per l'argomento di sincronizzazione. Il valore predefinito è 60000 ms.connector.spanner.sync.publisher.wait.timeout
: l'intervallo a cui i messaggi vengono pubblicati nell'argomento di sincronizzazione. Il valore predefinito è 5 ms.connector.spanner.rebalancing.topic
: il nome dell'argomento di riequilibrio. L'argomento di riequilibrio è un argomento del connettore interno utilizzato per determinare l'attività delle attività. Il valore predefinito è_rebalancing_topic_spanner_connector_connectorname
se l'utente non ha fornito un nome.connector.spanner.rebalancing.poll.duration
: la durata del sondaggio per l'argomento di riequilibrio. Il valore predefinito è 5000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: il timeout per l'applicazione degli offset per l'argomento di riequilibrio. Il valore predefinito è 5000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: l'intervallo a cui vengono committati gli offset per l'argomento di sincronizzazione. Il valore predefinito è 60000 ms.connector.spanner.rebalancing.task.waiting.timeout
: la durata del tempo di attesa di un'attività prima dell'elaborazione di un evento di riequilibrio. Il valore predefinito è 1000 ms.
Per un elenco ancora più dettagliato delle proprietà del connettore configurabili, consulta il repository GitHub.
Limitazioni
Il connettore non supporta gli eventi snapshot in streaming.
Se il watermarking è attivato nel connettore, non puoi configurare le trasformazioni di routing degli argomenti Debezium.