Scrivi dati da Kafka a BigQuery con Dataflow

Questo documento fornisce indicazioni generali sulla creazione e sul deployment di una pipeline Dataflow che trasmette in flussi da Apache Kafka a BigQuery.

Apache Kafka è una piattaforma open source per lo streaming di eventi. Kafka è usato comunemente nelle architetture distribuite per consentire la comunicazione tra componenti a basso accoppiamento. Puoi utilizzare Dataflow per leggere gli eventi da Kafka, elaborarli e scrivere i risultati in una tabella BigQuery per ulteriori analisi.

Lettura degli eventi Kafka in BigQuery

Google fornisce un modello Dataflow che configura una pipeline Kafka-to-BigQuery. Il modello utilizza il connettore BigQueryIO fornito nell'SDK Apache Beam.

Per utilizzare questo modello, segui questi passaggi:

  1. Esegui il deployment di Kafka, in Google Cloud o altrove.
  2. Configurare il networking.
  3. Impostare le autorizzazioni di Identity and Access Management (IAM).
  4. Scrivi una funzione per trasformare i dati degli eventi.
  5. Crea la tabella di output BigQuery.
  6. Esegui il deployment del modello Dataflow.

Esegui il deployment di Kafka

In Google Cloud, puoi eseguire il deployment di un cluster Kafka su istanze di macchine virtuali (VM) Compute Engine o utilizzare un servizio Kafka gestito di terze parti. Per ulteriori informazioni sulle opzioni di deployment in Google Cloud, consulta Che cos'è Apache Kafka?. Puoi trovare soluzioni Kafka di terze parti su Google Cloud Marketplace.

In alternativa, potresti avere un cluster Kafka esistente che risiede al di fuori di Google Cloud. Ad esempio, potresti avere un carico di lavoro esistente di cui è stato eseguito il deployment on-premise o in un altro cloud pubblico.

Configurazione del networking

Per impostazione predefinita, Dataflow avvia le istanze all'interno della rete VPC (Virtual Private Cloud) predefinita. A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una subnet diverse per Dataflow. Per ulteriori informazioni, consulta Specificare una rete e una subnet nella documentazione di Dataflow. Quando configuri la rete, crea regole firewall che consentono alle macchine worker Dataflow di raggiungere i broker Kafka.

Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro dei Controlli di servizio VPC oppure estende i perimetri alla VPN o a Cloud Interconnect autorizzata.

Connettiti a un cluster esterno

Se il deployment del cluster Kafka viene eseguito al di fuori di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di networking con compromessi diversi:

Dedicated Interconnect è l'opzione migliore per offrire prestazioni e affidabilità prevedibili, ma la configurazione può richiedere più tempo perché le terze parti devono eseguire il provisioning dei nuovi circuiti. Con una topologia basata su IP pubblici, puoi iniziare rapidamente, perché il lavoro di networking è ridotto.

Le due sezioni successive descrivono queste opzioni in modo più dettagliato.

Spazio di indirizzi RFC 1918 condiviso

Sia Dedicated Interconnect che VPN IPsec ti offrono accesso diretto agli indirizzi IP RFC 1918 nel tuo VPC (Virtual Private Cloud), semplificando la configurazione di Kafka. Se utilizzi una topologia basata su VPN, valuta la possibilità di configurare una VPN ad alta velocità effettiva.

Per impostazione predefinita, Dataflow avvia le istanze sulla rete VPC predefinita. In una topologia di rete privata con route definite esplicitamente nel router Cloud che connettono le subnet in Google Cloud al cluster Kafka, hai bisogno di un maggiore controllo su dove posizionare le istanze Dataflow. Puoi utilizzare Dataflow per configurare i parametri di esecuzione network e subnetwork.

Assicurati che la subnet corrispondente disponga di un numero sufficiente di indirizzi IP per consentire a Dataflow di avviare le istanze durante il tentativo di scale out. Inoltre, quando crei una rete separata per avviare le tue istanze Dataflow, assicurati di avere una regola firewall che abiliti il traffico TCP tra tutte le macchine virtuali del progetto. Questa regola firewall è già configurata per la rete predefinita.

