Scrivere dati da Kafka a BigQuery con Dataflow

Questo documento fornisce indicazioni generali sulla creazione e sul deployment di una pipeline Dataflow che trasmette in streaming da Apache Kafka a BigQuery.

Apache Kafka è una piattaforma open source per gli eventi in streaming. Kafka viene utilizzato di frequente nelle architetture distribuite per abilitare la comunicazione tra componenti a basso accoppiamento. Puoi utilizzare Dataflow per leggere gli eventi da Kafka, elaborarli e scrivere i risultati in una tabella BigQuery per ulteriori analisi.

Lettura degli eventi Kafka in BigQuery

Google fornisce un modello Dataflow che configura una pipeline da Kafka a BigQuery. Il modello utilizza il connettore BigQueryIO fornito nell'SDK Apache Beam.

Per utilizzare questo modello, svolgi i seguenti passaggi:

  1. Esegui il deployment di Kafka in Google Cloud o altrove.
  2. Configura il networking.
  3. Imposta le autorizzazioni Identity and Access Management (IAM).
  4. Scrivi una funzione per trasformare i dati dell'evento.
  5. Crea la tabella di output BigQuery.
  6. Esegui il deployment del modello Dataflow.

Esegui il deployment di Kafka

In Google Cloud, puoi eseguire il deployment di un cluster Kafka sulle istanze di macchine virtuali (VM) Compute Engine o utilizzare un servizio Kafka gestito di terze parti. Per approfondire le opzioni di deployment su Google Cloud, consulta Che cos'è Apache Kafka?. Puoi trovare soluzioni Kafka di terze parti su Google Cloud Marketplace.

In alternativa, potresti avere un cluster Kafka esistente al di fuori di Google Cloud. Ad esempio, potresti avere un carico di lavoro esistente di cui è stato eseguito il deployment on-premise o in un altro cloud pubblico.

Configurazione del networking

Per impostazione predefinita, Dataflow avvia le istanze all'interno della rete Virtual Private Cloud (VPC) predefinita. A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una sottorete diverse per Dataflow. Per saperne di più, consulta la sezione Specificare una rete e una sottorete. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.

Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro di Controlli di servizio VPC oppure estendi i perimetri alla VPN o all'interconnessione Cloud autorizzata.

Se il cluster Kafka è dipiegato all'esterno di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di rete con diversi compromessi:

L'Dedicated Interconnect è l'opzione migliore per prestazioni e affidabilità prevedibili, ma la configurazione può richiedere più tempo perché terze parti devono eseguire il provisioning dei nuovi circuiti. Con una topologia basata su IP pubblico, puoi iniziare rapidamente perché non è necessario eseguire molti interventi di rete.

Le due sezioni seguenti descrivono queste opzioni in maggiore dettaglio.

Spazio di indirizzi RFC 1918 condiviso

Sia l'Dedicated Interconnect che la VPN IPsec ti consentono di accedere direttamente agli indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), il che può semplificare la configurazione di Kafka. Se utilizzi una topologia basata su VPN, ti consigliamo di configurare una VPN a velocità effettiva elevata.

Per impostazione predefinita, Dataflow avvia le istanze sulla rete VPC predefinita. In una topologia di rete privata con percorsi definiti esplicitamente in Cloud Router che collegano le sottoreti in Google Cloud a quel cluster Kafka, hai bisogno di più controllo su dove posizionare le istanze Dataflow. Puoi utilizzare Dataflow per configurare i parametri di esecuzione network e subnetwork.

Assicurati che la sottorete corrispondente abbia un numero sufficiente di indirizzi IP disponibili su cui Dataflow possa avviare istanze quando tenta di eseguire il ridimensionamento. Inoltre, quando crei una rete separata per l'avvio delle istanze Dataflow, assicurati di avere una regola firewall che consenta il traffico TCP tra tutte le macchine virtuali del progetto. La rete predefinita ha già questa regola firewall configurata.

Spazio degli indirizzi IP pubblici

Questa architettura utilizza Transport Layer Security (TLS) per proteggere il traffico tra i client esterni e Kafka e utilizza il traffico non criptato per la comunicazione tra broker. Quando l'ascoltatore Kafka si associa a un'interfaccia di rete utilizzata sia per la comunicazione interna che esterna, la configurazione dell'ascoltatore è semplice. Tuttavia, in molti scenari, gli indirizzi pubblicizzati esternamente degli broker Kafka nel cluster sono diversi dalle interfacce di rete interne utilizzate da Kafka. In questi casi, puoi utilizzare la proprietà advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

I client esterni si connettono utilizzando la porta 9093 tramite un canale "SSL", mentre i client interni si connettono utilizzando la porta 9092 tramite un canale non criptato. Quando specifichi un indirizzo in advertised.listeners, utilizza nomi DNS (kafkabroker-n.mydomain.com in questo esempio) che risolvono nella stessa istanza sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare perché potrebbero non essere risolti per il traffico interno.

Impostazione delle autorizzazioni IAM

