Using Apache Spark DStreams with Dataproc and Pub/Sub

This tutorial shows how to deploy an Apache Spark DStreams app on Dataproc and process messages from Pub/Sub in near real time. Spark offers two APIs for streaming: the original Discretized Streams API, or DStreams, and the more recent Structured Streaming API, which came out as an alpha release in Spark 2.0 and as a stable release in Spark 2.2. While Structured Streaming offers several new, important features such as event time operations and the Datasets and DataFrames abstractions, it also has some limitations. For example, Structured Streaming does not yet support operations such as sorting or multiple streaming aggregations.

This tutorial focuses on deploying apps that use the DStreams API and fully managed solutions that are available in Google Cloud.

Objectives

  • Generate thousands of simulated tweets and publish them as individual messages to Pub/Sub.
  • Deploy a Spark streaming app to Dataproc to process the generated tweets and identify trending hashtags in near real time.
  • Save the streaming operations output to Datastore.
  • Deploy an HTTP function on Cloud Functions to display the latest trending hashtags on a web page.


Costs

This tutorial uses the following billable components of Google Cloud:

  • Dataproc
  • Pub/Sub
  • Cloud Functions
  • Datastore

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Reference architecture

The following diagram shows the flow of information through the system.

Flow of information through the system

  1. The Tweet Generator program generates thousands of simulated tweets and publishes them to the tweets Pub/Sub topic.
  2. The Pub/Sub topic propagates newly received tweets, and the Spark streaming app subscribes to that topic for pull deliveries.
  3. The Spark streaming app pulls the latest tweets at regular intervals, for example, every 20 seconds. Then it extracts and counts hashtags over a sliding window, for example, the past 60 seconds.
  4. At the end of each sliding window, the Spark streaming app saves the latest trending hashtags to Datastore for persistent storage.
  5. A user opens a web browser and visits the page that is served by an HTTP function that is deployed on Cloud Functions.
  6. The HTTP function reads the latest trending hashtags from Datastore.
  7. The HTTP function generates an HTML page that shows trending hashtags.
  8. The HTTP function returns the HTML page and displays it in the user's browser.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Start a Cloud Shell instance. You'll use Cloud Shell throughout this tutorial.
    Open Cloud Shell
  5. In Cloud Shell, enable the Dataproc, Pub/Sub, Cloud Functions, and Datastore APIs:
    gcloud services enable \
        dataproc.googleapis.com \
        pubsub.googleapis.com \
        cloudfunctions.googleapis.com \
        datastore.googleapis.com
  6. Activate the Datastore database for your project:
    gcloud app create --region=us-central

    Note: Datastore requires an active App Engine application, so you must create one by using this command.

  7. Clone the Git repository for this tutorial:
    git clone
  8. Change to the repository root and save the path into an environment variable to use later:
    cd dataproc-pubsub-spark-streaming
    export REPO_ROOT=$PWD

Creating the Pub/Sub topic and subscription

To set up the Pub/Sub topic and subscription, follow these steps.

  1. Create the tweets topic:

    gcloud pubsub topics create tweets
  2. Create the tweets-subscription subscription for the tweets topic:

    gcloud pubsub subscriptions create tweets-subscription --topic=tweets

The tweets Pub/Sub topic is ready to accept new messages to be published later by Tweet Generator, and the Pub/Sub tweets-subscription subscription is ready to be pulled from by the Spark streaming app. The following diagram shows the new state after you run the previous commands.

New state after you create the Pub/Sub topic and subscription

Creating a service account for Dataproc

In this section, you create a service account that the Dataproc cluster can use. You also assign the necessary permissions to allow the cluster instances to access Pub/Sub and Datastore.

  1. Create a service account:

    export SERVICE_ACCOUNT_NAME="dataproc-service-account"
    gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  2. Add the Dataproc worker IAM role to allow the service account to create clusters and run jobs:

    export PROJECT=$(gcloud info --format='value(config.project)')
    gcloud projects add-iam-policy-binding $PROJECT \
        --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com" \
        --role roles/dataproc.worker
  3. Add the Datastore user IAM role to allow the service account to read and write to the database:

    gcloud projects add-iam-policy-binding $PROJECT \
        --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com" \
        --role roles/datastore.user
  4. Add the Pub/Sub subscriber IAM role to allow the service account to subscribe to the tweets-subscription Pub/Sub subscription:

    gcloud beta pubsub subscriptions add-iam-policy-binding \
        tweets-subscription \
        --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com" \
        --role roles/pubsub.subscriber

Creating a Dataproc cluster

When you create the Dataproc cluster, you must provide the following:

  • The pubsub and datastore access scopes in order to allow cluster instances to access the corresponding APIs for Pub/Sub and Datastore.
  • The service account that you created in the previous section. Dataproc assigns this service account to every instance in the cluster so that all the instances get the correct permissions to run the app.
  • The Dataproc cluster image version 1.2, because it contains the latest patch version of Spark 2.2 that Dataproc supports and that the app in this tutorial requires.

Run the following command:

gcloud dataproc clusters create demo-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.2 \
    --service-account=$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com

Submitting the Spark streaming job

The following diagram shows the sliding window mechanism that the Spark streaming app uses.

The sliding window mechanism that the Spark streaming app uses

Every 20 seconds, the Spark streaming app pulls the new tweets that were published to the tweets Pub/Sub topic and processes them together with all of the tweets that were collected over the past 60-second window. The following code sets up the stream from Pub/Sub:

val sparkConf = new SparkConf().setAppName("TrendingHashtags")
val ssc = new StreamingContext(sparkConf, Seconds(slidingInterval.toInt))

// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)

// Create stream
val messagesStream: DStream[String] = PubsubUtils
  .createStream(
    ssc,
    projectID,
    None,  // No topic to attach to; read from the existing subscription instead
    "tweets-subscription",  // Cloud Pub/Sub subscription for incoming tweets
    SparkGCPCredentials.builder.build(),
    StorageLevel.MEMORY_AND_DISK_SER_2)
  .map(message => new String(message.getData(), StandardCharsets.UTF_8))
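
The 20-second slide and 60-second window arithmetic can be illustrated without Spark. The following plain-Python sketch (the names and helper are hypothetical, purely for illustration) retains the three most recent 20-second batches in a deque, so each slide sees one full 60-second window:

```python
from collections import deque

WINDOW_SECONDS = 60
SLIDE_SECONDS = 20
BATCHES_PER_WINDOW = WINDOW_SECONDS // SLIDE_SECONDS  # 3 batches per window

# Keep only the most recent batches; older ones drop off automatically.
batches = deque(maxlen=BATCHES_PER_WINDOW)

def on_batch(new_tweets):
    """Called every SLIDE_SECONDS with the tweets pulled in that interval;
    returns all tweets in the current window."""
    batches.append(list(new_tweets))
    return [tweet for batch in batches for tweet in batch]
```

After three slides the window is full; on the fourth, the oldest batch is evicted, mirroring how Spark expires data that falls out of the window.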

The Spark app extracts and counts all hashtags using this simple pipeline:

private[demo] def extractTrendingTags(input: RDD[String]): RDD[Popularity] =
  input.flatMap(_.split("\\s+")) // Split on any whitespace character
    .filter(_.startsWith("#")) // Keep only the hashtags
    // Remove punctuation, force to lowercase
    .map(_.replaceAll("[,.!?:;]", "").toLowerCase)
    // Remove the first #
    .map(_.replaceFirst("^#", ""))
    .filter(!_.isEmpty) // Remove empty strings
    .map((_, 1)) // Create word count pairs
    .reduceByKey(_ + _) // Count occurrences
    .map(r => Popularity(r._1, r._2))
    // Sort hashtags by descending number of occurrences
    .sortBy(r => (-r.amount, r.tag), ascending = true)
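
For readers less familiar with Scala, the same extraction steps can be sketched in plain Python (an illustration only; this code is not part of the repository):

```python
import re
from collections import Counter

def extract_trending_tags(tweets):
    """Count hashtags across a batch of tweets, most popular first."""
    counts = Counter()
    for tweet in tweets:
        for word in re.split(r"\s+", tweet):  # split on whitespace
            if not word.startswith("#"):
                continue                       # keep only the hashtags
            cleaned = re.sub(r"[,.!?:;]", "", word).lower()  # strip punctuation
            tag = cleaned[1:]                  # drop the leading '#'
            if tag:
                counts[tag] += 1
    # Sort by descending count, then alphabetically to break ties.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
```

Each step maps one-to-one onto the Scala pipeline above: split, filter, normalize, count, and sort.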

The Spark app saves the top 10 trending hashtags to a new database row in Datastore:

private[demo] def convertToEntity(hashtags: Array[Popularity],
                                  keyFactory: String => KeyFactory): FullEntity[IncompleteKey] = {
  val hashtagKeyFactory: KeyFactory = keyFactory("Hashtag")

  val listValue = hashtags.foldLeft[ListValue.Builder](ListValue.newBuilder())(
    (listValue, hashTag) => listValue.addValue(convertToDatastore(hashtagKeyFactory, hashTag))
  )

  val rowKeyFactory: KeyFactory = keyFactory("TrendingHashtags")

  FullEntity.newBuilder(rowKeyFactory.newKey())
    .set("datetime", Timestamp.now())
    .set("hashtags", listValue.build())
    .build()
}

For more details, see the full code for the Spark streaming app.
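
The shape of the saved row can be pictured as a plain data structure. This Python sketch is illustrative only; it mirrors the entity's fields but does not call the Datastore API, and the helper name is hypothetical:

```python
from datetime import datetime, timezone

def build_row(top_hashtags):
    """Mimic the entity layout: a timestamped row holding nested hashtag records."""
    return {
        "datetime": datetime.now(timezone.utc).isoformat(),
        "hashtags": [
            {"name": f"#{tag}", "occurrences": count}
            for tag, count in top_hashtags[:10]  # keep only the top 10
        ],
    }
```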

Follow these steps to run the app.

  1. Change to the application directory:

    cd $REPO_ROOT/spark
  2. Change the Java Development Kit (JDK) version to 1.8:

    sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre
  3. Build the application archive:

    mvn clean package

    The spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar archive containing the application code and dependencies is created in the spark/target directory.

  4. Submit the Spark streaming app job:

    export PROJECT=$(gcloud info --format='value(config.project)')
    export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
    export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
    export ARGUMENTS="$PROJECT 60 20 60 hdfs:///user/spark/checkpoint"
    gcloud dataproc jobs submit spark \
        --cluster demo-cluster \
        --region us-central1 \
        --async \
        --jar target/$JAR \
        --max-failures-per-hour 10 \
        --properties $SPARK_PROPERTIES \
        -- $ARGUMENTS


    • The job's arguments are:
      • The ID of your Google Cloud project.
      • The duration of the window, in seconds.
      • The slide interval at which the streaming operations are performed, in seconds.
      • The total running time for the job, in minutes. If you pass 0, the job runs indefinitely until you terminate it explicitly.
      • The directory that is used to store periodic checkpoint data to increase fault tolerance and to help avoid data loss. We recommend keeping this directory in HDFS when using short checkpoint intervals.
    • The --async parameter lets the job run asynchronously, which allows you to continue using Cloud Shell while the job is running.
    • The --jar parameter points to the application archive created in the previous step. Dataproc uploads the archive to the cluster instances right before the job starts.
    • The --max-failures-per-hour parameter lets the job restart on potential failures to increase resilience. For more details, see the document on restartable jobs.
    • The spark.dynamicAllocation.enabled=false property disables dynamic resource allocation, which is otherwise enabled by default on Dataproc clusters. Dynamic resource allocation adjusts the number of executors based on the workload, which is ineffective in a streaming context and can cause data loss.
    • The spark.streaming.receiver.writeAheadLog.enabled=true property enables write ahead logs to increase fault tolerance and to help avoid data loss.
  5. Display the list of active jobs and note the JOB_ID value for the job:

    gcloud dataproc jobs list --region=us-central1 --state-filter=active

    The output is similar to the following:

    JOB_ID                            TYPE   STATUS
    473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  6. View the job output by opening the following URL in your browser. Replace [JOB_ID] with the value noted in the previous step:

    https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1

    The output is similar to the following:

    Window ending Wed Apr 11 22:03:00 UTC 2018 for the past 60 seconds
    No trending hashtags in this window.
    Window ending Wed Apr 11 22:03:20 UTC 2018 for the past 60 seconds
    No trending hashtags in this window.

The Spark streaming job runs on Dataproc and periodically checks for new tweets from the tweets Pub/Sub topic. At this point, no tweets have been generated, so the app isn't processing any data yet. The following diagram shows the new state.

New state after you create the Dataproc cluster and
Datastore database

Deploying the HTTP function

The HTTP function offers a simple user interface that displays the latest trending hashtags. When you open the function URL in a web browser, the function reads the latest trending hashtags from Datastore and returns them in a simple web page. See the function code for more details.
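
The function's rendering step can be sketched as follows. The sketch is in Python for illustration (the deployed function is Node.js), and the helper name and row layout are assumptions rather than the repo's actual code:

```python
def render_page(row):
    """Turn the latest TrendingHashtags row into a minimal HTML fragment."""
    if not row or not row.get("hashtags"):
        return "<p>No trending hashtags at this time... Try again later.</p>"
    items = "".join(
        f"<li>{h['name']} ({h['occurrences']})</li>" for h in row["hashtags"]
    )
    return f"<h1>Trending hashtags</h1><ol>{items}</ol>"
```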

Follow these steps to deploy and test the HTTP function.

  1. Deploy the HTTP function. This might take up to 2 minutes to complete:

    cd $REPO_ROOT/http_function
    gcloud functions deploy http_function \
        --trigger-http \
        --runtime nodejs10 \
        --allow-unauthenticated
  2. Verify that no hashtags are displayed by opening the following URL in your browser. Replace [PROJECT] with your project ID:

    https://us-central1-[PROJECT].cloudfunctions.net/http_function

    Because no tweets have been generated yet, you see the following text on the page: "No trending hashtags at this time... Try again later."

The following diagram shows the flow of information that you just enabled. The HTTP function is deployed and is ready to accept user requests and query the Datastore database.

Flow of information that you enabled

Generating simulated tweets

Follow these steps to publish thousands of simulated tweets to Pub/Sub:

  1. Create and activate a Python virtual environment:

    cd $REPO_ROOT/tweet-generator
    virtualenv venv
    source venv/bin/activate
  2. Install the Python dependencies:

    pip install -r requirements.txt
  3. Run Tweet Generator to generate thousands of simulated tweets:

    export PROJECT=$(gcloud info --format='value(config.project)')
    python $PROJECT 15 1000 &


    • The arguments are:
      • The ID of your project.
      • The total execution time for Tweet Generator, in minutes.
      • The number of tweets generated per minute.
    • Based on the passed arguments, the program runs for 15 minutes and generates 1000 tweets per minute, for a total of 15,000 tweets.
    • The & character lets the program run in the background so you can continue to use Cloud Shell.

    Following are some examples of tweets that the program generates:

    "Piece let #outside carry identify. Management great whom
    surface. Sure away couple build seat."
    "South wonder hotel those. Machine among source manager #likely
    anyone. Ground enough big no know."

    For more details on how Tweet Generator works, check out the source code.

  4. View the job output by visiting the following page in your browser. Replace [JOB_ID] with the value noted earlier:

    https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1

    The output is similar to the following:

    Window ending 2018-04-11T22:13:40.053000000Z for the past 30 seconds
    Trending hashtags in this window:
    #outside, 22
    #likely, 18
    #information, 18
    #support, 17
    #national, 12
    #knowledge, 11
    #painting, 11
    #marriage, 8
    #remain, 7
    #finish, 7

    You see the latest top 10 trending hashtags and the number of times that each hashtag occurs in the current stream window.

  5. Verify that the results are properly saved in the database by running the following GQL query:

    export ACCESS_TOKEN=$(gcloud auth print-access-token)
    curl -X POST \
        -H "Authorization: Bearer $ACCESS_TOKEN" \
        -H "Content-Type: application/json; charset=utf-8" \
        --data "{ 'gqlQuery': { 'allowLiterals': true, 'queryString': 'SELECT * FROM TrendingHashtags ORDER BY datetime DESC LIMIT 1' } }" \
        "https://datastore.googleapis.com/v1/projects/$PROJECT:runQuery"

    The output is similar to the following (truncated for concision):

    {
      "batch": {
        "entityResultType": "FULL",
        "entityResults": [
          {
            "entity": {
              "key": {
                "partitionId": {
                  "projectId": "[PROJECT]"
                },
                "path": [
                  {
                    "kind": "TrendingHashtags",
                    "id": "5681777339269120"
                  }
                ]
              },
              "properties": {
                "datetime": {
                  "timestampValue": "2018-04-11T15:15:20.253Z"
                },
                "hashtags": {
                  "arrayValue": {
                    "values": [
                      {
                        "entityValue": {
                          "key": {
                            "partitionId": {
                              "projectId": "[PROJECT]"
                            },
                            "path": [ ... ]
                          },
                          "properties": {
                            "name": {
                              "stringValue": "#outside"
                            },
                            "occurrences": {
                              "integerValue": "24"
                            }
                          }
                        }
                      },
                      ...
                    ]
                  }
                }
              }
            }
          }
        ]
      }
    }
  6. Open the HTTP function web page by entering the following URL in your browser's address bar. Replace [PROJECT] with your project ID:

    https://us-central1-[PROJECT].cloudfunctions.net/http_function

The page displays the latest trending hashtags and refreshes every few seconds.

Terminating the job

The job is set to terminate automatically after a few minutes, but you can terminate it now by running the following command. Replace [JOB_ID] with the job ID that you noted earlier.

gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

To stop Tweet Generator, run this command:

pkill -9 -f ""

Cleaning up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.


Delete the project

  1. In the Cloud Console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete individual resources

Run the following commands to delete individual resources, instead of deleting the whole project:

gcloud dataproc clusters delete demo-cluster --region=us-central1 --quiet
gcloud functions delete http_function --quiet
gcloud pubsub topics delete tweets --quiet
gcloud pubsub subscriptions delete tweets-subscription --quiet
gcloud iam service-accounts delete $SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com --quiet

Finally, refer to the Datastore documentation to learn how to bulk delete all the TrendingHashtags entities from your database.

What's next