Spazio di indirizzi IP pubblici

Questa architettura utilizza Transport Layer Security (TLS) per proteggere il traffico tra client esterni e Kafka e utilizza testo non crittografato per la comunicazione tra broker. Quando il listener Kafka si associa a un'interfaccia di rete utilizzata per le comunicazioni interne ed esterne, la configurazione del listener è semplice. Tuttavia, in molti scenari, gli indirizzi pubblicizzati esternamente dei broker Kafka nel cluster sono diversi dalle interfacce di rete interne utilizzate da Kafka. In questi casi, puoi utilizzare la proprietà advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

I client esterni si connettono utilizzando la porta 9093 tramite un canale "SSL", mentre i client interni si connettono tramite la porta 9092 tramite un canale con testo non crittografato. Quando specifichi un indirizzo in advertised.listeners, utilizza i nomi DNS (kafkabroker-n.mydomain.com, in questo esempio) che si risolvono nella stessa istanza sia per il traffico esterno sia per quello interno. Gli indirizzi IP pubblici potrebbero non funzionare perché potrebbero non riuscire a risolvere il problema del traffico interno.

Impostazione delle autorizzazioni IAM

I job Dataflow utilizzano due account di servizio IAM:

  • Il servizio Dataflow utilizza un account di servizio Dataflow per manipolare le risorse Google Cloud, ad esempio per creare VM.
  • Le VM worker Dataflow utilizzano un account di servizio worker per accedere ai file e ad altre risorse della pipeline. Questo account di servizio richiede l'accesso in scrittura alla tabella di output di BigQuery. Occorre inoltre l'accesso a qualsiasi altra risorsa a cui fa riferimento il job della pipeline.

Assicurati che questi due account di servizio abbiano ruoli appropriati. Per ulteriori informazioni, consulta Sicurezza e autorizzazioni di Dataflow.

Trasforma i dati per BigQuery

Il modello Kafka-to-BigQuery crea una pipeline che legge gli eventi da uno o più argomenti Kafka e li scrive in una tabella BigQuery. Facoltativamente, puoi fornire una funzione JavaScript definita dall'utente (UDF) che trasforma i dati degli eventi prima che vengano scritti in BigQuery.

L'output della pipeline deve essere costituito da dati in formato JSON corrispondenti allo schema della tabella di output. Se i dati degli eventi Kafka sono già in formato JSON, puoi creare una tabella BigQuery con uno schema corrispondente e passare gli eventi direttamente a BigQuery. In caso contrario, crea una funzione definita dall'utente che prenda i dati dell'evento come input e restituisca dati JSON che corrispondano alla tua tabella BigQuery.

Ad esempio, supponi che i dati sugli eventi contengano due campi:

  • name (stringa)
  • customer_id (numero intero)

L'output della pipeline Dataflow potrebbe essere simile al seguente:

{ "name": "Alice", "customer_id": 1234 }

Supponendo che i dati sugli eventi non siano già in formato JSON, devi scrivere una funzione definita dall'utente che trasforma i dati, come segue:

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

La funzione definita dall'utente può eseguire elaborazioni aggiuntive sui dati sugli eventi, ad esempio filtrare gli eventi, rimuovere le informazioni che consentono l'identificazione personale (PII) o arricchire i dati con campi aggiuntivi.

Per saperne di più sulla scrittura di una funzione definita dall'utente per il modello, consulta Estendere il modello Dataflow con le funzioni definite dall'utente. Carica il file JavaScript in Cloud Storage.

Crea la tabella di output BigQuery

Crea la tabella di output BigQuery prima di eseguire il modello. Lo schema della tabella deve essere compatibile con l'output JSON della pipeline. Per ogni proprietà nel payload JSON, la pipeline scrive il valore nella colonna della tabella BigQuery con lo stesso nome. Eventuali proprietà mancanti nel codice JSON vengono interpretate come valori NULL.

