Questo documento fornisce indicazioni di alto livello sulla creazione e il deployment di una Pipeline Dataflow con flussi di dati da Apache Kafka a BigQuery.
Apache Kafka è un software open source di streaming di eventi. Kafka viene utilizzato di frequente nelle architetture distribuite per abilitare la comunicazione tra componenti accoppiati in modo lasco. Puoi utilizzare Dataflow per leggere gli eventi da Kafka, elaborarli e scrivere i risultati in una tabella BigQuery per ulteriori analisi.
Google offre Modello Dataflow che configura una pipeline Kafka-to-BigQuery. Il modello utilizza il connettore BigQueryIO fornito nell'SDK Apache Beam.
Per utilizzare questo modello, segui questi passaggi:
- Esegui il deployment di Kafka in Google Cloud o altrove.
- Configura il networking.
- Imposta le autorizzazioni IAM (Identity and Access Management).
- Scrivi una funzione per trasformare i dati sugli eventi.
- Creare la tabella di output BigQuery.
- Eseguire il deployment del modello Dataflow.
Esegui il deployment di Kafka
In Google Cloud, puoi eseguire il deployment di un cluster Kafka sulle istanze di macchine virtuali (VM) Compute Engine o utilizzare un servizio Kafka gestito di terze parti. Per approfondire le opzioni di deployment su Google Cloud, consulta Che cos'è Apache Kafka?. Puoi trovare informazioni Le soluzioni Kafka sul Google Cloud Marketplace.
In alternativa, potresti avere un cluster Kafka esistente che risiede all'esterno di in Google Cloud. Ad esempio, potresti avere un carico di lavoro esistente on-premise o in un altro cloud pubblico.
Configurazione del networking
Per impostazione predefinita, Dataflow avvia le istanze all'interno una rete VPC (Virtual Private Cloud). A seconda della configurazione di Kafka, potrebbe essere necessario configurare una rete e una sottorete diverse per Dataflow. Per saperne di più, consulta la sezione Specificare una rete e una sottorete. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.
Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro di Controlli di servizio VPC oppure estendi i perimetri alla VPN o all'interconnessione Cloud autorizzata.
Se il cluster Kafka è dipiegato all'esterno di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di networking con diversi compromessi:
- Esegui la connessione utilizzando uno spazio di indirizzi RFC 1918 condiviso, utilizzando una delle seguenti opzioni:
- Raggiungi il tuo cluster Kafka ospitato esternamente tramite indirizzi IP pubblici utilizzando uno dei seguenti metodi:
- Rete internet pubblica
- Peering diretto
- Peering con operatori
Dedicated Interconnect è l'opzione migliore per prestazioni e affidabilità, ma la configurazione può richiedere più tempo le parti interessate devono fornire i nuovi circuiti. Con una topologia basata su IP pubblici, possono iniziare rapidamente perché il lavoro di networking deve essere ridotto.
Le due sezioni seguenti descrivono queste opzioni in maggiore dettaglio.
Spazio di indirizzi RFC 1918 condiviso
Sia l'interconnessione dedicata che la VPN IPsec ti consentono di accedere direttamente agli indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), il che può semplificare la configurazione di Kafka. Se utilizzi una topologia basata su VPN, ti consigliamo di configurare una VPN a velocità effettiva elevata.
Per impostazione predefinita, Dataflow avvia le istanze sul tuo
Rete VPC. In una topologia di rete privata con
route definite esplicitamente nel router Cloud
che collegano le subnet in Google Cloud a quel cluster Kafka,
con maggiore controllo sulla posizione
delle istanze Dataflow. Tu
puoi usare Dataflow per configurare network
e subnetwork
parametri di esecuzione.
Assicurati che la sottorete corrispondente abbia un numero sufficiente di indirizzi IP disponibili su cui Dataflow possa avviare istanze quando tenta di eseguire il ridimensionamento. Inoltre, anche quando crei una rete separata per avviare Istanze Dataflow, assicurati di avere una regola firewall che abilita il traffico TCP tra tutte le macchine virtuali nel progetto. Il valore predefinito in questa rete è già configurata questa regola firewall.
Spazio di indirizzi IP pubblici
Questa architettura utilizza Transport Layer Security (TLS) per proteggere il traffico tra i client esterni e Kafka e utilizza il traffico non criptato per la comunicazione tra broker. Quando l'ascoltatore Kafka si associa a un'interfaccia di rete utilizzata sia per la comunicazione interna che esterna, la configurazione dell'ascoltatore è semplice. Tuttavia, in molti scenari, gli indirizzi
annunciati esternamente
dei broker Kafka nel cluster sono diversi dalle interfacce di rete interne
usate da Kafka. In questi casi, puoi utilizzare la proprietà advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
I client esterni si connettono utilizzando la porta 9093 tramite un protocollo "SSL" canale e interno
I client si connettono utilizzando la porta 9092 tramite un canale di testo non crittografato. Quando specifichi un indirizzo in advertised.listeners
, utilizza nomi DNS (kafkabroker-n.mydomain.com
in questo esempio) che risolvono nella stessa istanza sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare
perché potrebbero non essere risolti per il traffico interno.
Impostazione delle autorizzazioni IAM
I job Dataflow utilizzano due account di servizio IAM:
- Il servizio Dataflow utilizza un Account di servizio Dataflow per manipolare le risorse Google Cloud, ad esempio creando VM.
- Le VM worker di Dataflow utilizzano un account di servizio worker per accedere ai file e ad altre risorse della pipeline. Questo account di servizio necessita dell'accesso in scrittura alla tabella di output BigQuery. Occorre inoltre accesso a qualsiasi altra risorsa a cui fa riferimento il job della pipeline.
Assicurati che questi due account di servizio abbiano i ruoli appropriati. Per ulteriori informazioni, consulta Autorizzazioni e sicurezza di Dataflow.
Trasformare i dati per BigQuery
Il modello Kafka-to-BigQuery crea una pipeline che legge eventi da uno o più argomenti Kafka e li scrive in un Tabella BigQuery. Se vuoi, puoi fornire una funzione definita dall'utente (UDF) JavaScript che trasforma i dati sugli eventi prima che vengano scritti in BigQuery.
L'output della pipeline deve essere costituito da dati in formato JSON corrispondenti allo schema della tabella di output. Se i dati degli eventi Kafka sono già in formato JSON, puoi creare una tabella BigQuery con uno schema corrispondente e passare direttamente in BigQuery. In caso contrario, crea una funzione definita dall'utente che prenda i dati sugli eventi come input e restituisce i dati JSON che corrispondono Tabella BigQuery.
Ad esempio, supponiamo che i dati sugli eventi contengano due campi:
name
(stringa)customer_id
(numero intero)
L'output della pipeline Dataflow potrebbe essere simile al seguente:
{ "name": "Alice", "customer_id": 1234 }
Supponendo che i dati sugli eventi non siano già in formato JSON, devi scrivere una UDF che li trasformi, come segue:
// UDF
function process(eventData) {
var name;
var customer_id;
// TODO Parse the event data to extract the name and customer_id fields.
// Return a JSON payload.
return JSON.stringify({ name: name, customer_id: customer_id });
}
La UDF può eseguire un'elaborazione aggiuntiva sui dati sugli eventi, ad esempio filtrare gli eventi, rimuovere le informazioni che consentono l'identificazione personale (PII) o arricchire i dati con campi aggiuntivi.
Per ulteriori informazioni sulla scrittura di una UDF per il modello, consulta Estendere il modello Dataflow con le UDF. Carica il file JavaScript su Cloud Storage.
crea la tabella di output BigQuery
Crea la tabella di output BigQuery prima di eseguire il modello. Lo schema della tabella deve essere compatibile con l'output JSON della pipeline. Per ogni proprietà nel payload JSON, la pipeline scrive il valore nella colonna della tabella BigQuery dello stesso nome. Qualsiasi proprietà mancante nel codice JSON vengono interpretati come valori NULL.
Utilizzando l'esempio precedente, la tabella BigQuery avrebbe le seguenti colonne:
Nome colonna | Tipo di dati |
---|---|
name |
STRING |
customer_id |
INTEGER |
Puoi utilizzare lo
CREATE TABLE
Istruzione SQL per creare la tabella:
CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);
In alternativa, puoi specificare lo schema della tabella utilizzando un file di definizione JSON. Per ulteriori informazioni, consulta la sezione Specificare uno schema nella sezione documentazione di BigQuery.
esegui il job Dataflow
Dopo aver creato la tabella BigQuery, esegui il modello Dataflow.
Console
Per creare il job Dataflow utilizzando la console Google Cloud, segui questi passaggi:
- Vai alla pagina Dataflow nella console Google Cloud.
- Fai clic su Crea job da modello.
- Nel campo Nome job, inserisci un nome per il job.
- Per Endpoint a livello di regione, seleziona una regione.
- Seleziona "Da Kafka a BigQuery" modello.
- In Parametri obbligatori, inserisci il nome del Tabella di output BigQuery. La tabella deve già esistere e avere uno schema valido.
Fai clic su Mostra parametri facoltativi e inserisci i valori per almeno il seguenti parametri:
- L'argomento Kafka da cui leggere l'input.
- L'elenco dei server bootstrap Kafka, separati da virgole.
- Un'email dell'account di servizio.
Inserisci eventuali parametri aggiuntivi. In particolare, potresti dover specificare quanto segue:
- Rete: per utilizzare una rete VPC diversa da quella predefinita, specifica la rete e la subnet.
- Funzioni definite dall'utente: per utilizzare una funzione JavaScript definita dall'utente, specifica il percorso di Cloud Storage lo script e il nome della funzione JavaScript da richiamare.
gcloud
Per creare il job Dataflow utilizzando Google Cloud CLI, esegui questo comando:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters inputTopics=KAFKA_TOPICS \ --parameters bootstrapServers=BOOTSTRAP_SERVERS \ --parameters outputTableSpec=OUTPUT_TABLE \ --parameters serviceAccount=IAM_SERVICE_ACCOUNT \ --parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \ --parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \ --network VPC_NETWORK_NAME \ --subnetwork SUBNET_NAME
Sostituisci le seguenti variabili:
- JOB_NAME. Un nome job a tua scelta.
- LOCATION. La regione in cui eseguire il job. Per ulteriori informazioni su regioni e località, consulta Località di Dataflow.
- KAFKA_TOPICS. Un elenco separato da virgole di argomenti Kafka da leggere.
- BOOTSTRAP_SERVERS. Un elenco di server bootstrap Kafka separati da virgole. Esempio:
127:9092,127.0.0.1:9093
. - OUTPUT_TABLE. La tabella BigQuery di output,
specificato come PROJECT_ID:DATASET_NAME.TABLE_NAME.
Esempio:
my_project:dataset1.table1
. - IAM_SERVICE_ACCOUNT. Facoltativo. L'indirizzo email del servizio di esecuzione del job.
- UDF_SCRIPT_PATH. Facoltativo. Il percorso Cloud Storage di un
file JavaScript contenente una funzione definita dall'utente. Esempio:
gs://your-bucket/your-function.js
. - UDF_FUNCTION_NAME. Facoltativo. Il nome della funzione JavaScript da chiamare come funzione definita dall'utente.
- VPC_NETWORK_NAME. Facoltativo. La rete a cui verranno assegnati i worker.
- SUBNET_NAME. Facoltativa. La subnet a cui verranno assegnati i worker.
Tipi di dati
Questa sezione descrive come gestire i vari tipi di dati in: Schema della tabella BigQuery.
Internamente, i messaggi JSON vengono convertiti in oggetti TableRow
,
e i valori del campo TableRow
vengono convertiti in tipi BigQuery.
Tipi scalari
L'esempio seguente crea una tabella BigQuery con diversi tipi di dati scalari, tra cui stringa, numerico, booleano, data/ora, intervallo e geografico:
CREATE TABLE my_dataset.kafka_events (
string_col STRING,
integer_col INT64,
float_col FLOAT64,
decimal_col DECIMAL,
bool_col BOOL,
date_col DATE,
dt_col DATETIME,
ts_col TIMESTAMP,
interval_col INTERVAL,
geo_col GEOGRAPHY
);
Ecco un payload JSON con campi compatibili:
{
"string_col": "string_val",
"integer_col": 10,
"float_col": 3.142,
"decimal_col": 5.2E11,
"bool_col": true,
"date_col": "2022-07-01",
"dt_col": "2022-07-01 12:00:00.00",
"ts_col": "2022-07-01T12:00:00.00Z",
"interval_col": "0-13 370 48:61:61",
"geo_col": "POINT(1 2)"
}
Note:
- Per una colonna
TIMESTAMP
, puoi utilizzare il metodoDate.toJSON
JavaScript per formattare il valore. - Per la colonna
GEOGRAPHY
, puoi specificare l'area geografica utilizzando i campi testo (WKT) o GeoJSON, formattato come stringa. Per ulteriori informazioni, vedi Caricamento di dati geospaziali.
Per ulteriori informazioni sui tipi di dati in BigQuery, consulta Tipi di dati.
Array
Puoi archiviare un array in BigQuery utilizzando
Dati di ARRAY
di testo. Nell'esempio seguente, il payload JSON contiene una proprietà denominata
scores
il cui valore è un array JSON:
{"name":"Emily","scores":[10,7,10,9]}
L'istruzione SQL CREATE TABLE
seguente crea una tabella BigQuery con uno schema compatibile:
CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);
La tabella risultante ha il seguente aspetto:
+-------+-------------+
| name | scores |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+
Strutture
Il tipo di dati STRUCT
in
BigQuery contiene un elenco ordinato di campi denominati. Puoi utilizzare uno dei seguenti
STRUCT
per contenere oggetti JSON che seguono uno schema coerente.
Nell'esempio seguente, il payload JSON contiene una proprietà denominata val
il cui valore è un oggetto JSON:
{"name":"Emily","val":{"a":"yes","b":"no"}}
L'istruzione SQL CREATE TABLE
seguente crea una tabella BigQuery con uno schema compatibile:
CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);
La tabella risultante ha il seguente aspetto:
+-------+----------------------+
| name | val |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
Dati sugli eventi semistrutturati
Se i dati degli eventi Kafka non seguono uno schema rigido, valuta la possibilità di archiviarli in
BigQuery come
Tipo di dati JSON
(anteprima). Se archivi i dati JSON come tipo di dati JSON
, non è necessario definire lo schema evento in anticipo. Dopo l'importazione dati, puoi eseguire query sulla tabella di output
mediante gli operatori di accesso ai campi (notazione punti) e agli array.
Innanzitutto, crea una tabella con una colonna JSON
:
-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);
Definisci quindi una funzione JavaScript definita dall'utente che aggrega il payload dell'evento all'interno di un oggetto JSON:
// UDF
function process(eventData) {
var json;
// TODO Convert the event data to JSON.
return JSON.stringify({ "event_data": json });
}
Dopo aver scritto i dati in BigQuery, puoi eseguire query sui singoli campi utilizzando l'operatore di accesso ai campi. Ad esempio,
query restituisce il valore del campo name
per ogni record:
SELECT event_data.name FROM my_dataset1.kafka_events;
Per ulteriori informazioni sull'utilizzo di JSON in BigQuery, vedi Utilizzo dei dati JSON in SQL standard di Google.
Errori e log
Potresti riscontrare errori durante l'esecuzione della pipeline o durante la gestione di singoli eventi Kafka.
Per ulteriori informazioni sulla gestione degli errori della pipeline, consulta Risoluzione dei problemi e debug della pipeline.
Se il job viene eseguito correttamente, ma si verifica un errore durante l'elaborazione di una singola Kafka, il job della pipeline scrive un record di errore in una tabella in BigQuery. Il job stesso non ha esito negativo e l'errore a livello di evento non viene visualizzato come errore nel log del job Dataflow.
Il job della pipeline crea automaticamente la tabella in cui inserire i record di errore. Di
per impostazione predefinita, il nome della tabella è "output_table_error_records", dove
output_table è il nome della tabella di output. Ad esempio, se la tabella di output
è denominato kafka_events
, la tabella degli errori è denominata kafka_events_error_records
.
Puoi specificare un nome diverso impostando il modello outputDeadletterTable
:
outputDeadletterTable=my_project:dataset1.errors_table
I possibili errori includono:
- Errori di serializzazione, inclusi JSON con formattazione errata.
- Errori di conversione del tipo, causati da una mancata corrispondenza nello schema della tabella e nei dati JSON.
- Campi aggiuntivi nei dati JSON che non sono presenti nello schema della tabella.
Esempi di messaggi di errore:
Tipo di errore | Dati sull'evento | errorMessage |
---|---|---|
Errore di serializzazione | "Hello World" | Impossibile serializzare il json nella riga della tabella: "Hello World" |
Tipo di errore di conversione | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
Campo sconosciuto | {"name":"Zoe","age":34} | { "errori" : [ { "debugInfo" : "", "posizione" : "età", "messaggio" : "campo non presente: customer_id.", "motivo" : "non valido" } ], "index" : 0 } |
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Inizia a utilizzare BigQuery.