Scrivere messaggi Pub/Sub Lite utilizzando Apache Spark
Il connettore Spark Pub/Sub Lite è una libreria client Java open source che supporta l'utilizzo di Pub/Sub Lite come origine di input e output per Apache Spark Structured Streaming . Il connettore funziona in tutte le distribuzioni Apache Spark, tra cui Dataproc.
Questa guida rapida illustra come:
- leggere i messaggi da Pub/Sub Lite
- scrivere messaggi in Pub/Sub Lite
utilizzando PySpark da un cluster Spark Dataproc.
Prima di iniziare
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
Configura
Crea variabili per il progetto.
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
Creare un bucket Cloud Storage. I nomi dei bucket Cloud Storage devono essere univoci a livello globale.
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
Crea un argomento e una sottoscrizione Pub/Sub Lite in una località supportata. Consulta Creare un argomento se utilizzi una prenotazione Pub/Sub Lite.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
Crea un cluster Dataproc.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
: una regione Dataproc supportata in cui si trovano l'argomento e l'abbonamento Pub/Sub Lite.--image-version
: la versione dell'immagine del cluster, che determina la versione di Apache Spark installata nel cluster. Scegli le versioni di release delle immagini 2.x.x perché il connettore Spark Pub/Sub Lite supporta attualmente Apache Spark 3.x.x.--scopes
: abilita l'accesso API ai servizi Google Cloud nello stesso progetto.--enable-component-gateway
: abilita l'accesso all'interfaccia utente web di Apache Spark.--bucket
: un bucket gestione temporanea Cloud Storage utilizzato per archiviare le dipendenze dei job del cluster, l'output del driver e i file di configurazione del cluster.
Clona il repository della guida rapida e vai alla directory del codice campione:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Scrittura in Pub/Sub Lite
L'esempio seguente:
- Crea una
origine tariffa
che genera numeri e timestamp consecutivi formattati come
spark.sql.Row
- trasforma i dati in modo che corrispondano allo
schema della tabella
richiesto dall'API
writeStream
del connettore Spark Pub/Sub Lite - scrivere i dati in un argomento Pub/Sub Lite esistente
Per inviare il job di scrittura a Dataproc:
Console
- Carica lo script PySpark nel bucket Cloud Storage.
- Vai alla console Cloud Storage.
- Seleziona il bucket.
- Utilizza Carica file per caricare lo script PySpark che intendi utilizzare.
- Invia il job al cluster Dataproc:
- Vai alla console Dataproc.
- Vai ai job.
- Fai clic su Invia job.
- Inserisci i dettagli del lavoro.
- In Cluster, scegli il cluster.
- In Job (Job), assegna un nome all'ID job.
- Per Tipo di job, scegli PySpark.
- Per File python principale, fornisci l'URI di Cloud Storage gcloud dello script PySpark caricato che inizia con
gs://
. - Per i file JAR, scegli la versione più recente del connettore Spark da Maven, cerca il file JAR con le dipendenze nelle opzioni di download e copia il link.
- Per Argomenti, se utilizzi lo script PySpark completo di GitHub, inserisci
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--topic_id=
TOPIC_ID. Se copi lo script PySpark riportato sopra con i to-do completati, lascia vuoto questo campo. - In Proprietà, inserisci la chiave
spark.master
e il valoreyarn
. - Fai clic su Invia.
gcloud
Utilizza il comando gcloud dataproc jobs submit pyspark per inviare il job a Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region
: la regione Dataproc preselezionata.--cluster
: il nome del cluster Dataproc.--jars
: il file jar uber del connettore Spark Pub/Sub Lite con le dipendenze in un bucket Cloud Storage pubblico. Puoi anche visitare questo link per scaricare il file jar uber con le dipendenze da Maven.--driver-log-levels
: imposta il livello di logging su INFO a livello di radice.--properties
: utilizza il gestore delle risorse YARN per il master Spark.--
: fornisci gli argomenti richiesti dallo script.
Se l'operazione writeStream
va a buon fine, dovresti vedere messaggi di log come quelli riportati di seguito localmente e nella pagina dei dettagli del job nella consoleGoogle Cloud :
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Lettura da Pub/Sub Lite
L'esempio seguente legge i messaggi da un abbonamento Pub/Sub Lite esistente utilizzando l'API readStream
. Il connettore emette messaggi conformi allo schema della tabella fisso formattato come spark.sql.Row
.
Per inviare il job di lettura a Dataproc:
Console
- Carica lo script PySpark nel bucket Cloud Storage.
- Vai alla console Cloud Storage.
- Seleziona il bucket.
- Utilizza Carica file per caricare lo script PySpark che intendi utilizzare.
- Invia il job al cluster Dataproc:
- Vai alla console Dataproc.
- Vai ai job.
- Fai clic su Invia job.
- Inserisci i dettagli del lavoro.
- In Cluster, scegli il cluster.
- In Job (Job), assegna un nome all'ID job.
- Per Tipo di job, scegli PySpark.
- Per File python principale, fornisci l'URI di Cloud Storage dello script PySpark caricato che inizia con
gs://
. - Per i file JAR, scegli la versione più recente del connettore Spark da Maven, cerca il file JAR con le dipendenze nelle opzioni di download e copia il link.
- Per Argomenti, se utilizzi lo script PySpark completo di GitHub, inserisci
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--subscription_id=
SUBSCRIPTION_ID. Se copi lo script PySpark riportato sopra con i to-do completati, lascia vuoto questo campo. - In Proprietà, inserisci la chiave
spark.master
e il valoreyarn
. - Fai clic su Invia.
gcloud
Utilizza di nuovo il comando gcloud dataproc jobs submit pyspark per inviare il job a Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region
: la regione Dataproc preselezionata.--cluster
: il nome del cluster Dataproc.--jars
: il file jar uber del connettore Spark Pub/Sub Lite con le dipendenze in un bucket Cloud Storage pubblico. Puoi anche visitare questo link per scaricare il file jar uber con le dipendenze da Maven.--driver-log-levels
: imposta il livello di logging su INFO a livello di radice.--properties
: utilizza il gestore delle risorse YARN per il master Spark.--
: fornisci gli argomenti richiesti per lo script.
Se l'operazione readStream
va a buon fine, dovresti vedere messaggi di log come quelli riportati di seguito localmente e nella pagina dei dettagli del job nella consoleGoogle Cloud :
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Riproduci ed elimina i messaggi da Pub/Sub Lite
Le operazioni di ricerca non funzionano quando si legge da Pub/Sub Lite utilizzando il connettore Spark Pub/Sub Lite perché i sistemi Apache Spark eseguono il proprio monitoraggio degli offset all'interno delle partizioni. La soluzione alternativa consiste nel svuotare, eseguire ricerche e riavviare i flussi di lavoro.
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.
Elimina l'argomento e la sottoscrizione.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Elimina il cluster Dataproc.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Rimuovi il bucket Cloud Storage.
gcloud storage rm gs://$BUCKET
Passaggi successivi
Consulta l'esempio di conteggio delle parole in Java per il connettore Spark Pub/Sub Lite.
Scopri come accedere all'output del driver del job Dataproc.
Altri connettori Spark di prodotti Google Cloud: connettore BigQuery, connettore Bigtable, connettore Cloud Storage.