Utilizzando l'esempio precedente, la tabella BigQuery avrà le seguenti colonne:

Nome colonna Tipo di dati
name STRING
customer_id INTEGER

Puoi utilizzare l'istruzione SQL CREATE TABLE per creare la tabella:

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

In alternativa, puoi specificare lo schema della tabella utilizzando un file di definizione JSON. Per ulteriori informazioni, consulta la sezione Specifica di uno schema nella documentazione di BigQuery.

Esegui il job Dataflow

Dopo aver creato la tabella BigQuery, esegui il modello Dataflow.

Console

Per creare il job Dataflow utilizzando la console Google Cloud, esegui questi passaggi:

  1. Vai alla pagina Dataflow nella console Google Cloud.
  2. Fai clic su Crea job da modello.
  3. Nel campo Nome job, inserisci un nome per il job.
  4. Per Endpoint a livello di regione, seleziona una regione.
  5. Seleziona il modello "Da Kafka a BigQuery".
  6. In Parametri obbligatori, inserisci il nome della tabella di output BigQuery. La tabella deve già esistere e avere uno schema valido.
  7. Fai clic su Mostra parametri facoltativi e inserisci valori per almeno i seguenti parametri:

    • L'argomento Kafka da cui leggere l'input.
    • L'elenco di server di bootstrap Kafka, separati da virgole.
    • L'email di un account di servizio.

    Inserisci eventuali parametri aggiuntivi. In particolare, potresti dover specificare quanto segue:

    • Networking: per utilizzare una rete VPC diversa da quella predefinita, specifica la rete e la subnet.
    • funzione definita dall'utente: per utilizzare una funzione JavaScript definita dall'utente, specifica la località Cloud Storage dello script e il nome della funzione JavaScript da richiamare.

gcloud

Per creare il job Dataflow utilizzando Google Cloud CLI, esegui questo comando:

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

Sostituisci le seguenti variabili:

  • JOB_NAME. Il nome di un job a tua scelta.
  • LOCATION. La regione in cui eseguire il job. Per ulteriori informazioni su regioni e località, consulta Località Dataflow.
  • KAFKA_TOPICS. Un elenco separato da virgole di argomenti Kafka da leggere.
  • BOOTSTRAP_SERVERS. Un elenco separato da virgole di server di bootstrap Kafka. Esempio: 127:9092,127.0.0.1:9093.
  • OUTPUT_TABLE. Tabella di output BigQuery, specificata come PROJECT_ID:DATASET_NAME.TABLE_NAME. Esempio: my_project:dataset1.table1.
  • IAM_SERVICE_ACCOUNT. Facoltativa. L'indirizzo email dell'account di servizio con cui eseguire il job.
  • UDF_SCRIPT_PATH. Facoltativa. Il percorso Cloud Storage di un file JavaScript che contiene una funzione definita dall'utente. Esempio: gs://your-bucket/your-function.js.
  • UDF_FUNCTION_NAME. Facoltativa. Il nome della funzione JavaScript da chiamare come funzione definita dall'utente.
  • VPC_NETWORK_NAME. Facoltativa. La rete a cui verranno assegnati i worker.
  • SUBNET_NAME. Facoltativa. La subnet a cui verranno assegnati i worker.

Tipi di dati

Questa sezione descrive come gestire i vari tipi di dati nello schema della tabella BigQuery.

Internamente, i messaggi JSON vengono convertiti in oggetti TableRow e i valori del campo TableRow vengono tradotti in tipi BigQuery.

Tipi scalari

L'esempio seguente crea una tabella BigQuery con diversi tipi di dati scalabili, tra cui tipi di dati stringa, numerici, booleani, data/ora, intervallo e area geografica:

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

Ecco un payload JSON con campi compatibili:

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

Note:

  • Per una colonna TIMESTAMP, puoi utilizzare il metodo Date.toJSON JavaScript per formattare il valore.
  • Per la colonna GEOGRAPHY, puoi specificare l'area geografica utilizzando testo noto (WKT) o GeoJSON, formattato come stringa. Per maggiori informazioni, consulta Caricamento dei dati geospaziali.