I job Dataflow utilizzano due account di servizio IAM:

  • Il servizio Dataflow utilizza un account di servizio Dataflow per manipolare le risorse Google Cloud, ad esempio creare VM.
  • Le VM worker di Dataflow utilizzano un account di servizio worker per accedere ai file e ad altre risorse della pipeline. Questo account di servizio necessita dell'accesso in scrittura alla tabella di output BigQuery. Inoltre, deve avere accesso a tutte le altre risorse a cui fa riferimento il job della pipeline.

Assicurati che questi due account di servizio abbiano i ruoli appropriati. Per ulteriori informazioni, consulta Autorizzazioni e sicurezza di Dataflow.

Trasformare i dati per BigQuery

Il modello Da Kafka a BigQuery crea una pipeline che legge gli eventi da uno o più argomenti Kafka e li scrive in una tabella BigQuery. Se vuoi, puoi fornire una funzione definita dall'utente#39;utente (UDF) JavaScript che trasforma i dati sugli eventi prima che vengano scritti in BigQuery.

L'output della pipeline deve essere costituito da dati in formato JSON che corrispondano allo schema della tabella di output. Se i dati sugli eventi Kafka sono già in formato JSON, puoi creare una tabella BigQuery con uno schema corrispondente e passare gli eventi direttamente a BigQuery. In caso contrario, crea una UDF che prenda come input i dati sugli eventi e restituisca i dati JSON corrispondenti alla tabella BigQuery.

Ad esempio, supponiamo che i dati sugli eventi contengano due campi:

  • name (stringa)
  • customer_id (numero intero)

L'output della pipeline Dataflow potrebbe essere simile al seguente:

{ "name": "Alice", "customer_id": 1234 }

Supponendo che i dati sugli eventi non siano già in formato JSON, devi scrivere una UDF che li trasformi, come segue:

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

La UDF può eseguire un'elaborazione aggiuntiva sui dati sugli eventi, ad esempio filtrare gli eventi, rimuovere le informazioni che consentono l'identificazione personale (PII) o arricchire i dati con campi aggiuntivi.

Per ulteriori informazioni sulla scrittura di una UDF per il modello, consulta Estendere il modello Dataflow con le UDF. Carica il file JavaScript su Cloud Storage.

Crea la tabella di output BigQuery

Crea la tabella di output BigQuery prima di eseguire il modello. Lo schema della tabella deve essere compatibile con l'output JSON della pipeline. Per ogni proprietà nel payload JSON, la pipeline scrive il valore nella colonna della tabella BigQuery con lo stesso nome. Eventuali proprietà mancanti nel JSON vengono interpretate come valori NULL.

Utilizzando l'esempio precedente, la tabella BigQuery avrebbe le seguenti colonne:

Nome colonna Tipo di dati
name STRING
customer_id INTEGER

Puoi utilizzare l'istruzione SQL CREATE TABLE per creare la tabella:

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

In alternativa, puoi specificare lo schema della tabella utilizzando un file di definizione JSON. Per ulteriori informazioni, consulta Specificare uno schema nella documentazione di BigQuery.

Esegui il job Dataflow

Dopo aver creato la tabella BigQuery, esegui il modello Dataflow.

Console

Per creare il job Dataflow utilizzando la console Google Cloud, segui questi passaggi:

  1. Vai alla pagina Dataflow nella console Google Cloud.
  2. Fai clic su Crea job da modello.
  3. Nel campo Nome job, inserisci un nome per il job.
  4. In Endpoint a livello di regione, seleziona una regione.
  5. Seleziona il modello "Da Kafka a BigQuery".
  6. In Parametri obbligatori, inserisci il nome della tabella di output BigQuery. La tabella deve già esistere e avere un schema valido.
  7. Fai clic su Mostra parametri facoltativi e inserisci i valori per almeno i seguenti parametri:

    • L'argomento Kafka da cui leggere l'input.
    • L'elenco dei server bootstrap Kafka, separati da virgole.
    • Un'email dell'account di servizio.

    Inserisci parametri aggiuntivi, se necessario. In particolare, potresti dover specificare quanto segue:

    • Networking: per utilizzare una rete VPC diversa da quella predefinita, specifica la rete e la subnet.
    • FDU: per utilizzare una funzione definita dall'utente JavaScript, specifica la posizione Cloud Storage dello script e il nome della funzione JavaScript da richiamare.

gcloud

Per creare il job Dataflow utilizzando Google Cloud CLI, esegui il seguente comando:

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

Sostituisci le seguenti variabili:

  • JOB_NAME. Un nome job a tua scelta.
  • LOCATION. La regione in cui eseguire il job. Per ulteriori informazioni su regioni e località, consulta Località di Dataflow.
  • KAFKA_TOPICS. Un elenco con valori separati da virgole degli argomenti Kafka da leggere.
  • BOOTSTRAP_SERVERS. Un elenco di server bootstrap Kafka separati da virgole. Esempio: 127:9092,127.0.0.1:9093.
  • OUTPUT_TABLE. La tabella di output BigQuery, specificata come PROJECT_ID:DATASET_NAME.TABLE_NAME. Esempio: my_project:dataset1.table1.
  • IAM_SERVICE_ACCOUNT. Facoltativo. L'indirizzo email dell'account di servizio da utilizzare per eseguire il job.
  • UDF_SCRIPT_PATH. Facoltativo. Il percorso Cloud Storage di un file JavaScript contenente una funzione definita dall'utente. Esempio: gs://your-bucket/your-function.js.
  • UDF_FUNCTION_NAME. Facoltativo. Il nome della funzione JavaScript da chiamare come FDU.
  • VPC_NETWORK_NAME. Facoltativo. La rete a cui verranno assegnati i worker.
  • SUBNET_NAME. Facoltativo. La subnet a cui verranno assegnati i worker.

