Google Cloud Platform

Google Cloud Platform for data scientists: using Jupyter Notebooks with Apache Spark on Google Cloud

Using the combination of Jupyter Notebooks and GCP gives you a familiar data science experience without the tedious infrastructure setup.

If you’re a data scientist, you probably prefer to spend your time exploring and analyzing your data, not thinking about sizing, installing and configuring your environment. If that sounds familiar, you’ll be happy to hear that using Jupyter Notebooks with the Google Cloud fully-managed big data stack gives you the notebook experience you know and love without having to burn hours on the boring infrastructure pieces. To demonstrate, in this post (which is part of an open-ended series about doing data science on GCP), we’ll walk you through the process of:

  • Creating a Jupyter notebook environment on Google Cloud Dataproc, a fully-managed Apache Spark and Hadoop service
  • Using the notebook to explore and visualize the public “NYC Taxi & Limousine Trips” dataset in Google BigQuery, Google’s fully-managed, cloud-native data warehouse service
  • Analyzing that data for a bit of a "hello world" type fun with Spark
Note that Google Cloud Datalab, a data exploration/analysis/visualization tool based on Jupyter, is another “no-ops” option for BigQuery, Google Compute Engine and Google Cloud Storage users. In this post, however, we’ll focus on using notebooks with Spark, which is more conveniently achieved by deploying Jupyter on Cloud Dataproc, as described here. 


Prerequisites

Step 1: Get a Cloud Dataproc cluster up and running

In this step, you'll create a Cloud Dataproc cluster named "datascience" with Jupyter notebooks initialized and running using the command line. (Note: Please do not use Cloud Shell as you will not be able to create a socket connection from it in Step 2.)

The simplest approach is to use all default settings for your cluster. Jupyter will run on port 8123 of your master node. If you don't have defaults set, you'll be prompted at this stage to enter a zone for the cluster. As you'll be connecting to the UI on the cluster, choose zones in a region close to you.

  gcloud dataproc clusters create datascience \
    --initialization-actions \
        gs://dataproc-initialization-actions/jupyter/jupyter.sh \
  Waiting on operation [projects/------/regions/global/operations/XXX-XXX-XXX-XXX-XXX].
Waiting for cluster creation operation...done.                                                                                                                     
Created tw[https://dataproc.googleapis.com/v1/projects/------/regions/global/clusters/datascience].

(If you prefer using a graphical user interface, then the same action can be taken by following these instructions.)

Once completed, your Cloud Dataproc cluster is up and running and ready for a connection.

For the next step, you'll need to know the hostname of your Cloud Dataproc master machine as well as the zone in which your instance was created. To determine that zone, run the following command in your terminal:

  gcloud dataproc clusters list

Output:

  NAME      WORKER_COUNT  STATUS  ZONE
datascience 2     RUNNING europe-west1-c

The cluster master-host-name is the name of your Cloud Dataproc cluster followed by an -m suffix. For example, if your cluster is named "my-cluster", the master-host-name would be "my-cluster-m".

Step 2: Connect to the Jupyter notebook

You'll use an ssh tunnel from your local machine to the server to connect to the notebook. Depending on your machine’s networking setup, this step can take a little while to get right, so before proceeding confirm that everything is working by accessing the YARN UI. From the browser that you launched when following the instructions in the cluster-web-interfaces cloud documentation, access the following URL.

http://datascience-m:8088/

Once you have the tunnel running, connect to the external IP of the notebook and port. The default port is 8123.

http://datascience-m:8123

jupyter-2h3or.PNG

Step 3: Create a new notebook and add libraries Create a new PySpark notebook by clicking the new button on the Jupyter UI.

jupyter-1z0fo.PNG

Everyone will have their own preferred selection of libraries, and adding new ones to the environment is simple. In this example, we'll ensure that pandas, google-api-python-client and seaborn are available.

In your notebook, run the following code cell:

  !pip install --upgrade pandas
!pip install --upgrade google-api-python-client
!pip install --upgrade seaborn

(Note: The default Cloud Dataproc cluster configuration has been setup to work with one PySpark notebook kernel, so ensure you only have one notebook active at a time. Although you can change the configuration to be able to work with multiple running kernels, that process is beyond our scope here.)

Step 4: Run some queries and visualize results

In this step, you'll use the public NYC taxi dataset available in BigQuery to pull some data into a dataframe and visualize it. (Note: The pandas.io.gbq library is used in the examples below because the result set of the queries is small enough to be quickly transferred to the Cloud Dataproc node by paging through the resultset. If there's a need to pull a large dataset into Cloud Dataproc, however, the BigQuery Connector for Spark is a better choice.)

To import the required libraries, run the following in a cell.

  import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from pandas.io import gbq

To set up the project-id that you'll use, run the following in a cell.

  project_id=''

Next, run a query to find the 50th, 75th and 90th quantiles against the nyc-tlc:yellow dataset. BigQuery will process 130GB of data containing 1,108,779,463 rows. The result set will be 24 rows of data that will be pulled into the notebook.

