Apache Kafka è una piattaforma di streaming distribuita open source per pipeline di dati e integrazione dei dati in tempo reale. Fornisce un sistema di streaming efficiente e scalabile per l'utilizzo in una serie di applicazioni, tra cui:
- Analisi in tempo reale
- Elaborazione dei flussi
- Aggregazione dei log
- Messaggistica distribuita
- Streaming di eventi
Obiettivi
Installa Kafka su un cluster Dataproc HA con ZooKeeper (in questo tutorial denominato "cluster Kafka Dataproc").
Crea dati dei clienti fittizi, quindi pubblicali in un argomento Kafka.
Crea tabelle Hive parquet e ORC in Cloud Storage per ricevere i dati degli argomenti Kafka in streaming.
Invia un job PySpark per iscriverti all'argomento Kafka e riprodurlo in streaming in Cloud Storage in formato Parquet e ORC.
Esegui una query sui dati della tabella Hive in streaming per conteggiare i messaggi Kafka in streaming.
Costi
In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:
Per generare una stima dei costi basata sull'utilizzo previsto,
utilizza il Calcolatore prezzi.
Al termine delle attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la sezione Pulizia.
Prima di iniziare
Se non l'hai ancora fatto, crea un progetto Google Cloud.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
Passaggi del tutorial
Esegui i seguenti passaggi per creare un cluster Kafka Dataproc per leggere un argomento Kafka in Cloud Storage in formato Parquet OPPURE ORC.
Copia lo script di installazione di Kafka in Cloud Storage
Lo script kafka.sh
azione di inizializzazione
installa Kafka su un cluster Dataproc.
Sfoglia il codice.
Copia lo script dell'
kafka.sh
azione di inizializzazione nel tuo bucket Cloud Storage. Questo script installa Kafka su un cluster Dataproc.Apri Cloud Shell ed esegui il comando seguente:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
Apporta le seguenti sostituzioni:
- REGION:
kafka.sh
viene archiviato in bucket pubblici con tag regionali in Cloud Storage. Specifica una regione Compute Engine geograficamente vicina (ad esempious-central1
). - BUCKET_NAME: il nome del bucket Cloud Storage.
- REGION:
Crea un cluster Kafka Dataproc
Apri Cloud Shell, quindi esegui il seguente comando
gcloud dataproc clusters create
per creare un cluster Dataproc HA che installa i componenti Kafka e ZooKeeper:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
Note:
- KAFKA_CLUSTER: il nome del cluster, che deve essere univoco all'interno di un progetto. Il nome deve iniziare con una lettera minuscola e può contenere fino a 51 lettere minuscole, numeri e trattini. Non può terminare con un trattino. Il nome di un cluster eliminato può essere riutilizzato.
- PROJECT_ID: il progetto da associare a questo cluster.
- REGION: la
regione Compute Engine
dove si troverà il cluster, ad esempio
us-central1
.- Puoi aggiungere il flag facoltativo
--zone=ZONE
per specificare una zona all'interno della regione specificata, ad esempious-central1-a
. Se non specifichi una zona, la funzionalità di posizionamento in zona automatica di Dataproc seleziona una zona con la regione specificata.
- Puoi aggiungere il flag facoltativo
--image-version
: per questo tutorial è consigliata la versione dell'immagine Dataproc2.1-debian11
. Nota: ogni versione dell'immagine contiene un insieme di componenti preinstallati, incluso il componente Hive utilizzato in questo tutorial (consulta Versioni delle immagini Dataproc supportate).--num-master
:3
nodi master creano un cluster HA. Il componente Zookeeper, richiesto da Kafka, è preinstallato su un cluster ad alta disponibilità.--enable-component-gateway
: abilita il gateway dei componenti Dataproc.- BUCKET_NAME: il nome del bucket Cloud Storage
che contiene lo script di inizializzazione
/scripts/kafka.sh
(vedi Copiare lo script di installazione di Kafka in Cloud Storage).
Crea un argomento Kafka custdata
Per creare un argomento Kafka nel cluster Kafka Dataproc:
Utilizza l'utilità SSH per aprire una finestra del terminale sulla VM master del cluster.
Crea un argomento Kafka
custdata
./usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
Note:
KAFKA_CLUSTER: inserisci il nome del cluster Kafka.
-w-0:9092
indica il broker Kafka in esecuzione sulla porta9092
nel nodoworker-0
.Dopo aver creato l'argomento
custdata
, puoi eseguire i seguenti comandi:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Pubblicare contenuti nell'argomento Kafka custdata
Lo script seguente utilizza lo kafka-console-producer.sh
strumento Kafka per
generare dati fittizi dei clienti in formato CSV.
Copia e incolla lo script nel terminale SSH sul nodo principale del cluster Kafka. Premi <Invio> per eseguire lo script.
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
Note:
- KAFKA_CLUSTER: il nome del cluster Kafka.
Esegui il seguente comando Kafka per verificare che l'argomento
custdata
contenga 10.000 messaggi./usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
Note:
- KAFKA_CLUSTER: il nome del cluster Kafka.
Risultato previsto:
custdata:0:10000
Crea tabelle Hive in Cloud Storage
Crea tabelle Hive per ricevere i dati degli argomenti Kafka in streaming.
Esegui i seguenti passaggi per creare tabelle Hive cust_parquet
(parquet) e cust_orc
(ORC) nel bucket Cloud Storage.
Inserisci il tuo BUCKET_NAME nello script seguente, poi copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka, poi premi <Invio> per creare uno script
~/hivetables.hql
(Hive Query Language).Eseguirai lo script
~/hivetables.hql
nel passaggio successivo per creare tabelle Hive parquet e ORC nel bucket Cloud Storage.cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Nel terminale SSH sul nodo principale del cluster Kafka, invia il job Hive
~/hivetables.hql
per crearecust_parquet
(parquet) e una tabella Hivecust_orc
(ORC) nel bucket Cloud Storage.gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
Note:
- Il componente Hive è preinstallato sul cluster Dataproc Kafka. Consulta le versioni release 2.1.x per un elenco delle versioni dei componenti Hive incluse nelle immagini 2.1 rilasciate di recente.
- KAFKA_CLUSTER: il nome del cluster Kafka.
- REGION: la regione in cui si trova il cluster Kafka.
Esegui lo streaming di Kafka custdata
nelle tabelle Hive
- Esegui il seguente comando nel terminale SSH sul nodo principale del tuo cluster Kafka per installare la libreria
kafka-python
. È necessario un client Kafka per eseguire lo streaming dei dati degli argomenti Kafka in Cloud Storage.
pip install kafka-python
Inserisci il tuo BUCKET_NAME, quindi copia e incolla il seguente codice PySpark nel terminale SSH sul nodo master del cluster Kafka e premi <Invio> per creare un file
streamdata.py
.Lo script si iscrive all'argomento Kafka
custdata
, quindi trasmette i dati in streaming alle tabelle Hive in Cloud Storage. Il formato di output, che può essere parquet o ORC, viene passato allo script come parametro.cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
Nel terminale SSH sul nodo principale del tuo cluster Kafka, esegui
spark-submit
per trasmettere i dati alle tabelle Hive in Cloud Storage.Inserisci il nome del tuo KAFKA_CLUSTER e dell'output FORMAT, quindi copia e incolla il seguente codice nel terminale SSH sul nodo principale del tuo cluster Kafka e premi <Invio> per eseguire il codice e trasmettere in streaming i dati
custdata
di Kafka in formato parquet alle tue tabelle Hive in Cloud Storage.spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
Note:
- KAFKA_CLUSTER: inserisci il nome del cluster Kafka.
- FORMAT: specifica
parquet
oorc
come formato di output. Puoi eseguire il comando in sequenza per eseguire lo streaming di entrambi i formati nelle tabelle Hive: ad esempio, nella prima chiamata, specificaparquet
per eseguire lo streaming dell'argomento Kafkacustdata
nella tabella Hive Parquet; poi, nella seconda chiamata, specifica il formatoorc
per eseguire lo streaming dicustdata
nella tabella Hive ORC.
Quando l'output standard si interrompe nel terminale SSH, significa che tutto il
custdata
è stato trasmesso in streaming. Premi <control-c> nel terminale SSH per interrompere il processo.Elenca le tabelle Hive in Cloud Storage.
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
Note:
- BUCKET_NAME: inserisci il nome del bucket Cloud Storage che contiene le tabelle Hive (vedi Creare tabelle Hive).
Esegui query sui dati in streaming
Nel terminale SSH sul nodo principale del tuo cluster Kafka, esegui il seguente comando
hive
per conteggiare i messaggi Kafkacustdata
in streaming nelle tabelle Hive in Cloud Storage.hive -e "select count(1) from TABLE_NAME"
Note:
- TABLE_NAME: specifica
cust_parquet
ocust_orc
come nome della tabella Hive.
Snippet di output previsto:
- TABLE_NAME: specifica
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
Esegui la pulizia
Elimina il progetto
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Elimina risorse
-
Elimina il bucket:
gcloud storage buckets delete BUCKET_NAME
- Elimina il cluster Kafka:
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}