Apache Kafka è una piattaforma di flussi di dati open source distribuita per dati in tempo reale di pipeline e integrazione dei dati. Fornisce un sistema di flussi di dati efficiente e scalabile per l'utilizzo in una vasta gamma di applicazioni, tra cui:
- Analisi in tempo reale
- Elaborazione dei flussi
- Aggregazione dei log
- Messaggistica distribuita
- Flusso di eventi
Obiettivi
Installa Kafka su un cluster Dataproc HA con ZooKeeper (in questo tutorial denominato "cluster Kafka Dataproc").
Crea dati dei clienti fittizi, quindi pubblicali in un argomento Kafka.
Crea tabelle Hive parquet e ORC in Cloud Storage per ricevere i dati degli argomenti Kafka in streaming.
Invia un job PySpark per la sottoscrizione e il flusso di dati dell'argomento Kafka in Cloud Storage in formato Parquet e ORC.
Eseguire una query sui dati della tabella Hive trasmessi in flusso per contare i flussi di dati Messaggi Kafka.
Costi
In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:
Per generare una stima dei costi basata sull'utilizzo previsto,
utilizza il Calcolatore prezzi.
Al termine delle attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la sezione Pulizia.
Prima di iniziare
Se non l'hai ancora fatto, crea un progetto Google Cloud.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
Passaggi del tutorial
Esegui questi passaggi per creare un cluster Dataproc Kafka in un argomento Kafka in Cloud Storage in formato Parquet ORC.
Copia lo script di installazione di Kafka in Cloud Storage
L'azione di inizializzazione kafka.sh
lo script installa Kafka su un cluster Dataproc.
Cerca il codice.
Copia il
kafka.sh
azione di inizializzazione nel tuo bucket Cloud Storage. Questo script installa Kafka su un cluster Dataproc.Apri Cloud Shell ed esegui il comando seguente:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
Effettua le seguenti sostituzioni:
- REGION:
kafka.sh
viene archiviato in bucket pubblici con tag regionali in Cloud Storage. Specifica una regione Compute Engine geograficamente vicina (ad esempious-central1
). - BUCKET_NAME: il nome del bucket Cloud Storage.
- REGION:
Crea un cluster Kafka Dataproc
Apri Cloud Shell, quindi esegui il seguente comando
gcloud dataproc clusters create
per creare un cluster Dataproc HA che installa i componenti Kafka e ZooKeeper:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
Note:
- KAFKA_CLUSTER: il nome del cluster, che deve essere univoco all'interno di un progetto. Il nome deve iniziare con una lettera minuscola e può contenere fino a 51 lettere minuscole, numeri e trattini. Non può terminare con un trattino. Il nome di un cluster eliminato può essere riutilizzato.
- PROJECT_ID: il progetto da associare a questo cluster.
- REGION: la
regione Compute Engine
dove si troverà il cluster, ad esempio
us-central1
.- Puoi aggiungere il flag facoltativo
--zone=ZONE
per specificare una zona all'interno della regione specificata, ad esempious-central1-a
. Se non specifichi una zona, Posizionamento della zona automatica di Dataproc seleziona una zona con la regione specificata.
- Puoi aggiungere il flag facoltativo
--image-version
: versione immagine Dataproc2.1-debian11
è consigliato per questo tutorial. Nota: ogni versione dell'immagine contiene un insieme di tra cui il componente Hive usato in questo (vedi Versioni immagine Dataproc supportate).--num-master
:3
nodi master creano un'istanza cluster ad alta disponibilità. Il componente Zookeeper, richiesto da Kafka, è preinstallato in un cluster ad alta disponibilità.--enable-component-gateway
: attiva il parametro Gateway dei componenti Dataproc.- BUCKET_NAME: il nome del bucket Cloud Storage
che contiene lo script di inizializzazione
/scripts/kafka.sh
(vedi Copiare lo script di installazione di Kafka in Cloud Storage).
Crea un argomento Kafka custdata
Per creare un argomento Kafka nel cluster Kafka Dataproc:
Utilizza l'SSH per aprire una finestra del terminale sulla VM master del cluster.
Creare un argomento
custdata
Kafka./usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
Note:
KAFKA_CLUSTER: inserisci il nome del cluster Kafka.
-w-0:9092
indica il broker Kafka in esecuzione sulla porta9092
nel nodoworker-0
.Dopo aver creato l'argomento
custdata
, puoi eseguire i seguenti comandi:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Pubblica contenuto nell'argomento Kafka custdata
Lo script seguente utilizza lo kafka-console-producer.sh
strumento Kafka per
generare dati fittizi dei clienti in formato CSV.
Copia e incolla lo script nella finestra SSH sul nodo master del tuo cluster Kafka. Premi <return> per eseguire lo script.
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
Note:
- KAFKA_CLUSTER: il nome del cluster Kafka.
Esegui questo comando Kafka per verificare che l'argomento
custdata
contenga 10.000 messaggi./usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
Note:
- KAFKA_CLUSTER: il nome del cluster Kafka.
Risultato previsto:
custdata:0:10000
Crea tabelle Hive in Cloud Storage
Crea tabelle Hive per ricevere i dati degli argomenti Kafka in streaming.
Esegui i seguenti passaggi per creare tabelle Hive cust_parquet
(parquet) e cust_orc
(ORC) nel bucket Cloud Storage.
Inserisci BUCKET_NAME nello script seguente, quindi copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka, quindi premi <return> per creare uno script
~/hivetables.hql
(Hive Query Language).Eseguirai lo script
~/hivetables.hql
nel passaggio successivo per creare tavole Hive in parquet e ORC nel tuo bucket Cloud Storage.cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Nel terminale SSH sul nodo principale del cluster Kafka, invia il job Hive
~/hivetables.hql
per crearecust_parquet
(parquet) e una tabella Hivecust_orc
(ORC) nel bucket Cloud Storage.gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
Note:
- Il componente Hive è preinstallato in Dataproc Kafka in un cluster Kubernetes. Vedi Versioni di release 2.1.x per un elenco delle versioni dei componenti Hive incluse nelle immagini 2.1 rilasciate di recente.
- KAFKA_CLUSTER: il nome del cluster Kafka.
- REGION: la regione in cui si trova il cluster Kafka.
Esegui lo streaming di Kafka custdata
nelle tabelle Hive
- Esegui questo comando nel terminale SSH sul nodo master di
nel tuo cluster Kafka per installare la libreria
kafka-python
. È necessario un client Kafka per trasmettere in streaming i dati degli argomenti Kafka su Cloud Storage.
pip install kafka-python
Inserisci il tuo BUCKET_NAME, quindi copia e incolla il seguente codice PySpark nel terminale SSH sul nodo principale del cluster Kafka e premi <Invio> per creare un file
streamdata.py
.Lo script sottoscrive l'argomento Kafka
custdata
, quindi trasmette in flusso le alle tabelle Hive in Cloud Storage. Il formato di output, che può essere parquet o ORC, viene passato nello script come un parametro.cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
Nel terminale SSH sul nodo principale del tuo cluster Kafka, esegui
spark-submit
per trasmettere i dati alle tabelle Hive in Cloud Storage.Inserisci il nome del tuo KAFKA_CLUSTER e dell'output FORMAT, quindi copia e incolla il seguente codice nel terminale SSH sul nodo principale del tuo cluster Kafka e premi <Invio> per eseguire il codice e trasmettere in streaming i dati di Kafka
custdata
in formato parquet alle tue tabelle Hive in Cloud Storage.spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
Note:
- KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
- FORMAT: specifica
parquet
oorc
come formato di output. Puoi eseguire il comando in successione entrambi i formati alle tabelle Hive: ad esempio, nella prima chiamata, specificaparquet
per inviare in flusso l'argomento Kafkacustdata
all'Hive tavolo da parquet; quindi, nella seconda chiamata, specifica il formatoorc
in trasmetti il flussocustdata
alla tabella ORC Hive.
Quando l'output standard si interrompe nel terminale SSH, significa che tutto il
custdata
è stato trasmesso in streaming. Premi <Ctrl + C> nel terminale SSH per interrompere il processo.Elenca le tabelle Hive in Cloud Storage.
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
Note:
- BUCKET_NAME: inserisci il nome del bucket Cloud Storage che contiene le tabelle Hive (vedi Creare tabelle Hive).
Esegui query sui dati in streaming
Nel terminale SSH sul nodo master tuo cluster Kafka, esegui questo comando
hive
per contare i messaggi Kafka trasmessi in flussocustdata
nelle tabelle Hive in Cloud Storage.hive -e "select count(1) from TABLE_NAME"
Note:
- TABLE_NAME: specifica
cust_parquet
ocust_orc
come nome della tabella Hive.
Snippet di output previsto:
- TABLE_NAME: specifica
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
Esegui la pulizia
Elimina il progetto
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Elimina risorse
-
Elimina il bucket:
gcloud storage buckets delete BUCKET_NAME
- Elimina il cluster Kafka:
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}