Run the following code in a cell.

  quantiles = """SELECT
  pickup_hour,
  quantiles[SAFE_ORDINAL(50)] fiftieth,
  quantiles[SAFE_ORDINAL(75)] seventy_fifth,
  quantiles[SAFE_ORDINAL(90)] ninetieth
FROM (
  SELECT
    pickup_hour,
    APPROX_QUANTILES(passenger_count, 101) AS quantiles
  FROM (
    SELECT
      EXTRACT (hour
      FROM
        pickup_datetime) AS pickup_hour,
      passenger_count
    FROM
      `nyc-tlc.yellow.trips`)
  GROUP BY
    pickup_hour )
    ORDER BY 1"""
trips = gbq.read_gbq(query=quantiles, dialect ='standard', project_id=project_id)
trips.head()

Output

  Requesting query... ok.
Query running...
Query done.
Processed: 16.5 Gb
Retrieving results...
Got 24 rows.
Total time taken 0.76 s.
Finished at 2017-01-31 11:47:15.

pickup_hourfiftiethseventy_fifthninetieth
00124
11124
22123
33124
44123

This next step will visualize the result set stored in trips. Run the following in a cell.

  %matplotlib inline
plt.figure();
data = trips.set_index ('pickup_hour')
data.plot(kind='area',stacked=False)
jupyter-3rrzo.PNG

Step 5: Work with a Spark Dataframe and RDD

As described in Step 4, whereas the pandas.io.gbq library is great for pulling smaller results sets into the machine hosting the notebook, the BigQuery Connector for Spark is a better choice for larger ones. Although the example below uses the former for continuity purposes, in the real world, you would likely consider using the latter.

The query below works with the nyc-tlc:green dataset using trips from 2015; the predicate also further restricts the number of rows retrieved to just 1.5M. It should take approximately 2 mins to pull the rows into the notebook.

  rawDataQuery="""
SELECT
  pickup_datetime,
  passenger_count
FROM
  `nyc-tlc.green.trips_2015`
WHERE DATE(pickup_datetime) BETWEEN DATE("2015-01-01") AND DATE("2015-01-31")
"""
rawData = gbq.read_gbq(query=rawDataQuery,dialect ='standard', project_id=project_id)
rawData.head()

Output:

  Requesting query... ok.
Query running...
Query done.
Processed: 151.0 Mb
Retrieving results...
  Got page: 1; 7.0% done. Elapsed 11.1 s.
  Got page: 2; 13.0% done. Elapsed 15.33 s.
  Got page: 3; 20.0% done. Elapsed 19.68 s.
  Got page: 4; 27.0% done. Elapsed 24.34 s.
  Got page: 5; 33.0% done. Elapsed 29.73 s.
  Got page: 6; 40.0% done. Elapsed 34.26 s.
  Got page: 7; 46.0% done. Elapsed 38.79 s.
  Got page: 8; 53.0% done. Elapsed 43.13 s.
  Got page: 9; 60.0% done. Elapsed 48.25 s.
  Got page: 10; 66.0% done. Elapsed 53.48 s.
  Got page: 11; 73.0% done. Elapsed 58.18 s.
  Got page: 12; 80.0% done. Elapsed 64.02 s.
  Got page: 13; 86.0% done. Elapsed 67.83 s.
  Got page: 14; 93.0% done. Elapsed 72.29 s.
  Got page: 15; 99.0% done. Elapsed 77.26 s.
  Got page: 16; 100.0% done. Elapsed 77.97 s.
Got 1508501 rows.
Total time taken 108.6 s.
Finished at 2017-01-31 12:56:55.

pickup_datetimepassenger_count
02015-01-23 09:33:076
12015-01-11 19:06:341
22015-01-24 03:30:211
32015-01-08 04:18:581
42015-01-25 00:44:341

In the next step, a Spark DataFrame is created using sqlContext and some approximate quantiles are calculated with the error value moving from 1.0 down to 0.0.

  # If you are making use of Spark 1.6 (Dataproc 1.0) then change spark.createDataFrame to sqlContext.createDataFrame. 
sDF = spark.createDataFrame(rawData,["pickup_datetime","passenger_count"])
quantiles = [0.5, 0.75, .95]
for error in np.arange(1.0,0.0,-0.1):
    qs = sDF.approxQuantile("passenger_count", quantiles,error) 
    print(qs)

Output:

  [0.0, 0.0, 0.0]
[0.0, 0.0, 9.0]
[0.0, 0.0, 9.0]
[0.0, 9.0, 9.0]
[0.0, 9.0, 9.0]
[0.0, 9.0, 9.0]
[1.0, 9.0, 9.0]
[1.0, 9.0, 9.0]
[1.0, 1.0, 9.0]
[1.0, 1.0, 9.0]

In this next step, a RDD is created from the Spark DataFrame and a "hello world" mean is calculated.

  rdd = sDF.select('passenger_count').rdd 
rdd = rdd.map(lambda x: int(x[0])) 
rdd.mean()

Output:

  1.3869755472485399

Cleaning up

You can save your work by downloading the notebook and storing the file on your workstation. To delete the cluster (remember, ALL work will be deleted), run the following command on your workstation.

  gcloud dataproc clusters delete datascience

Output:

  The cluster 'datascience' and all attached disks will be deleted.
Do you want to continue (Y/n)?  Y
Waiting on operation [projects/xxxxxx/regions/global/operations/xxx-xxx-xxx--xxx].
Waiting for cluster deletion operation...done.                                                                                                       
Deleted [https://dataproc.googleapis.com/v1/projects/xxxxxx/regions/global/clusters/datascience]

Next Steps

To explore Cloud Dataproc further: