Connettore sink Bigtable per Kafka Connect


I connettori sink sono plug-in per il framework Kafka Connect che puoi utilizzare per trasmettere dati in streaming da Kafka direttamente ad altri sistemi per l'archiviazione e l'elaborazione. Il sink Kafka Connect Bigtable è un connettore dedicato progettato per trasmettere dati in streaming in Bigtable in tempo reale con la latenza più bassa possibile.

Questa pagina descrive le funzionalità e le limitazioni del connettore. Fornisce inoltre esempi di utilizzo per scenari avanzati con trasformazioni di singoli messaggi (SMT) e creazione automatica di tabelle. Per istruzioni di installazione e documentazione di riferimento completa, consulta il repository del connettore sink Kafka Connect Bigtable.

Funzionalità

Il connettore di sink Bigtable si iscrive agli argomenti Kafka, legge i messaggi ricevuti su questi argomenti e poi scrive i dati nelle tabelle Bigtable. Le sezioni seguenti forniscono una panoramica generale di ogni funzionalità. Per informazioni dettagliate sull'utilizzo, consulta la sezione Configurazione di questo documento.

Mappatura dei tasti, SMT e convertitori

Per scrivere dati in una tabella Bigtable, devi fornire una chiave di riga, una famiglia di colonne e un nome di colonna univoci per ogni operazione. Queste informazioni vengono dedotte dai campi dei messaggi Kafka. Puoi creare tutti gli identificatori richiesti con impostazioni come row.key.definition, row.key.delimiter o default.column.family.

Creazione automatica delle tabelle

Puoi utilizzare le impostazioni auto.create.tables e auto.create.column.families per creare automaticamente tabelle di destinazione e famiglie di colonne se non esistono nella destinazione Bigtable. Questa flessibilità ha un costo in termini di rendimento, quindi in genere ti consigliamo di creare prima le tabelle in cui vuoi trasmettere i dati in streaming.

Modalità di scrittura ed eliminazione delle righe

Quando scrivi in una tabella, puoi sovrascrivere completamente i dati se esiste già una riga oppure scegliere di abbandonare l'operazione con l'impostazione insert.mode. Puoi utilizzare questa impostazione in combinazione con la gestione degli errori DLQ per ottenere la garanzia di consegna almeno una volta.

Per inviare comandi DELETE, configura la proprietà value.null.mode. Puoi utilizzarlo per eliminare intere righe, famiglie di colonne o singole colonne.

Coda dei messaggi non recapitabili

Configura la proprietà errors.deadletterqueue.topic.name e imposta errors.tolerance=all in modo che pubblichi i messaggi che non vengono elaborati nell'argomento DLQ.

Compatibilità con il connettore di sink Bigtable di Confluent Platform

Il connettore di sink Bigtable Kafka Connect di Google Cloud offre una parità completa con il connettore di sink Bigtable della piattaforma Confluent autogestita. Puoi utilizzare il file di configurazione esistente per il connettore Confluent Platform modificando l'impostazione connector.class in connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector.

Limitazioni

Si applicano le seguenti limitazioni:

  • Al momento, il connettore sink Kafka Connect Bigtable è supportato solo per i cluster Kafka in cui è possibile installare i connettori in modo indipendente (cluster Kafka autogestiti o on-premise). Questo connettore non è attualmente supportato per Google Cloud Managed Service per Apache Kafka.

  • Questo connettore può creare famiglie di colonne e colonne dai nomi dei campi fino a due livelli di nidificazione:

    • Le struct nidificate a un livello inferiore al secondo vengono convertite in JSON e salvate nella colonna principale.
    • Le strutture di primo livello vengono trasformate in famiglie di colonne. I campi in queste struct diventano nomi di colonne.
    • I valori primitivi di primo livello vengono salvati per impostazione predefinita in una famiglia di colonne che utilizza l'argomento Kafka come nome. Le colonne di questa famiglia hanno nomi uguali ai nomi dei campi. Puoi modificare questo comportamento utilizzando le impostazioni default.column.family e default.column.qualifier.

Installazione

Per installare questo connettore, segui i passaggi di installazione standard: crea il progetto con Maven, copia i file .jar nella directory dei plug-in di Kafka Connect e crea il file di configurazione. Per istruzioni dettagliate, consulta la sezione Esecuzione del connettore nel repository.

Configurazione

