Apache Kafka è una piattaforma di flussi di dati open source distribuita per dati in tempo reale di pipeline e integrazione dei dati. Fornisce un sistema di flussi di dati efficiente e scalabile per l'utilizzo in una vasta gamma di applicazioni, tra cui:
- Analisi in tempo reale
- Elaborazione dei flussi
- Aggregazione dei log
- Messaggistica distribuita
- Flusso di eventi
Obiettivi
Installa Kafka su un Cluster ad alta disponibilità di Dataproc con ZooKeeper (indicato in questo tutorial come "cluster Dataproc Kafka").
Creare dati fittizi dei clienti, quindi pubblicarli in un argomento Kafka.
Creare parquet Hive e tabelle ORC in Cloud Storage per ricevere dati sugli argomenti Kafka in flusso.
Invia un job PySpark per la sottoscrizione e il flusso di dati dell'argomento Kafka in Cloud Storage in formato Parquet e ORC.
Eseguire una query sui dati della tabella Hive in flusso per contare i flussi di dati Messaggi Kafka.
Costi
In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:
Per generare una stima dei costi basata sull'utilizzo previsto,
utilizza il Calcolatore prezzi.
Una volta completate le attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la pagina Pulizia.
Prima di iniziare
Se non l'hai ancora fatto, crea un progetto Google Cloud.
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Dataproc, Compute Engine, and Cloud Storage.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Dataproc, Compute Engine, and Cloud Storage.
- Nella console Google Cloud, vai alla pagina Bucket di Cloud Storage.
- Fai clic su Crea bucket.
- Nella pagina Crea un bucket, inserisci le informazioni del bucket. Per andare al passaggio successivo, fai clic su Continua.
- In Assegna un nome al bucket, inserisci un nome che soddisfi i requisiti di denominazione dei bucket.
-
Per Scegli dove archiviare i tuoi dati, segui questi passaggi:
- Seleziona un'opzione Tipo di località.
- Seleziona un'opzione Località.
- In Scegli una classe di archiviazione predefinita per i dati, seleziona una classe di archiviazione.
- Per Scegli come controllare l'accesso agli oggetti, seleziona un'opzione Controllo dell'accesso.
- In Impostazioni avanzate (facoltative), specifica un metodo di crittografia, un criterio di conservazione o le etichette dei bucket.
- Fai clic su Crea.
Passaggi del tutorial
Esegui questi passaggi per creare un cluster Dataproc Kafka in un argomento Kafka in Cloud Storage in formato Parquet ORC.
Copia lo script di installazione di Kafka in Cloud Storage
L'azione di inizializzazione kafka.sh
lo script installa Kafka su un cluster Dataproc.
Cerca il codice.
Copia il
kafka.sh
azione di inizializzazione nel tuo bucket Cloud Storage. Questo script installa Kafka su un cluster Dataproc.Apri Cloud Shell ed esegui il seguente comando:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
Effettua le seguenti sostituzioni:
- REGION:
kafka.sh
è memorizzato in tag pubblici con tag a livello di regione in Cloud Storage. Specifica un'area geograficamente vicina la regione di Compute Engine, (esempio:us-central1
). - BUCKET_NAME: il nome del tuo bucket Cloud Storage.
- REGION:
Crea un cluster Dataproc Kafka
Apri Cloud Shell ed esegui i seguenti
gcloud dataproc clusters create
per creare un progetto Dataproc Cluster ad alta disponibilità che installa i componenti Kafka e ZooKeeper:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
Note:
- KAFKA_CLUSTER: il nome del cluster, che deve essere univoco all'interno di un progetto. Il nome deve iniziare con una lettera minuscola e può contenere fino a 51 lettere minuscole lettere, numeri e trattini. Non può terminare con un trattino. Il nome di un un cluster eliminato può essere riutilizzato.
- PROJECT_ID: il progetto da associare a questo cluster.
- REGION: il
Regione di Compute Engine
in cui si troverà il cluster, ad esempio
us-central1
.- Puoi aggiungere il flag facoltativo
--zone=ZONE
per specificare una zona all'interno della regione specificata, ad esempious-central1-a
. Se non specifichi una zona, Posizionamento della zona automatica di Dataproc seleziona una zona con la regione specificata.
- Puoi aggiungere il flag facoltativo
--image-version
: versione immagine Dataproc2.1-debian11
è consigliato per questo tutorial. Nota: ogni versione dell'immagine contiene un insieme di tra cui il componente Hive usato in questo (vedi Versioni immagine Dataproc supportate).--num-master
:3
nodi master creano un'istanza cluster ad alta disponibilità. Il componente Zookeeper, richiesto da Kafka, è preinstallato in un cluster ad alta disponibilità.--enable-component-gateway
: attiva il parametro Gateway dei componenti Dataproc.- BUCKET_NAME: il nome del tuo bucket Cloud Storage
che contiene lo script di inizializzazione
/scripts/kafka.sh
(vedi Copiare lo script di installazione di Kafka in Cloud Storage).
Crea un argomento custdata
Kafka
Per creare un argomento Kafka nel cluster Dataproc Kafka:
Utilizza l'SSH per aprire una finestra del terminale sulla VM master del cluster.
Creare un argomento
custdata
Kafka./usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
Note:
KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
-w-0:9092
indica che il broker Kafka in esecuzione porta9092
sul nodoworker-0
.Puoi eseguire i comandi seguenti dopo aver creato l'argomento
custdata
:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Pubblica contenuto nell'argomento Kafka custdata
Lo script seguente utilizza lo strumento Kafka kafka-console-producer.sh
per
generare dati fittizi sui clienti in formato CSV.
Copia e incolla lo script nel pannello SSH sul nodo master del tuo cluster Kafka. Premi <return> per eseguire lo script.
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
Note:
- KAFKA_CLUSTER: il nome del tuo cluster Kafka.
Esegui questo comando Kafka per verificare che l'argomento
custdata
contenga 10.000 messaggi./usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
Note:
- KAFKA_CLUSTER: il nome del tuo cluster Kafka.
Output previsto:
custdata:0:10000
Creazione di tabelle Hive in Cloud Storage
Creare tabelle Hive per ricevere dati sugli argomenti Kafka in flusso.
Per creare cust_parquet
(parquet) e un
cust_orc
(ORC) Hive tabelle nel tuo bucket Cloud Storage.
Inserisci BUCKET_NAME nello script seguente, quindi copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka, quindi premi <return> per creare uno script
~/hivetables.hql
(Hive Query Language).Eseguirai lo script
~/hivetables.hql
nel passaggio successivo per creare tavole Hive in parquet e ORC nel tuo bucket Cloud Storage.cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Nel terminale SSH sul nodo master del cluster Kafka, invia il job Hive
~/hivetables.hql
da crearecust_parquet
(parquet) e tabelle Hivecust_orc
(ORC) nel tuo bucket Cloud Storage.gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
Note:
- Il componente Hive è preinstallato in Dataproc Kafka in un cluster Kubernetes. Vedi Versioni di release 2.1.x per un elenco delle versioni dei componenti Hive incluse nelle immagini 2.1 rilasciate di recente.
- KAFKA_CLUSTER: il nome del tuo cluster Kafka.
- REGION: la regione in cui si trova il cluster Kafka.
Crea un flusso di custdata
Kafka in modalità Hive
- Esegui questo comando nel terminale SSH sul nodo master di
nel tuo cluster Kafka per installare la libreria
kafka-python
. È necessario un client Kafka per trasmettere in flusso i dati degli argomenti Kafka di archiviazione ideale in Cloud Storage.
pip install kafka-python
Inserisci il tuo BUCKET_NAME, poi copia e incolla quanto segue il codice PySpark nel terminale SSH sul nodo master del cluster Kafka e quindi premi <return> per creare un file
streamdata.py
.Lo script sottoscrive l'argomento Kafka
custdata
, quindi trasmette in flusso le alle tabelle Hive in Cloud Storage. Il formato di output, che può essere parquet o ORC, viene passato nello script come un parametro.cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
Nel terminale SSH sul nodo master tuo cluster Kafka, esegui
spark-submit
per trasmettere i dati in dalle tabelle Hive in Cloud Storage.Inserisci il nome del KAFKA_CLUSTER e l'output FORMAT, quindi copia e incolla il codice seguente nell'SSH sul nodo master del tuo cluster Kafka, quindi premi <return> per eseguire il codice e inviare i dati Kafka
custdata
in formato Parquet al tuo Hive tabelle in Cloud Storage.spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
Note:
- KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
- FORMAT: specifica
parquet
oorc
come formato di output. Puoi eseguire il comando in successione entrambi i formati alle tabelle Hive: ad esempio, nella prima chiamata, specificaparquet
per inviare in flusso l'argomento Kafkacustdata
all'Hive tavolo da parquet; quindi, nella seconda chiamata, specifica il formatoorc
in trasmetti il flussocustdata
alla tabella ORC Hive.
Dopo che l'output standard si arresta nel terminale SSH, significa che tutti i contenuti di
custdata
sono stati riprodotti in streaming, premi <control-c> nel terminale SSH per arrestare il processo.Elenca le tabelle Hive in Cloud Storage.
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
Note:
- BUCKET_NAME: inserisci il nome dell'istanza di Cloud Storage bucket che contiene le tabelle Hive (consulta Creare tabelle Hive).
Eseguire query su flussi di dati
Nel terminale SSH sul nodo master tuo cluster Kafka, esegui questo comando
hive
per contare i messaggi Kafka trasmessi in flussocustdata
nelle tabelle Hive in Cloud Storage.hive -e "select count(1) from TABLE_NAME"
Note:
- TABLE_NAME: specifica
cust_parquet
ocust_orc
come Nome tabella Hive.
Snippet di output previsto:
- TABLE_NAME: specifica
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
Esegui la pulizia
Elimina il progetto
Elimina un progetto Google Cloud:
gcloud projects delete PROJECT_ID
Elimina risorse
-
Elimina il bucket:
gcloud storage buckets delete BUCKET_NAME
- Elimina il tuo cluster Kafka:
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}