Questa pagina spiega come utilizzare il connettore Kafka per utilizzare e inoltrare i dati dei flussi di modifiche di Spanner.
Concetti principali
Di seguito sono descritti i concetti fondamentali del connettore Kafka.
Debezium
Debezium è un progetto open source che offre una piattaforma per flussi di dati a bassa latenza per l'acquisizione dei dati delle modifiche.
Connettore Kafka
Il connettore Kafka fornisce un'astrazione sull'API Spanner per e pubblicare le modifiche in tempo reale di Spanner in Kafka. Con questo connettore, non devi gestire il ciclo di vita della partizione delle modifiche in tempo reale, che è necessario se utilizzi direttamente l'API Spanner.
Il connettore Kafka produce un evento di modifica per ogni mod del record di modifiche dei dati e invia record di eventi di modifica downstream in un argomento Kafka separato per ogni tabella monitorata con flussi di modifiche. Una mod del record di modifiche dei dati rappresenta una singola modifica (inserimento, aggiornamento o eliminazione) acquisita. Un singolo record di modifiche dati può contenere più di una mod.
Output del connettore Kafka
Il connettore Kafka inoltra i record dei flussi di modifiche direttamente
in un argomento Kafka separato. Il nome dell'argomento di output deve essere connector_name
.table_name
.
Se l'argomento non esiste, il connettore Kafka crea automaticamente un argomento con quel nome.
Puoi anche configurare trasformazioni del routing degli argomenti per reindirizzare i record in argomenti da te specificati. Se vuoi utilizzare il routing degli argomenti, disattiva la funzionalità della filigrana bassa.
Ordine di dischi
I record sono ordinati in base al timestamp del commit per chiave primaria negli argomenti Kafka. I record appartenenti a chiavi primarie diverse non hanno
garanzie di ordinazione. I record con la stessa chiave primaria vengono archiviati nella stessa partizione dell'argomento Kafka. Se vuoi elaborare intere transazioni, puoi anche utilizzare i campi server_transaction_id
e number_of_records_in_transaction
del record di modifica dei dati per assemblare una transazione Spanner.
Eventi di modifica
Il connettore Kafka genera un evento di modifica dei dati per ogni INSERT
, UPDATE
,
e l'operazione DELETE
. Ogni evento contiene una chiave e i valori per la riga modificata.
Puoi utilizzare i convertitori di Kafka Connect per generare eventi di modifica dei dati in
nei formati Protobuf
, AVRO
, JSON
o JSON Schemaless
. Se utilizzi un
Convertitore Kafka Connect che produce schemi, l'evento contiene
schemi separati per la chiave e i valori. In caso contrario, l'evento contiene solo
la chiave e i valori.
Lo schema della chiave non cambia mai. Lo schema per i valori è un amalgamazione di tutte le colonne tracciate dal flusso di modifiche a partire e l'ora di inizio del connettore.
Se configuri il connettore per produrre eventi JSON, l'evento di modifica di output contiene cinque campi:
Il primo campo
schema
specifica uno schema Kafka Connect che descrive lo schema chiave di Spanner.Il primo campo
payload
ha la struttura descritta dal camposchema
precedente e contiene la chiave per la riga modificata.Il secondo campo
schema
specifica lo schema Kafka Connect che descrive lo schema per la riga modificata.Il secondo campo
payload
ha la struttura descritta dal camposchema
precedente e contiene i dati effettivi per la riga modificata.Il campo
source
è un campo obbligatorio che descrive i metadati dell'origine per l'evento.
Di seguito è riportato un esempio di evento di modifica dei dati:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Watermark basso
La filigrana bassa descrive il momento T in cui viene garantito che il connettore Kafka sia trasmesso in streaming e pubblicato in un argomento Kafka di tutti gli eventi con timestamp < T.
Puoi abilitare la filigrana bassa nel connettore Kafka utilizzando
Parametro gcp.spanner.low-watermark.enabled
. Questo parametro è disattivato
per impostazione predefinita. Se è attivata la filigrana bassa, il campo low_watermark
nei dati delle modifiche in tempo reale
il record di modifica viene compilato con la filigrana bassa attuale del connettore Kafka
timestamp.
Se non viene generato alcun record, il connettore Kafka invia periodicamente "battiti cardiaci" con filigrana agli argomenti di output Kafka rilevati dal connettore.
Questi heartbeat della filigrana sono record vuoti, ad eccezione dei
Campo low_watermark
. Puoi quindi utilizzare la marca temporale minima per eseguire aggregazioni in base al tempo.
Ad esempio, puoi utilizzare la filigrana bassa per ordinare gli eventi in base al commit
le chiavi primarie.
Argomenti dei metadati
Il connettore Kafka e il framework Kafka Connect creano diversi degli argomenti dei metadati per archiviare le informazioni relative al connettore. È sconsigliato modificare la configurazione o i contenuti di questi argomenti dei metadati.
Di seguito sono riportati gli argomenti dei metadati:
_consumer_offsets
: un argomento creato automaticamente da Kafka. Archivia le compensazioni dei consumer per i consumatori creati nel connettore Kafka._kafka-connect-offsets
: un argomento creato automaticamente da Kafka Connect. Archivia gli offset dei connettori._sync_topic_spanner_connector_connectorname
: un argomento creato automaticamente dal connettore. Archivia i metadati relativi alle partizioni delle modifiche in tempo reale._rebalancing_topic_spanner_connector_connectorname
: un argomento creato automaticamente dal connettore. Utilizzato per determinare l'attività del connettore._debezium-heartbeat.connectorname
: un argomento utilizzato per elaborare gli heartbeat del flusso di modifiche di Spanner.
Runtime connettore Kafka
Di seguito viene descritto il runtime del connettore Kafka.
Scalabilità
Il connettore Kafka è scalabile orizzontalmente ed è eseguito su una o più attività distribuite tra più worker Kafka Connect.
Garanzie di recapito dei messaggi
Il connettore Kafka supporta la garanzia di consegna almeno una volta.
Tolleranza di errore
Il connettore Kafka è tollerante agli errori. Quando il connettore Kafka legge le modifiche e produce eventi, registra l'ultimo timestamp del commit elaborato per ogni partizione dello stream delle modifiche. Se il connettore Kafka si arresta per qualsiasi motivo (inclusi errori di comunicazione, problemi di rete o errori del software), al riavvio il connettore Kafka continua a trasferire flussi di record dal punto in cui era stato interrotto.
Il connettore Kafka legge lo schema delle informazioni al timestamp di inizio del connettore Kafka per recuperare le informazioni dello schema. Per impostazione predefinita, Spanner non può lo schema di informazioni viene letto nei timestamp di lettura prima periodo di conservazione della versione, che per impostazione predefinita è di un'ora. Se vuoi avviare il connettore da prima della data corrente un'ora prima, devi aumentare la conservazione delle versioni del database punto.
configura il connettore Kafka
Crea un flusso di modifiche
Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche. Per continuare con i passaggi successivi, è necessaria un'istanza Spanner con un flusso di modifiche configurato.
Tieni presente che se desideri che vengano restituite sia le colonne modificate sia quelle non modificate
evento di modifica dei dati, utilizza il tipo di acquisizione del valore NEW_ROW
. Per ulteriori informazioni, consulta la sezione Tipo di acquisizione valore.
Installa il JAR del connettore Kafka
Dopo avere installato Zookeeper, Kafka e Kafka Connect, le attività rimanenti per il deployment di un connettore Kafka sono il download
l'archivio dei plug-in del connettore, estrarre i file JAR nel tuo ambiente Kafka Connect e aggiungere
con i file JAR nell'istanza plugin.path
di Kafka Connect.
Devi quindi riavviare il processo Kafka Connect per recuperare i nuovi file JAR.
Se lavori con container immutabili, puoi eseguire il pull delle immagini Immagini container di Debezium per Zookeeper, Kafka e Kafka Connect. L'immagine Kafka Connect ha Connettore Spanner preinstallato.
Per saperne di più su come installare i file JAR del connettore Kafka basato su Debezium, consulta Installazione di Debezium.
configura il connettore Kafka
Di seguito è riportato un esempio della configurazione di un connettore Kafka
che si collega a un flusso di modifiche denominato changeStreamAll
nel
il database users
nell'istanza test-instance
e nel progetto test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Questa configurazione contiene:
Il nome del connettore quando registrato con un servizio Kafka Connect.
Il nome di questa classe del connettore Spanner.
L'ID progetto.
L'ID istanza Spanner.
L'ID database di Spanner.
Il nome del flusso di modifiche.
L'oggetto JSON per la chiave dell'account di servizio.
(Facoltativo) Il ruolo di database Spanner da utilizzare.
Il numero massimo di attività.
Per un elenco completo delle proprietà del connettore, consulta Proprietà di configurazione del connettore Kafka.
aggiungi la configurazione del connettore a Kafka Connect
Per iniziare a eseguire un connettore Spanner:
Crea una configurazione per il connettore Spanner.
Utilizza l'API REST Kafka Connect per aggiungere la configurazione del connettore al cluster Kafka Connect.
Puoi inviare questa configurazione con un comando POST
a Kafka Connect in esecuzione
completamente gestito di Google Cloud. Per impostazione predefinita, il servizio Kafka Connect viene eseguito sulla porta 8083
.
Il servizio registra la configurazione e avvia l'attività del connettore.
che si connette al database Spanner e trasmette i flussi di record di eventi di modifica
Argomenti Kafka.
Di seguito è riportato un esempio di comando POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Esempio di risposta corretta:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Aggiorna la configurazione del connettore Kafka
Per aggiornare la configurazione del connettore, invia un comando PUT
all'istanza in esecuzione
Servizio Kafka Connect con lo stesso nome connettore.
Supponiamo di avere un connettore in esecuzione con la configurazione
nella sezione precedente. Di seguito è riportato un esempio di comando PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Esempio di risposta corretta:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Arresta il connettore Kafka
Per arrestare il connettore, invia un comando DELETE
all'istanza in esecuzione
Servizio Kafka Connect con lo stesso nome connettore.
Supponiamo di avere un connettore in esecuzione con la configurazione
nella sezione precedente. Di seguito è riportato un esempio di comando DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Esempio di risposta corretta:
HTTP/1.1 204 No Content
Monitora il connettore Kafka
Oltre alle metriche standard Kafka Connect e Debezium, il connettore Kafka esporta le proprie metriche:
MilliSecondsLowWatermark
: il watermark basso corrente dell'attività del connettore in millisecondi. La la filigrana bassa indica l'ora T in cui è garantito che il connettore abbia hanno trasmesso tutti gli eventi con timestamp < PMilliSecondsLowWatermarkLag
: il ritardo della filigrana in basso rispetto all'ora attuale in millisecondi. hanno trasmesso tutti gli eventi con timestamp < PLatencyLowWatermark<Variant>MilliSeconds
: il ritardo del watermark basso rispetto all'ora corrente in millisecondi. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.LatencySpanner<Variant>MilliSeconds
: la latenza Spanner-commit-timestamp-to-connector-read. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.LatencyReadToEmit<Variant>MilliSeconds
: latenza di Spanner-read-timestamp-to-connector-emit. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.LatencyCommitToEmit<Variant>tMilliSeconds
: latenza di Spanner-commit-timestamp-to-connector-emit. Sono disponibili le varianti P50, P95, P99, Media, Min e Max.LatencyCommitToPublish<Variant>MilliSeconds
: la latenza Spanner-commit-timestamp-to Kafka-publish-timestamp. Sono disponibili le varianti P50, P95, P99, Media, Min, Max.NumberOfChangeStreamPartitionsDetected
: il numero totale di partizioni rilevato dall'attività del connettore attuale.NumberOfChangeStreamQueriesIssued
: il numero totale di query modifiche in tempo reale emessi dall'attività corrente.NumberOfActiveChangeStreamQueries
: il numero attivo di modifiche in tempo reale rilevate dall'attività del connettore attuale.SpannerEventQueueCapacity
: la capacità totale diStreamEventQueue
, una coda che archivia gli elementi ricevuti dalle query di modifiche in tempo reale.SpannerEventQueueCapacity
: la capacità rimanente diStreamEventQueue
.TaskStateChangeEventQueueCapacity
: la capacità totale diTaskStateChangeEventQueue
, una coda che archivia gli eventi che si verificano nel connettore.RemainingTaskStateChangeEventQueueCapacity
: la capacità rimanente diTaskStateChangeEventQueue
.NumberOfActiveChangeStreamQueries
: il numero attivo di modifiche in tempo reale rilevate dall'attività del connettore attuale.
Proprietà di configurazione del connettore Kafka
Di seguito sono riportate le proprietà di configurazione obbligatorie per il connettore:
name
: nome univoco del connettore. Il tentativo di registrarsi di nuovo con lo stesso nome causa un errore. Questa proprietà è obbligatoria per tutti i connettori Kafka Connect.connector.class
: il nome della classe Java per il connettore. Usa sempre il valoreio.debezium.connector.spanner.SpannerConnector
per il connettore Kafka.tasks.max
: il numero massimo di attività che devono essere create per questo connettore.gcp.spanner.project.id
: l'ID progettogcp.spanner.instance.id
: l'ID istanza Spannergcp.spanner.database.id
: l'ID database di Spannergcp.spanner.change.stream
: il nome delle modifiche in tempo reale di Spannergcp.spanner.credentials.json
: l'oggetto JSON della chiave dell'account di servizio.gcp.spanner.credentials.path
: il percorso del file dell'oggetto JSON chiave dell'account di servizio. Obbligatorio se il campo sopra non è stato fornito.gcp.spanner.database.role
: il ruolo del database Spanner per per gli utilizzi odierni. Questo è necessario solo quando lo stream delle modifiche è protetto con un controllo di accesso granulare. Il ruolo database deve disporre del privilegioSELECT
sul flusso di modifiche e del privilegioEXECUTE
sulla funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo granulare dell'controllo dell'accesso per le streaming.
Le seguenti proprietà di configurazione avanzata hanno valori predefiniti utilizzabili nella maggior parte situazioni e pertanto raramente devono essere specificati nella configurazione del connettore:
gcp.spanner.low-watermark.enabled
: indica se la filigrana bassa è attiva per il connettore. Il valore predefinito è false.gcp.spanner.low-watermark.update-period.ms
: l'intervallo di aggiornamento della soglia minima. Il valore predefinito è 1000 ms.heartbeat.interval.ms
: l'intervallo heartbeat di Spanner. Il valore predefinito è 300000 (cinque minuti).gcp.spanner.start.time
: l'ora di inizio del connettore. Il valore predefinito è l'ora attuale.gcp.spanner.end.time
: l'ora di fine del connettore. Il valore predefinito è infinito.tables.exclude.list
: le tabelle per cui escludere gli eventi di modifica. Il campo predefinito è vuoto.tables.include.list
: le tabelle per cui includere gli eventi di modifica. Se non viene compilato, vengono incluse tutte le tabelle. Il campo predefinito è vuoto.gcp.spanner.stream.event.queue.capacity
: la capacità della coda di eventi Spanner. Il valore predefinito è 10.000.connector.spanner.task.state.change.event.queue.capacity
: la capacità della coda degli eventi di modifica dello stato dell'attività. Il valore predefinito è 1000.connector.spanner.max.missed.heartbeats
: il numero massimo di heartbeat mancanti per una query di modifiche in tempo reale prima che venga generata un'eccezione. Il valore predefinito è 10.scaler.monitor.enabled
: indica se la scalabilità automatica delle attività è abilitata. Il valore predefinito è false.tasks.desired.partitions
: il numero preferito di partizioni dei flussi di modifiche per attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 2.tasks.min
: il numero minimo di attività. Questo parametro è necessario per la scalabilità automatica delle attività. Il valore predefinito è 1.connector.spanner.sync.topic
: il nome dell'argomento sincronizzazione, un argomento del connettore interno utilizzato per archiviare le comunicazioni tra le attività. Il valore predefinito è_sync_topic_spanner_connector_connectorname
se l'utente non ha fornito un nome.connector.spanner.sync.poll.duration
: la durata del sondaggio per l'argomento di sincronizzazione. Il valore predefinito è 500 ms.connector.spanner.sync.request.timeout.ms
: il timeout per le richieste all'argomento di sincronizzazione. Il valore predefinito è 5000 ms.connector.spanner.sync.delivery.timeout.ms
: il timeout per la pubblicazione nell'argomento di sincronizzazione. Il valore predefinito è 15.000 ms.connector.spanner.sync.commit.offsets.interval.ms
: l'intervallo in cui viene eseguito il commit degli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.connector.spanner.sync.publisher.wait.timeout
: l'intervallo con cui i messaggi vengono pubblicati nell'argomento di sincronizzazione. Il valore predefinito è 5 ms.connector.spanner.rebalancing.topic
: il nome dell'argomento di ribilanciamento. L'argomento di riequilibrio è un argomento del connettore interno utilizzato per determinare l'attività delle attività. Il valore predefinito è_rebalancing_topic_spanner_connector_connectorname
se l'utente non ha fornito un nome.connector.spanner.rebalancing.poll.duration
: la durata del sondaggio per l'argomento di ribilanciamento. Il valore predefinito è 5000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: il timeout per il commit degli offset per l'argomento di ribilanciamento. Il valore predefinito è 5000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: l'intervallo a cui vengono committati gli offset per l'argomento di sincronizzazione. Il valore predefinito è 60.000 ms.connector.spanner.rebalancing.task.waiting.timeout
: la durata del tempo di attesa di un'attività prima dell'elaborazione di un evento di riequilibrio. Il valore predefinito è 1000 ms.
Per un elenco ancora più dettagliato delle proprietà del connettore configurabili, consulta il repository GitHub.
Limitazioni
Il connettore non supporta lo streaming di eventi snapshot.
Se il watermarking è attivato nel connettore, non puoi configurare le trasformazioni di routing degli argomenti Debezium.