Tipi di dati

Questa sezione descrive come gestire vari tipi di dati nello schema della tabella BigQuery.

All'interno, i messaggi JSON vengono convertiti in oggetti TableRow e i valori dei campi TableRow vengono tradotti in tipi BigQuery.

Tipi scalari

L'esempio seguente crea una tabella BigQuery con diversi tipi di dati scalari, tra cui stringa, numerico, booleano, data/ora, intervallo e geografico:

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

Ecco un payload JSON con campi compatibili:

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

Note:

  • Per una colonna TIMESTAMP, puoi utilizzare il metodo Date.toJSON di JavaScript per formattare il valore.
  • Per la colonna GEOGRAPHY, puoi specificare la geografia utilizzando il formato WKT (Well-Known Text) o GeoJSON, formattato come stringa. Per ulteriori informazioni, consulta Caricare i dati geospaziali.

Per ulteriori informazioni sui tipi di dati in BigQuery, consulta Tipi di dati.

Array

Puoi archiviare un array in BigQuery utilizzando il tipo di dato ARRAY. Nell'esempio seguente, il payload JSON contiene una proprietà denominata scores il cui valore è un array JSON:

{"name":"Emily","scores":[10,7,10,9]}

L'istruzione SQL CREATE TABLE seguente crea una tabella BigQuery con uno schema compatibile:

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

La tabella risultante sarà simile alla seguente:

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

Strutture

Il tipo di dati STRUCT in BigQuery contiene un elenco ordinato di campi denominati. Puoi utilizzare un STRUCT per contenere oggetti JSON che seguono uno schema coerente.

Nell'esempio seguente, il payload JSON contiene una proprietà denominata val il cui valore è un oggetto JSON:

{"name":"Emily","val":{"a":"yes","b":"no"}}

L'istruzione SQL CREATE TABLE seguente crea una tabella BigQuery con uno schema compatibile:

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

La tabella risultante sarà simile alla seguente:

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Dati sugli eventi semistrutturati

Se i dati sugli eventi Kafka non seguono uno schema rigoroso, ti consigliamo di archiviarli in BigQuery come tipo di dati JSON (anteprima). Se archivi i dati JSON come tipo di dati JSON, non è necessario definire lo schema evento in anticipo. Dopo l'importazione dati, puoi eseguire query sulla tabella di output utilizzando gli operatori di accesso ai campi (notazione a punti) e di accesso agli array.

Innanzitutto, crea una tabella con una colonna JSON:

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

Quindi, definisci una UDF JavaScript che inserisca il payload dell'evento all'interno di un oggetto JSON:

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

Dopo aver scritto i dati in BigQuery, puoi eseguire query sui singoli campi utilizzando l'operatore di accesso ai campi. Ad esempio, la seguente query restituisce il valore del campo name per ogni record:

SELECT event_data.name FROM my_dataset1.kafka_events;

Per ulteriori informazioni sull'utilizzo di JSON in BigQuery, consulta Utilizzare i dati JSON in Google Standard SQL.

Errori e log

Potresti riscontrare errori durante l'esecuzione della pipeline o durante la gestione di singoli eventi Kafka.

Per ulteriori informazioni sulla gestione degli errori della pipeline, consulta Risoluzione dei problemi e debug della pipeline.

Se il job viene eseguito correttamente, ma si verifica un errore durante l'elaborazione di un singolo evento Kafka, il job della pipeline scrive un record di errore in una tabella in BigQuery. Il job stesso non ha esito negativo e l'errore a livello di evento non viene visualizzato come errore nel log del job Dataflow.

Il job della pipeline crea automaticamente la tabella per contenere i record di errore. Per impostazione predefinita, il nome della tabella è "tabella_di_output_record_errori", dove tabella_di_output è il nome della tabella di output. Ad esempio, se la tabella di output è denominata kafka_events, la tabella degli errori è denominata kafka_events_error_records. Puoi specificare un nome diverso impostando il parametro del modello outputDeadletterTable:

outputDeadletterTable=my_project:dataset1.errors_table

I possibili errori includono:

  • Errori di serializzazione, inclusi JSON con formattazione errata.
  • Errori di conversione del tipo, causati da una mancata corrispondenza nello schema della tabella e nei dati JSON.
  • Campi aggiuntivi nei dati JSON che non sono presenti nello schema della tabella.

Esempi di messaggi di errore:

Tipo di errore Dati sull'evento errorMessage
Errore di serializzazione "Hello world" Impossibile serializzare il JSON nella riga della tabella: "Hello world"
Errore di conversione del tipo {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Campo sconosciuto {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Passaggi successivi