Write Pub/Sub Lite messages by using Apache Spark
The Pub/Sub Lite Spark Connector is an open-source Java client library that supports the use of Pub/Sub Lite as an input and output source for Apache Spark Structured Streaming. The connector works in all Apache Spark distributions, including Dataproc.
This quickstart shows you how to do the following using PySpark from a Dataproc Spark cluster:
- read messages from Pub/Sub Lite
- write messages to Pub/Sub Lite
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Pub/Sub Lite, Dataproc, Cloud Storage, and Logging APIs.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
Set up
Create variables for your project.
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \
  --filter="projectId:$PROJECT_ID" \
  --format="value(PROJECT_NUMBER)")
Create a Cloud Storage bucket. Cloud Storage bucket names must be globally unique.
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
Create a Pub/Sub Lite topic and subscription in a supported location. See Create a topic if you use a Pub/Sub Lite reservation.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \
  --location=$PUBSUBLITE_LOCATION \
  --partitions=2 \
  --per-partition-bytes=30GiB

gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
  --location=$PUBSUBLITE_LOCATION \
  --topic=$TOPIC
Create a Dataproc cluster.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \
  --region $DATAPROC_REGION \
  --image-version 2.1 \
  --scopes 'https://www.googleapis.com/auth/cloud-platform' \
  --enable-component-gateway \
  --bucket $BUCKET
--region: a supported Dataproc region where your Pub/Sub Lite topic and subscription reside.
--image-version: the cluster's image version, which determines the Apache Spark version installed on the cluster. Choose a 2.x.x image release version because the Pub/Sub Lite Spark Connector currently supports Apache Spark 3.x.x.
--scopes: enable API access to Google Cloud services in the same project.
--enable-component-gateway: enable access to the Apache Spark web UI.
--bucket: a staging Cloud Storage bucket used to store cluster job dependencies, driver output, and cluster config files.
Clone the quickstart repository and navigate to the sample code directory:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Writing to Pub/Sub Lite
The following example will:
- create a rate source that generates consecutive numbers and timestamps formatted as spark.sql.Row
- transform the data to match the table schema required by the Pub/Sub Lite Spark Connector's writeStream API
- write the data to an existing Pub/Sub Lite topic
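A condensed sketch of such a PySpark script, along the lines of the spark_streaming_to_pubsublite_example.py sample in the repository you cloned, is shown below. Treat it as illustrative; the project_number, location, and topic_id values are placeholders that you replace with your own resources.

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, col, create_map, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer): replace these placeholder values with your own.
project_number = 11223344556677
location = "us-central1-a"
topic_id = "your-lite-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a rate source that generates consecutive numbers with timestamps.
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the connector's required write schema:
# key (binary), data (binary), event_timestamp (timestamp), attributes (map).
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field; even values get "even", odd values "odd".
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# Write the stream to the Pub/Sub Lite topic.
query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required: use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Let the query run for 60 seconds, then stop it.
query.awaitTermination(60)
query.stop()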
To submit the write job to Dataproc:
Console
- Upload the PySpark script to your Cloud Storage bucket.
  - Go to the Cloud Storage console.
  - Select your bucket.
  - Use Upload files to upload the PySpark script that you intend to use.
- Submit the job to your Dataproc cluster:
  - Go to the Dataproc console.
  - Navigate to Jobs.
  - Click Submit job.
  - Fill in the job details:
    - Under Cluster, choose your cluster.
    - Under Job, enter a name for the job ID.
    - For Job type, choose PySpark.
    - For Main python file, provide the Cloud Storage URI of the uploaded PySpark script; it starts with gs://.
    - For Jar files, choose the latest Spark connector version from Maven, look for the jar with dependencies in the download options, and copy its link.
    - For Arguments, if you use the full PySpark script from GitHub, enter --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID; if you copy the PySpark script above with the to-dos completed, leave the field blank.
    - Under Properties, enter key spark.master and value yarn.
  - Click Submit.
gcloud
Use the gcloud dataproc jobs submit pyspark command to submit the job to Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region: the pre-selected Dataproc region.
--cluster: the Dataproc cluster name.
--jars: the Pub/Sub Lite Spark Connector's uber jar with dependencies in a public Cloud Storage bucket. You can also visit this link to download the uber jar with dependencies from Maven.
--driver-log-levels: set the logging level to INFO at the root level.
--properties: use the YARN resource manager for the Spark master.
--: provide the arguments required by the script.
If the writeStream
operation succeeds, you should see log messages
like the following locally as well as in the job details page in the
Google Cloud console:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Reading from Pub/Sub Lite
The following example will read messages from an existing Pub/Sub Lite subscription using the readStream API. The connector will output messages that conform to the fixed table schema, formatted as spark.sql.Row.
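A condensed sketch of such a script, along the lines of the spark_streaming_from_pubsublite_example.py sample in the repository, is shown below. Treat it as illustrative; the project_number, location, and subscription_id values are placeholders that you replace with your own resources.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer): replace these placeholder values with your own.
project_number = 11223344556677
location = "us-central1-a"
subscription_id = "your-lite-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

# Read from the Pub/Sub Lite subscription. Each message arrives as a row with
# the connector's fixed schema (subscription, partition, offset, key, data,
# publish_timestamp, event_timestamp, attributes).
sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}"
        f"/subscriptions/{subscription_id}",
    )
    .load()
)

# The data column is binary; cast it to a string for readable output.
sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

# Print the received messages to the console.
query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Let the query run for 120 seconds, then stop it.
query.awaitTermination(120)
query.stop()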
To submit the read job to Dataproc:
Console
- Upload the PySpark script to your Cloud Storage bucket.
  - Go to the Cloud Storage console.
  - Select your bucket.
  - Use Upload files to upload the PySpark script that you intend to use.
- Submit the job to your Dataproc cluster:
  - Go to the Dataproc console.
  - Navigate to Jobs.
  - Click Submit job.
  - Fill in the job details:
    - Under Cluster, choose your cluster.
    - Under Job, enter a name for the job ID.
    - For Job type, choose PySpark.
    - For Main python file, provide the Cloud Storage URI of the uploaded PySpark script; it starts with gs://.
    - For Jar files, choose the latest Spark connector version from Maven, look for the jar with dependencies in the download options, and copy its link.
    - For Arguments, if you use the full PySpark script from GitHub, enter --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID; if you copy the PySpark script above with the to-dos completed, leave the field blank.
    - Under Properties, enter key spark.master and value yarn.
  - Click Submit.
gcloud
Use the gcloud dataproc jobs submit pyspark command again to submit the job to Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_from_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region: the pre-selected Dataproc region.
--cluster: the Dataproc cluster name.
--jars: the Pub/Sub Lite Spark Connector's uber jar with dependencies in a public Cloud Storage bucket. You can also visit this link to download the uber jar with dependencies from Maven.
--driver-log-levels: set the logging level to INFO at the root level.
--properties: use the YARN resource manager for the Spark master.
--: provide the arguments required by the script.
If the readStream
operation succeeds, you should see log messages
like the following locally as well as in the job details page in the
Google Cloud console:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Replay and purge messages from Pub/Sub Lite
Seek operations don't work when reading from Pub/Sub Lite using the Pub/Sub Lite Spark Connector because Apache Spark systems perform their own tracking of offsets within partitions. The workaround is to drain, seek and restart the workflows.
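As an illustrative sketch of that workaround with the gcloud CLI (the job ID shown here is a placeholder, and you should confirm the seek flags against the current gcloud reference):

# Stop (drain) the running read job so the connector stops advancing offsets.
gcloud dataproc jobs kill JOB_ID --region=$DATAPROC_REGION

# Seek the Pub/Sub Lite subscription, for example back to the beginning of the
# retained message backlog.
gcloud pubsub lite-subscriptions seek $SUBSCRIPTION \
  --location=$PUBSUBLITE_LOCATION \
  --starting-offset=beginning

# Resubmit the read job, typically with a fresh checkpoint location, so that it
# picks up from the subscription's new cursor rather than old checkpointed offsets.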
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
Delete the topic and subscription.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Delete the Dataproc cluster.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Remove the Cloud Storage bucket.
gcloud storage rm gs://$BUCKET
What's next
- Check out the word count example in Java for the Pub/Sub Lite Spark Connector.
- Learn how to access the Dataproc job driver output.
- Explore other Spark connectors for Google Cloud products: the BigQuery connector, the Bigtable connector, and the Cloud Storage connector.