Connettore sink Bigtable per Kafka Connect
I connettori sink sono plug-in per il framework Kafka Connect che puoi utilizzare per trasmettere dati in streaming da Kafka direttamente ad altri sistemi per l'archiviazione e l'elaborazione. Il sink Kafka Connect Bigtable è un connettore dedicato progettato per trasmettere dati in streaming in Bigtable in tempo reale con la latenza più bassa possibile.
Questa pagina descrive le funzionalità e le limitazioni del connettore. Fornisce inoltre esempi di utilizzo per scenari avanzati con trasformazioni di singoli messaggi (SMT) e creazione automatica di tabelle. Per istruzioni di installazione e documentazione di riferimento completa, consulta il repository del connettore sink Kafka Connect Bigtable.
Funzionalità
Il connettore di sink Bigtable si iscrive agli argomenti Kafka, legge i messaggi ricevuti su questi argomenti e poi scrive i dati nelle tabelle Bigtable. Le sezioni seguenti forniscono una panoramica generale di ogni funzionalità. Per informazioni dettagliate sull'utilizzo, consulta la sezione Configurazione di questo documento.
Mappatura dei tasti, SMT e convertitori
Per scrivere dati in una tabella Bigtable, devi fornire una chiave di riga, una famiglia di colonne e un nome di colonna univoci per ogni operazione.
Queste informazioni vengono dedotte dai campi dei messaggi Kafka.
Puoi creare tutti gli identificatori richiesti con impostazioni come
row.key.definition
, row.key.delimiter
o
default.column.family
.
Creazione automatica delle tabelle
Puoi utilizzare le impostazioni auto.create.tables
e
auto.create.column.families
per creare automaticamente
tabelle di destinazione e famiglie di colonne se non esistono nella
destinazione Bigtable. Questa flessibilità ha un costo in termini di rendimento, quindi in genere ti consigliamo di creare prima le tabelle in cui vuoi trasmettere i dati in streaming.
Modalità di scrittura ed eliminazione delle righe
Quando scrivi in una tabella, puoi sovrascrivere completamente i dati
se esiste già una riga oppure scegliere di abbandonare l'operazione
con l'impostazione insert.mode
. Puoi utilizzare questa impostazione
in combinazione con la gestione degli errori DLQ per ottenere la garanzia di consegna almeno una volta.
Per inviare comandi DELETE
, configura la proprietà value.null.mode
. Puoi utilizzarlo per eliminare intere righe, famiglie di colonne o
singole colonne.
Coda dei messaggi non recapitabili
Configura la proprietà errors.deadletterqueue.topic.name
e imposta errors.tolerance=all
in modo che pubblichi i messaggi che non vengono elaborati nell'argomento DLQ.
Compatibilità con il connettore di sink Bigtable di Confluent Platform
Il connettore di sink Bigtable Kafka Connect di Google Cloud
offre una parità completa con il
connettore di sink Bigtable della piattaforma Confluent autogestita.
Puoi utilizzare il file di configurazione esistente per il connettore Confluent Platform
modificando l'impostazione connector.class
in
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
.
Limitazioni
Si applicano le seguenti limitazioni:
Al momento, il connettore sink Kafka Connect Bigtable è supportato solo per i cluster Kafka in cui è possibile installare i connettori in modo indipendente (cluster Kafka autogestiti o on-premise). Questo connettore non è attualmente supportato per Google Cloud Managed Service per Apache Kafka.
Questo connettore può creare famiglie di colonne e colonne dai nomi dei campi fino a due livelli di nidificazione:
- Le struct nidificate a un livello inferiore al secondo vengono convertite in
JSON
e salvate nella colonna principale. - Le strutture di primo livello vengono trasformate in famiglie di colonne. I campi in queste struct diventano nomi di colonne.
- I valori primitivi di primo livello vengono salvati per impostazione predefinita in una famiglia di colonne che
utilizza l'argomento Kafka come nome. Le colonne di questa famiglia hanno nomi uguali
ai nomi dei campi. Puoi modificare questo comportamento utilizzando le impostazioni
default.column.family
edefault.column.qualifier
.
- Le struct nidificate a un livello inferiore al secondo vengono convertite in
Installazione
Per installare questo connettore, segui i passaggi di installazione standard: crea il progetto con Maven, copia i file .jar
nella directory dei plug-in di Kafka Connect e crea il file di configurazione.
Per istruzioni dettagliate, consulta la sezione
Esecuzione del connettore
nel repository.
Configurazione
Per configurare i connettori Kafka Connect, devi scrivere file di configurazione. Il connettore di sink Bigtable Kafka Connect di Google Cloud supporta tutte le proprietà di base del connettore Kafka, nonché alcuni campi aggiuntivi personalizzati per l'utilizzo con le tabelle Bigtable.
Le sezioni seguenti forniscono esempi dettagliati per i casi d'uso più avanzati, ma non descrivono tutte le impostazioni disponibili. Per esempi di utilizzo di base e il riferimento completo alle proprietà, consulta il repository del connettore sink Kafka Connect Bigtable.
Esempio: creazione flessibile di chiave di riga e famiglia di colonne
- Esempio di scenario
-
I messaggi Kafka in entrata contengono dettagli sugli ordini di acquisto con identificatori utente. Vuoi scrivere ogni ordine in una riga con due famiglie di colonne: una per i dettagli dell'utente e una per i dettagli dell'ordine.
- Formato messaggi Kafka di origine
-
Formatta i messaggi Kafka pubblicati nell'argomento con
JsonConverter
per ottenere la seguente struttura:{ "user": "user123", "phone": "800‑555‑0199", "email": "business@example.com", "order": { id: "order123", items: ["itemUUID1", "itemUUID2"], discount: 0.2 } }
- Riga Bigtable prevista
-
Vuoi scrivere ogni messaggio come riga Bigtable con la seguente struttura:
Chiave di riga contact_details order_details nome telefono email orderId elementi sconto user123#order123
user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2 - Configurazione del connettore
-
Per ottenere il risultato previsto, scrivi il seguente file di configurazione:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Settings for row key creation row.key.definition=user,order.id row.key.delimiter=# # All user identifiers are root level fields. # Use the default column family to aggregate them into a single family. default.column.family=contact_details # Use SMT to rename "orders" field into "order_details" for the new column family transforms=renameOrderField transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key transforms.renameOrderField.renames=order:order_details
I risultati dell'utilizzo di questo file sono i seguenti:
-
row.key.definition=user,order.id
è un elenco separato da virgole dei campi che vuoi utilizzare per creare la chiave di riga. Ogni voce viene concatenata con il set di caratteri nell'impostazionerow.key.delimiter
.Quando utilizzi
row.key.definition
, tutti i tuoi messaggi devono utilizzare lo stesso schema. Se devi elaborare messaggi con strutture diverse in colonne o famiglie di colonne diverse, ti consigliamo di creare istanze del connettore separate. Per ulteriori informazioni, consulta la sezione Esempio: scrivere messaggi in più tabelle di questo documento. -
I nomi famiglia di colonne Bigtable si basano sui nomi delle strutture di primo livello non nulle. Di conseguenza:
- I valori per i dettagli di contatto sono tipi di dati primitivi a livello di radice, quindi
li aggreghi in una famiglia di colonne predefinita con l'impostazione
default.column.family=contact_details
. - I dettagli dell'ordine sono già inclusi nell'oggetto
order
, ma vuoi utilizzareorder_details
come nome della famiglia di colonne. Per farlo, utilizza ReplaceFields SMT e rinomina il campo.
- I valori per i dettagli di contatto sono tipi di dati primitivi a livello di radice, quindi
li aggreghi in una famiglia di colonne predefinita con l'impostazione
Esempio: creazione automatica di tabelle e scritture idempotenti
- Esempio di scenario
-
I messaggi Kafka in entrata contengono i dettagli degli ordini di acquisto. I clienti possono modificare i propri carrelli prima dell'evasione, quindi prevedi di ricevere messaggi di follow-up con ordini modificati che devi salvare come aggiornamenti nella stessa riga. Inoltre, non puoi garantire che la tabella di destinazione esista al momento della scrittura, quindi vuoi che il connettore crei automaticamente la tabella se non esiste.
- Configurazione del connettore
-
Per ottenere il risultato previsto, scrivi il seguente file di configurazione:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Automatically create destination tables if they don't exist auto.create.tables=true # UPSERT causes subsequent writes to overwrite existing rows. # This way you can update the same order when customers change the contents # of their baskets. insert.mode=upsert
Esempio: scrivere messaggi in più tabelle
- Esempio di scenario
-
I messaggi Kafka in arrivo contengono i dettagli degli ordini di acquisto provenienti da diversi canali di evasione. Questi messaggi vengono pubblicati in argomenti diversi e vuoi utilizzare lo stesso file di configurazione per scriverli in tabelle separate.
- Configurazione del connettore
-
Puoi scrivere i messaggi in più tabelle, ma se utilizzi un unico file di configurazione per la configurazione, ogni messaggio deve utilizzare lo stesso schema. Se devi elaborare messaggi di argomenti diversi in colonne o famiglie distinte, ti consigliamo di creare istanze del connettore separate.
Per ottenere il risultato previsto, scrivi il seguente file di configurazione:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation are also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topics where shopping details are posted topics=shopping_topic_store1,shopping_topic_store2 # Use a dynamic table name based on the Kafka topic name. table.name.format=orders_${topic}
In questo approccio, utilizzi la proprietà
table.name.format=orders_${topic}
per fare riferimento dinamicamente a ogni nome di argomento Kafka. Quando configuri più nomi di argomenti con l'impostazionetopics=shopping_topic_store1,
, ogni messaggio viene scritto in una tabella separata:shopping_topic_store2 - I messaggi dell'argomento
shopping_topic_store1
vengono scritti nella tabellaorders_shopping_topic_store1
. - I messaggi dell'argomento
shopping_topic_store2
vengono scritti nella tabellaorders_shopping_topic_store2
.
- I messaggi dell'argomento