Using Apache Spark DStreams with Cloud Dataproc and Cloud Pub/Sub

This tutorial shows how to deploy an Apache Spark DStreams app on Cloud Dataproc and process messages from Cloud Pub/Sub in near real time. Spark offers two APIs for streaming: the original Discretized Streams (DStreams) API and the more recent Structured Streaming API, which was released as an alpha in Spark 2.0 and became stable in Spark 2.2. Although Structured Streaming offers several important new features, such as event-time operations and the Datasets and DataFrames abstractions, it also has some limitations. For example, Structured Streaming does not yet support operations such as sorting or multiple streaming aggregations.

This tutorial focuses on deploying apps that use the DStreams API and fully managed solutions that are available in Google Cloud Platform (GCP).

Objectives

  • Generate thousands of simulated tweets and publish them as individual messages to Cloud Pub/Sub.
  • Deploy a Spark streaming app to Cloud Dataproc to process the generated tweets and identify trending hashtags in near real time.
  • Save the streaming operations output to Cloud Datastore.
  • Deploy an HTTP function on Cloud Functions to display the latest trending hashtags on a web page.

Costs

This tutorial uses the following billable components of Google Cloud Platform:

  • Cloud Dataproc
  • Cloud Pub/Sub
  • Cloud Functions
  • Cloud Datastore

To generate a cost estimate based on your projected usage, use the pricing calculator. New GCP users might be eligible for a free trial.

Reference architecture

The following diagram shows the flow of information through the system.

Flow of information through the system

  1. The Tweet Generator program generates thousands of simulated tweets and publishes them to the tweets Cloud Pub/Sub topic.
  2. The Cloud Pub/Sub topic propagates newly received tweets, and the Spark streaming app subscribes to that topic for pull deliveries.
  3. The Spark streaming app pulls the latest tweets at regular intervals, for example, every 20 seconds. Then it extracts and counts hashtags over a sliding window, for example, the past 60 seconds.
  4. At the end of each sliding window, the Spark streaming app saves the latest trending hashtags to Cloud Datastore for persistent storage.
  5. A user opens a web browser and visits the page that is served by an HTTP function that is deployed on Cloud Functions.
  6. The HTTP function reads the latest trending hashtags from Cloud Datastore.
  7. The HTTP function generates an HTML page that shows trending hashtags.
  8. The HTTP function returns the HTML page and displays it in the user's browser.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the project selector page

  3. Make sure that billing is enabled for your Google Cloud Platform project. Learn how to enable billing.

  4. Start a Cloud Shell instance. You'll use Cloud Shell throughout this tutorial.
    Open Cloud Shell
  5. In Cloud Shell, enable the Cloud Dataproc, Cloud Pub/Sub, Cloud Functions, and Cloud Datastore APIs:
    gcloud services enable \
        dataproc.googleapis.com \
        pubsub.googleapis.com \
        cloudfunctions.googleapis.com \
        datastore.googleapis.com
  6. Activate the Cloud Datastore database for your project:
    gcloud app create --region=us-central

    Note: Cloud Datastore requires an active App Engine application, so you must create one by using this command.

  7. Clone the Git repository for this tutorial:
    git clone https://github.com/GoogleCloudPlatform/dataproc-pubsub-spark-streaming
    
  8. Change to the repository root and save the path into an environment variable to use later:
    cd dataproc-pubsub-spark-streaming
    export REPO_ROOT=$PWD
    

Creating the Cloud Pub/Sub topic and subscription

To set up the Cloud Pub/Sub topic and subscription, follow these steps.

  1. Create the tweets topic:

    gcloud pubsub topics create tweets
  2. Create the tweets-subscription subscription for the tweets topic:

    gcloud pubsub subscriptions create tweets-subscription --topic=tweets
    

The tweets Cloud Pub/Sub topic is now ready to receive the messages that Tweet Generator publishes later, and the tweets-subscription subscription is ready for the Spark streaming app to pull messages from. The following diagram shows the new state after you run the previous commands.

New state after you create the Cloud Pub/Sub topic and subscription

Creating a service account for Cloud Dataproc

In this section, you create a service account that the Cloud Dataproc cluster can use. You also assign the necessary permissions to allow the cluster instances to access Cloud Pub/Sub and Cloud Datastore.

  1. Create a service account:

    export SERVICE_ACCOUNT_NAME="dataproc-service-account"
    gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
    
  2. Add the Cloud Dataproc worker IAM role so that cluster instances running as this service account can perform Cloud Dataproc operations, such as running jobs:

    export PROJECT=$(gcloud info --format='value(config.project)')
    gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com"
    
  3. Add the Cloud Datastore user IAM role to allow the service account to read from and write to the database:

    gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/datastore.user \
        --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com"
    
  4. Add the Cloud Pub/Sub subscriber IAM role to allow the service account to subscribe to the tweets-subscription Cloud Pub/Sub subscription:

    gcloud beta pubsub subscriptions add-iam-policy-binding \
        tweets-subscription \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com"
    

Creating a Cloud Dataproc cluster

When you create the Cloud Dataproc cluster, you must provide the following:

  • The pubsub and datastore access scopes, to allow cluster instances to access the Cloud Pub/Sub and Cloud Datastore APIs.
  • The service account that you created in the previous section. Cloud Dataproc assigns this service account to every instance in the cluster so that all the instances get the correct permissions to run the app.
  • The Cloud Dataproc image version 1.2, which contains the latest patch version of Spark 2.2 that Cloud Dataproc supports and that the app in this tutorial requires.

Run the following command:

gcloud dataproc clusters create demo-cluster \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.2 \
    --service-account="$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com"

Submitting the Spark streaming job

The following diagram shows the sliding window mechanism that the Spark streaming app uses.

The sliding window mechanism that the Spark streaming app uses

Every 20 seconds, the Spark streaming app pulls the new tweets that were published to the tweets Cloud Pub/Sub topic, through the tweets-subscription subscription. It processes those new tweets together with all of the tweets that were collected over the past 60-second window. The following code sets up the stream from Cloud Pub/Sub:

val sparkConf = new SparkConf().setAppName("TrendingHashtags")
val ssc = new StreamingContext(sparkConf, Seconds(slidingInterval.toInt))

// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)

// Create stream
val messagesStream: DStream[String] = PubsubUtils
  .createStream(
    ssc,
    projectID,
    None,
    "tweets-subscription",  // Cloud Pub/Sub subscription for incoming tweets
    SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
  .map(message => new String(message.getData(), StandardCharsets.UTF_8))
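
The windowing itself is not shown in this excerpt. The following is a minimal sketch of how the app might apply the window and process each windowed batch, assuming a windowLength variable that holds the window-duration argument (the name is illustrative) alongside the slidingInterval value used above:

// Sketch only: apply a sliding window (for example, 60 seconds, sliding every
// 20 seconds) on top of the Cloud Pub/Sub stream, then process each window
// as a single RDD.
val windowedStream = messagesStream.window(
  Seconds(windowLength.toInt), Seconds(slidingInterval.toInt))

windowedStream.foreachRDD { rdd =>
  // extractTrendingTags is shown below; take the top 10 hashtags,
  // matching what the app stores in Cloud Datastore.
  val trendingTags = extractTrendingTags(rdd).take(10)
  // ... convert trendingTags to a Datastore entity and save it (see below) ...
}

Every slidingInterval seconds, this block receives one RDD that contains all of the tweets collected during the past windowLength seconds.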

The Spark app extracts and counts all hashtags using this simple pipeline:

private[demo] def extractTrendingTags(input: RDD[String]): RDD[Popularity] =
  input.flatMap(_.split("\\s+")) // Split on any whitespace character
    .filter(_.startsWith("#")) // Keep only the hashtags
    // Remove punctuation, force to lowercase
    .map(_.replaceAll("[,.!?:;]", "").toLowerCase)
    // Remove the first #
    .map(_.replaceFirst("^#", ""))
    .filter(!_.isEmpty) // Remove any empty strings
    .map((_, 1)) // Create word count pairs
    .reduceByKey(_ + _) // Count occurrences
    .map(r => Popularity(r._1, r._2))
    // Sort hashtags by descending number of occurrences
    .sortBy(r => (-r.amount, r.tag), ascending = true)
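
The Popularity type that this pipeline produces is a small case class that pairs a hashtag with its occurrence count. Its definition is not shown above; a minimal version that is consistent with the tag and amount fields used in the code would be:

// One hashtag and the number of times it occurs in the current window.
case class Popularity(tag: String, amount: Int)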

The Spark app saves the top 10 trending hashtags to a new entity in Cloud Datastore:

private[demo] def convertToEntity(hashtags: Array[Popularity],
                                  keyFactory: String => KeyFactory): FullEntity[IncompleteKey] = {
  val hashtagKeyFactory: KeyFactory = keyFactory("Hashtag")

  val listValue = hashtags.foldLeft[ListValue.Builder](ListValue.newBuilder())(
    (listValue, hashTag) => listValue.addValue(convertToDatastore(hashtagKeyFactory, hashTag))
  )

  val rowKeyFactory: KeyFactory = keyFactory("TrendingHashtags")

  FullEntity.newBuilder(rowKeyFactory.newKey())
    .set("datetime", Timestamp.now())
    .set("hashtags", listValue.build())
    .build()
}
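
The convertToDatastore helper that this method calls converts a single Popularity value into an embedded Datastore entity. It is not shown above; a sketch that is consistent with the name and occurrences properties that appear in the query results later in this tutorial might look like the following:

// Sketch only: build an embedded entity for one hashtag, with its text and
// its number of occurrences in the current window.
private[demo] def convertToDatastore(keyFactory: KeyFactory,
                                     hashtag: Popularity): FullEntity[IncompleteKey] =
  FullEntity.newBuilder(keyFactory.newKey())
    .set("name", hashtag.tag)
    .set("occurrences", hashtag.amount.toLong)
    .build()

The TrendingHashtags entity returned by convertToEntity can then be written to Cloud Datastore with a single call, for example datastore.add(entity).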

For more details, see the full code for the Spark streaming app.

Follow these steps to run the app.

  1. Change to the application directory:

    cd $REPO_ROOT/spark
  2. Build the application archive:

    mvn clean package

    The spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar archive containing the application code and dependencies is created in the spark/target directory.

  3. Submit the Spark streaming app job:

    export PROJECT=$(gcloud info --format='value(config.project)')
    export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
    export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
    export ARGUMENTS="$PROJECT 60 20 60 hdfs:///user/spark/checkpoint"
    
    gcloud dataproc jobs submit spark \
        --cluster demo-cluster \
        --async \
        --jar target/$JAR \
        --max-failures-per-hour 10 \
        --properties $SPARK_PROPERTIES \
        -- $ARGUMENTS
    

    Notes:

    • The job's arguments, which the app consumes as shown in the sketch after these steps, are:
      • The ID of your GCP project.
      • The duration of the window, in seconds.
      • The sliding interval at which the streaming operations are performed, in seconds.
      • The total running time for the job, in minutes. If you provide 0, the job runs indefinitely until you explicitly terminate it.
      • The directory that is used to store periodic checkpoint data, which increases fault tolerance and helps avoid data loss. We recommend keeping this directory in HDFS when you use short checkpoint intervals.
    • The --async parameter lets the job run asynchronously, which allows you to continue using Cloud Shell while the job is running.
    • The --jar parameter points to the application archive created in the previous step. Cloud Dataproc uploads the archive to the cluster instances right before the job starts.
    • The --max-failures-per-hour parameter lets the job restart on potential failures to increase resilience. For more details, see the document on restartable jobs.
    • The spark.dynamicAllocation.enabled=false property disables dynamic resource allocation, which is otherwise enabled by default on Cloud Dataproc clusters. Dynamic resource allocation adjusts the number of executors based on the workload, which is not effective in a streaming context and can cause data loss.
    • The spark.streaming.receiver.writeAheadLog.enabled=true property enables write-ahead logs to increase fault tolerance and to help avoid data loss.
  4. Display the list of active jobs and note the JOB_ID value for the job:

    gcloud dataproc jobs list --state-filter=active

    The output is similar to the following:

    JOB_ID                            TYPE   STATUS
    473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
    
  5. View the job output by opening the following URL in your browser. Replace [JOB_ID] with the value noted in the previous step.

    https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=global
    

    The output is similar to the following:

    -------------------------
    Window ending Wed Apr 11 22:03:00 UTC 2018 for the past 60 seconds
    
    No trending hashtags in this window.
    
    -------------------------
    Window ending Wed Apr 11 22:03:20 UTC 2018 for the past 60 seconds
    
    No trending hashtags in this window.
    
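For reference, the following is a minimal sketch of how the app might read the positional arguments described above and bound its total running time. The variable names are illustrative and can differ from the actual source:

// Sketch only: read the five positional arguments described above.
val Array(projectID, windowLength, slidingInterval, totalRunningTime,
  checkpointDirectory) = args

// ... build the StreamingContext and the DStream pipeline shown earlier ...

ssc.start()
if (totalRunningTime.toInt == 0) {
  // A value of 0 means run until the job is explicitly terminated.
  ssc.awaitTermination()
} else {
  // Otherwise, stop the streaming context after the requested number of minutes.
  ssc.awaitTerminationOrTimeout(totalRunningTime.toInt * 60 * 1000)
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}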

The Spark streaming job runs on Cloud Dataproc and periodically checks for new tweets from the tweets Cloud Pub/Sub topic. At this point, no tweets have been generated, so the app isn't processing any data yet. The following diagram shows the new state.

New state after you create the Cloud Dataproc cluster and
Cloud Datastore database

Deploying the HTTP function

The HTTP function offers a simple user interface that displays the latest trending hashtags. When you open the function URL in a web browser, the function reads the latest trending hashtags from Cloud Datastore and returns them in a simple web page. See the function code for more details.

Follow these steps to deploy and test the HTTP function.

  1. Deploy the HTTP function. This might take up to 2 minutes to complete:

    cd $REPO_ROOT/http_function
    gcloud functions deploy http_function \
        --trigger-http \
        --region=us-central1
    
  2. Verify that no hashtags are displayed by opening the following URL in your browser. Replace [PROJECT] with your project ID.

    https://us-central1-[PROJECT].cloudfunctions.net/http_function
    

    Because no tweets have been generated yet, you see the following text on the page: "No trending hashtags at this time... Try again later."

The following diagram shows the flow of information that you just enabled. The HTTP function is deployed and is ready to accept user requests and query the Cloud Datastore database.

Flow of information that you enabled

Generating simulated tweets

Follow these steps to publish thousands of simulated tweets to Cloud Pub/Sub:

  1. Create and activate a Python virtual environment:

    cd $REPO_ROOT/tweet-generator
    virtualenv venv
    source venv/bin/activate
    
  2. Install the Python dependencies:

    pip install -r requirements.txt
  3. Run Tweet Generator to generate thousands of simulated tweets:

    export PROJECT=$(gcloud info --format='value(config.project)')
    python tweet-generator.py $PROJECT 15 1000 &
    

    Notes:

    • The arguments are:
      • The ID of your project.
      • The total execution time for Tweet Generator, in minutes.
      • The number of tweets generated per minute.
    • Based on the passed arguments, the program runs for 15 minutes and generates 1000 tweets per minute, for a total of 15,000 tweets.
    • The & character lets the program run in the background so you can continue to use Cloud Shell.

    Following are some examples of tweets that the program generates:

    "Piece let #outside carry identify. Management great whom
    surface. Sure away couple build seat."
    
    "South wonder hotel those. Machine among source manager #likely
    anyone. Ground enough big no know."
    

    For more details on how Tweet Generator works, check out the source code.

  4. View the job output by visiting the following page in your browser. Replace [JOB_ID] with the value that you noted earlier.

    https://console.cloud.google.com/dataproc/jobs/[JOB_ID]
    

    The output is similar to the following:

    -------------------------
    Window ending 2018-04-11T22:13:40.053000000Z for the past 30 seconds
    
    Trending hashtags in this window:
    #outside, 22
    #likely, 18
    #information, 18
    #support, 17
    #national, 12
    #knowledge, 11
    #painting, 11
    #marriage, 8
    #remain, 7
    #finish, 7
    

    You see the latest top 10 trending hashtags and the number of times that each hashtag occurs in the current stream window.

  5. Verify that the results are properly saved in the database by running the following GQL query:

    export ACCESS_TOKEN=$(gcloud auth print-access-token)
    curl -X POST \
        -H "Authorization: Bearer $ACCESS_TOKEN" \
        -H "Content-Type: application/json; charset=utf-8" \
        --data "{ 'gqlQuery': { 'allowLiterals': true, 'queryString': 'SELECT * from TrendingHashtags ORDER BY datetime DESC LIMIT 1' } }" \
        "https://datastore.googleapis.com/v1beta3/projects/$PROJECT:runQuery"

    The output is similar to the following (truncated for concision):

    {
      "batch": {
        "entityResultType": "FULL",
        "entityResults": [
          {
            "entity": {
              "key": {
                "partitionId": {
                  "projectId": "[PROJECT]"
                },
                "path": [
                  {
                    "kind": "TrendingHashtags",
                    "id": "5681777339269120"
                  }
                ]
              },
              "properties": {
                "datetime": {
                  "timestampValue": "2018-04-11T15:15:20.253Z"
                },
                "hashtags": {
                  "arrayValue": {
                    "values": [
                      {
                        "entityValue": {
                          "key": {
                            "partitionId": {
                              "projectId": "[PROJECT]"
                            },
                            "path": [
                              {
                                "k
                            ]
                          },
                          "properties": {
                            "name": {
                              "stringValue": "#outside"
                            },
                            "occurrences": {
                              "integerValue": "24"
                            }
                          }
                            }
                      },
        ...
    
  6. Open the HTTP function web page by entering the following URL in your browser's address bar. Replace [PROJECT] with your project ID.

    https://us-central1-[PROJECT].cloudfunctions.net/http_function
    

The page displays the latest trending hashtags and refreshes every few seconds.

Terminating the job

The job is set to terminate automatically after the total running time that was passed in the job arguments (60 minutes in this tutorial), but you can terminate it now by running the following command. Replace [JOB_ID] with the job ID that you noted earlier.

gcloud dataproc jobs kill [JOB_ID] --quiet

To stop Tweet Generator, run this command:

pkill -9 -f "tweet-generator.py"

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

  • Clean up any resources you created so you won't be billed for them in the future. The easiest way to eliminate billing is to delete the project you created for the tutorial.
  • Alternatively, you can delete individual resources.

Delete the project

  1. In the GCP Console, go to the Manage resources page.

    Go to the Manage resources page

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete individual resources

Run the following commands to delete individual resources, instead of deleting the whole project:

gcloud dataproc clusters delete demo-cluster --quiet
gcloud functions delete http_function --quiet
gcloud pubsub topics delete tweets --quiet
gcloud pubsub subscriptions delete tweets-subscription --quiet
gcloud iam service-accounts delete $SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com --quiet

Finally, refer to the documentation to learn how to bulk delete all of the TrendingHashtags entities from your Cloud Datastore database.
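
Alternatively, the following is a minimal sketch that deletes those entities programmatically by using the same Cloud Datastore client library that the Spark app uses. It assumes the TrendingHashtags kind shown earlier and a modest number of entities:

import com.google.cloud.datastore.{DatastoreOptions, Query}

object DeleteTrendingHashtags {
  def main(args: Array[String]): Unit = {
    val datastore = DatastoreOptions.getDefaultInstance.getService

    // Keys-only query: fetch the keys of all TrendingHashtags entities.
    val keys = datastore.run(
      Query.newKeyQueryBuilder().setKind("TrendingHashtags").build())

    // Delete each entity by key.
    while (keys.hasNext) {
      datastore.delete(keys.next())
    }
  }
}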