Per ulteriori informazioni sui tipi di dati in BigQuery, consulta Tipi di dati.

Array

Puoi archiviare un array in BigQuery utilizzando il tipo di dati ARRAY. Nell'esempio seguente, il payload JSON contiene una proprietà denominata scores il cui valore è un array JSON:

{"name":"Emily","scores":[10,7,10,9]}

La seguente istruzione SQL CREATE TABLE crea una tabella BigQuery con uno schema compatibile:

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

La tabella risultante ha il seguente aspetto:

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

Strutture

Il tipo di dati STRUCT in BigQuery contiene un elenco ordinato di campi denominati. Puoi utilizzare un STRUCT per contenere oggetti JSON che seguono uno schema coerente.

Nell'esempio seguente, il payload JSON contiene una proprietà denominata val il cui valore è un oggetto JSON:

{"name":"Emily","val":{"a":"yes","b":"no"}}

La seguente istruzione SQL CREATE TABLE crea una tabella BigQuery con uno schema compatibile:

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

La tabella risultante ha il seguente aspetto:

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Dati sugli eventi semistrutturati

Se i dati degli eventi Kafka non seguono uno schema rigido, valuta la possibilità di memorizzarli in BigQuery come tipo di dati JSON (anteprima). Se archivi i dati JSON come tipo di dati JSON, non è necessario definire preventivamente lo schema evento. Dopo l'importazione dati, puoi eseguire query sulla tabella di output utilizzando gli operatori di accesso ai campi (notazione dei punti) e di accesso all'array.

Innanzitutto, crea una tabella con una colonna JSON:

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

Quindi definisci una funzione JavaScript definita dall'utente che esegue il wrapping del payload dell'evento all'interno di un oggetto JSON:

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

Dopo che i dati sono stati scritti in BigQuery, puoi eseguire query sui singoli campi utilizzando l'operatore di accesso ai campi. Ad esempio, la seguente query restituisce il valore del campo name per ogni record:

SELECT event_data.name FROM my_dataset1.kafka_events;

Per saperne di più sull'utilizzo di JSON in BigQuery, consulta Utilizzo dei dati JSON in SQL Standard di Google.

Errori e logging

Potresti riscontrare errori durante l'esecuzione della pipeline o errori durante la gestione di singoli eventi Kafka.

Per ulteriori informazioni sulla gestione degli errori della pipeline, consulta Risoluzione dei problemi e debug di pipeline.

Se il job viene eseguito correttamente, ma si verifica un errore durante l'elaborazione di un singolo evento Kafka, il job di pipeline scrive un record di errore in una tabella in BigQuery. Il job in sé non ha esito negativo e l'errore a livello di evento non viene visualizzato come errore nel log del job Dataflow.

Il job di pipeline crea automaticamente la tabella per contenere i record di errori. Per impostazione predefinita, il nome della tabella è "output_table_error_records", dove output_table è il nome della tabella di output. Ad esempio, se la tabella di output è denominata kafka_events, la tabella degli errori è denominata kafka_events_error_records. Puoi specificare un nome diverso impostando il parametro del modello outputDeadletterTable:

outputDeadletterTable=my_project:dataset1.errors_table

I possibili errori includono:

  • Errori di serializzazione, incluso JSON con formattazione errata.
  • Digita gli errori di conversione, causati da una mancata corrispondenza tra lo schema della tabella e i dati JSON.
  • Campi aggiuntivi nei dati JSON che non sono presenti nello schema della tabella.

Esempi di messaggi di errore:

Tipo di errore Dati sull'evento errorMessage
Errore di serializzazione "Hello World" Impossibile serializzare il file json nella riga della tabella: "Hello world"
Errore di conversione del tipo {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Impossibile convertire il valore in un numero intero (valore errato): abc", "reason" : "invalid" } ], "index" : 0 }
Campo sconosciuto {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "nessun campo simile: customer_id.", "motivo" : "non valido" } ], "indice" : 0 }

Passaggi successivi