Per configurare i connettori Kafka Connect, devi scrivere file di configurazione. Il connettore di sink Bigtable Kafka Connect di Google Cloud supporta tutte le proprietà di base del connettore Kafka, nonché alcuni campi aggiuntivi personalizzati per l'utilizzo con le tabelle Bigtable.

Le sezioni seguenti forniscono esempi dettagliati per i casi d'uso più avanzati, ma non descrivono tutte le impostazioni disponibili. Per esempi di utilizzo di base e il riferimento completo alle proprietà, consulta il repository del connettore sink Kafka Connect Bigtable.

Esempio: creazione flessibile di chiave di riga e famiglia di colonne

Esempio di scenario

I messaggi Kafka in entrata contengono dettagli sugli ordini di acquisto con identificatori utente. Vuoi scrivere ogni ordine in una riga con due famiglie di colonne: una per i dettagli dell'utente e una per i dettagli dell'ordine.

Formato messaggi Kafka di origine

Formatta i messaggi Kafka pubblicati nell'argomento con JsonConverter per ottenere la seguente struttura:

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
Riga Bigtable prevista

Vuoi scrivere ogni messaggio come riga Bigtable con la seguente struttura:

Chiave di riga contact_details order_details
nome telefono email orderId elementi sconto
user123#order123 user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2
Configurazione del connettore
Per ottenere il risultato previsto, scrivi il seguente file di configurazione:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
I risultati dell'utilizzo di questo file sono i seguenti:
  • row.key.definition=user,order.id è un elenco separato da virgole dei campi che vuoi utilizzare per creare la chiave di riga. Ogni voce viene concatenata con il set di caratteri nell'impostazione row.key.delimiter.

    Quando utilizzi row.key.definition, tutti i tuoi messaggi devono utilizzare lo stesso schema. Se devi elaborare messaggi con strutture diverse in colonne o famiglie di colonne diverse, ti consigliamo di creare istanze del connettore separate. Per ulteriori informazioni, consulta la sezione Esempio: scrivere messaggi in più tabelle di questo documento.

  • I nomi famiglia di colonne Bigtable si basano sui nomi delle strutture di primo livello non nulle. Di conseguenza:

    • I valori per i dettagli di contatto sono tipi di dati primitivi a livello di radice, quindi li aggreghi in una famiglia di colonne predefinita con l'impostazione default.column.family=contact_details.
    • I dettagli dell'ordine sono già inclusi nell'oggetto order, ma vuoi utilizzare order_details come nome della famiglia di colonne. Per farlo, utilizza ReplaceFields SMT e rinomina il campo.

Esempio: creazione automatica di tabelle e scritture idempotenti

Esempio di scenario

I messaggi Kafka in entrata contengono i dettagli degli ordini di acquisto. I clienti possono modificare i propri carrelli prima dell'evasione, quindi prevedi di ricevere messaggi di follow-up con ordini modificati che devi salvare come aggiornamenti nella stessa riga. Inoltre, non puoi garantire che la tabella di destinazione esista al momento della scrittura, quindi vuoi che il connettore crei automaticamente la tabella se non esiste.

Configurazione del connettore
Per ottenere il risultato previsto, scrivi il seguente file di configurazione:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

Esempio: scrivere messaggi in più tabelle

Esempio di scenario

I messaggi Kafka in arrivo contengono i dettagli degli ordini di acquisto provenienti da diversi canali di evasione. Questi messaggi vengono pubblicati in argomenti diversi e vuoi utilizzare lo stesso file di configurazione per scriverli in tabelle separate.

Configurazione del connettore

Puoi scrivere i messaggi in più tabelle, ma se utilizzi un unico file di configurazione per la configurazione, ogni messaggio deve utilizzare lo stesso schema. Se devi elaborare messaggi di argomenti diversi in colonne o famiglie distinte, ti consigliamo di creare istanze del connettore separate.

Per ottenere il risultato previsto, scrivi il seguente file di configurazione:

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

In questo approccio, utilizzi la proprietà table.name.format=orders_${topic} per fare riferimento dinamicamente a ogni nome di argomento Kafka. Quando configuri più nomi di argomenti con l'impostazione topics=shopping_topic_store1,shopping_topic_store2, ogni messaggio viene scritto in una tabella separata:

  • I messaggi dell'argomento shopping_topic_store1 vengono scritti nella tabella orders_shopping_topic_store1.
  • I messaggi dell'argomento shopping_topic_store2 vengono scritti nella tabella orders_shopping_topic_store2.

Passaggi successivi