Use Cloud Dataproc, BigQuery, and Apache Spark ML for Machine Learning

The Google BigQuery Connector for Apache Spark lets data scientists combine BigQuery's seamlessly scalable SQL engine with Apache Spark's machine learning capabilities. In this tutorial, we show how to use Cloud Dataproc, BigQuery, and Apache Spark ML to perform machine learning on a dataset.

Objectives

Use linear regression to build a model of birth weight as a function of five factors:

  1. gestation weeks
  2. mother’s age
  3. father’s age
  4. mother’s weight gain during pregnancy
  5. five-minute Apgar score

BigQuery is used to prepare the linear regression input table, which is written to your Google Cloud Platform project. Python running on your local machine is used to query and manage data in BigQuery. The resulting linear regression table is accessed in Apache Spark, and Spark ML is used to build and evaluate the model. PySpark running on the master VM in your Cloud Dataproc cluster is used to invoke Spark ML functions.

Costs

This tutorial uses billable components of Google Cloud Platform, including:

  • Google Compute Engine
  • Google Cloud Dataproc
  • Google BigQuery

Use the Pricing Calculator to generate a cost estimate based on your projected usage. New Cloud Platform users might be eligible for a free trial.

Before you begin

A Cloud Dataproc cluster has the Spark components, including Spark ML, installed. To set up a Cloud Dataproc cluster and run the code in this example, you will need to do (or have done) the following:

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the Manage resources page

  3. Enable the Google Cloud Dataproc, BigQuery, and Google Compute Engine APIs.

    Enable the APIs

  4. Install and initialize the Cloud SDK.
  5. Create a Cloud Dataproc cluster in your project. Your cluster should run a Cloud Dataproc image version with Spark 2.0 or higher (which includes the machine learning libraries).
  6. If you are using Cloud Dataproc version 1.3 or higher (see Supported Cloud Dataproc versions), install the BigQuery connector.

Create a Subset of BigQuery natality data

In this section, you create a dataset in your project, then create a table in the dataset to which you copy a subset of birth rate data from the publicly available natality BigQuery dataset. Later in this tutorial, you will use the subset data in this table to predict birth weight as a function of the five factors listed in the Objectives section.

Console

  1. Create a dataset in your project.
    1. Go to the BigQuery Web UI.
    2. In the left navigation pane, click the down arrow icon next to your project name, then click Create new dataset.
    3. In the Create Dataset dialog:
      • For Dataset ID, enter "natality_regression".
      • For Data location, you can choose a location for the dataset. The default location is the US multi-region. After a dataset is created, the location can't be changed.
      • For Default table expiration, choose one of the following options:
        • Never: (Default) Tables created in the dataset are not automatically deleted. You must delete them manually.
        • Number of days: Any table created in the dataset is deleted after the specified number of days from its creation time. This value is applied if you do not set a table expiration when the table is created.
    4. Click Create dataset.
  2. Run a query against the public natality dataset, then save the query results in a new table in your dataset.
    1. Copy and paste the following query into the Query Editor, then click Run Query.
      SELECT
        weight_pounds,
        mother_age,
        father_age,
        gestation_weeks,
        weight_gain_pounds,
        apgar_5min
      FROM
        `bigquery-public-data.samples.natality`
      WHERE
        weight_pounds IS NOT NULL
        AND mother_age IS NOT NULL
        AND father_age IS NOT NULL
        AND gestation_weeks IS NOT NULL
        AND weight_gain_pounds IS NOT NULL
        AND apgar_5min IS NOT NULL
      
    2. After the query completes (after approximately 1 minute), in the Query results panel, select SAVE AS → Save as table.
    3. Complete the Save as table dialog to save the query results in a "regression_input" table in your natality_regression dataset. Click Save.

Python

  1. See Setting Up a Python Development Environment for instructions on installing Python and the Google Cloud Client Library for Python (needed to run the code). Installing and using a Python virtualenv is recommended.
  2. Copy and paste the natality_tutorial.py code, below, into a Python shell on your local machine. Press the <return> key in the shell to run the code, which creates a "natality_regression" BigQuery dataset in your default GCP project with a "regression_input" table populated with a subset of the public natality data. An optional step to verify the new table follows the code.
    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset, "natality_regression", is created.
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is written to the user's default project; later in
        the tutorial, the built-in BigQuery Connector for Spark that bridges
        BigQuery and Cloud Dataproc reads this table into the Spark cluster.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_ref = client.dataset('natality_regression')
    dataset = bigquery.Dataset(dataset_ref)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # In the new BigQuery dataset, create a reference to a new table for
    # storing the query results.
    table_ref = dataset.table('regression_input')
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to the table reference created above.
    job_config.destination = table_ref
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish
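
  3. (Optional) Verify the new table. The following snippet is a minimal sketch, assuming the same google-cloud-bigquery client library and default credentials used above; it fetches the regression_input table and prints its row count.
    from google.cloud import bigquery

    # Create a client that uses Google Cloud Platform project defaults, as in
    # the tutorial code above.
    client = bigquery.Client()

    # Build a reference to the destination table created by the query job.
    table_ref = client.dataset('natality_regression').table('regression_input')

    # Fetch the table metadata and report how many rows were written.
    table = client.get_table(table_ref)
    print('regression_input contains {} rows'.format(table.num_rows))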

Run a linear regression

In this section, you'll run a PySpark linear regression from the Google Cloud Platform Console or your Cloud Dataproc cluster's master node.

Console

  1. Copy and paste the following code into a new natality_sparkml.py file on your local machine.
    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark DataFrame
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from datetime import datetime
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, gestation_weeks, weight_gain_pounds, and
    # apgar_5min) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for
    # that row.
    
    
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    # Use Cloud Dataproc's automatically propagated configurations to get
    # the Cloud Storage bucket and Google Cloud Platform project for this
    # cluster.
    sc = SparkContext()
    spark = SparkSession(sc)
    bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
    project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")
    
    # Set an input directory for reading data from BigQuery.
    todays_date = datetime.strftime(datetime.today(), "%Y-%m-%d-%H-%M-%S")
    input_directory = "gs://{}/tmp/natality-{}".format(bucket, todays_date)
    
    # Set the configuration for importing data from BigQuery.
    # Specifically, make sure to set the project ID and bucket for Cloud Dataproc,
    # and the project ID, dataset, and table names for BigQuery.
    
    conf = {
        # Input Parameters
        "mapred.bq.project.id": project,
        "mapred.bq.gcs.bucket": bucket,
        "mapred.bq.temp.gcs.path": input_directory,
        "mapred.bq.input.project.id": project,
        "mapred.bq.input.dataset.id": "natality_regression",
        "mapred.bq.input.table.id": "regression_input",
    }
    
    # Read the data from BigQuery into Spark as an RDD.
    table_data = spark.sparkContext.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=conf)
    
    # Extract the JSON strings from the RDD.
    table_json = table_data.map(lambda x: x[1])
    
    # Load the JSON strings as a Spark DataFrame.
    natality_data = spark.read.json(table_json)
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print "Coefficients:" + str(model.coefficients)
    print "Intercept:" + str(model.intercept)
    print "R^2:" + str(model.summary.r2)
    model.summary.residuals.show()
    
    
        
  2. Copy the local natality_sparkml.py file to a Cloud Storage bucket in your project.
    gsutil cp natality_sparkml.py gs://bucket-name
    
  3. Run the regression from the Cloud Dataproc Submit a job page. Insert the gs:// URI of your natality_sparkml.py file (in the Cloud Storage bucket you copied it to) as the main Python file. Select PySpark as the Job type. Click Submit to run the job on your cluster.
    When the job completes, the linear regression output model summary appears in the Cloud Dataproc Job details window.

PySpark

  1. SSH into your Cloud Dataproc cluster's master node.
    1. Go to your project's Cloud Dataproc→Clusters page.
    2. Click your cluster name to open the Cluster details page. Then, click the VM instances tab and select SSH→Open in browser window.
    3. A terminal window that is connected to the master instance opens in your browser.
  2. Run pyspark to open a PySpark shell.
  3. Copy and paste the following code into the shell, then press <return> to run the Spark ML linear regression.
    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark DataFrame
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from datetime import datetime
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, gestation_weeks, weight_gain_pounds, and
    # apgar_5min) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for
    # that row.
    
    
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    # Use Cloud Dataproc's automatically propagated configurations to get
    # the Cloud Storage bucket and Google Cloud Platform project for this
    # cluster.
    sc = SparkContext()
    spark = SparkSession(sc)
    bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
    project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")
    
    # Set an input directory for reading data from BigQuery.
    todays_date = datetime.strftime(datetime.today(), "%Y-%m-%d-%H-%M-%S")
    input_directory = "gs://{}/tmp/natality-{}".format(bucket, todays_date)
    
    # Set the configuration for importing data from BigQuery.
    # Specifically, make sure to set the project ID and bucket for Cloud Dataproc,
    # and the project ID, dataset, and table names for BigQuery.
    
    conf = {
        # Input Parameters
        "mapred.bq.project.id": project,
        "mapred.bq.gcs.bucket": bucket,
        "mapred.bq.temp.gcs.path": input_directory,
        "mapred.bq.input.project.id": project,
        "mapred.bq.input.dataset.id": "natality_regression",
        "mapred.bq.input.table.id": "regression_input",
    }
    
    # Read the data from BigQuery into Spark as an RDD.
    table_data = spark.sparkContext.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=conf)
    
    # Extract the JSON strings from the RDD.
    table_json = table_data.map(lambda x: x[1])
    
    # Load the JSON strings as a Spark DataFrame.
    natality_data = spark.read.json(table_json)
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print "Coefficients:" + str(model.coefficients)
    print "Intercept:" + str(model.intercept)
    print "R^2:" + str(model.summary.r2)
    model.summary.residuals.show()
    
    
        
    The linear regression output (model summary) appears in the terminal window when the job completes.
    >>> # Print the model summary.
    ... print("Coefficients:" + str(model.coefficients))
    Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
    >>> print("Intercept:" + str(model.intercept))
    Intercept:-2.26130330748
    >>> print("R^2:" + str(model.summary.r2))
    R^2:0.295200579035
    >>> model.summary.residuals.show()
    +--------------------+
    |           residuals|
    +--------------------+
    | -0.7234737533344147|
    |  -0.985466980630501|
    | -0.6669710598385468|
    |  1.4162434829714794|
    |-0.09373154375186754|
    |-0.15461747949235072|
    | 0.32659061654192545|
    |  1.5053877697929803|
    |  -0.640142797263989|
    |   1.229530260294963|
    |-0.03776160295256...|
    | -0.5160734239126814|
    | -1.5165972740062887|
    |  1.3269085258245008|
    |  1.7604670124710626|
    |  1.2348130901905972|
    |   2.318660276655887|
    |  1.0936947030883175|
    |  1.0169768511417363|
    | -1.7744915698181583|
    +--------------------+
    only showing top 20 rows.
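
  4. (Optional) Apply the fitted model. As a sketch of one possible next step (not part of the original job), you can call model.transform() on the training DataFrame in the same PySpark shell to compare actual and predicted birth weights.
    # Add a "prediction" column by applying the fitted linear regression model
    # to the training DataFrame, then show a few label/prediction pairs.
    predictions = model.transform(training_data)
    predictions.select("label", "prediction").show(5)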
    

Cleaning up

After you've finished the Use Cloud Dataproc, BigQuery, and Spark ML for Machine Learning tutorial, you can clean up the resources you created on Google Cloud Platform so you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting the project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page.

    Go to the Projects page

  2. In the project list, select the checkbox next to the project you want to delete, then click Delete project.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Deleting the Cloud Dataproc cluster

See Delete a cluster.

Deleting Cloud Storage data

To clean up the temporary BigQuery input data files from your project's Cloud Storage bucket:

  1. Run the following commands to obtain the name of the bucket in your project that was used to store the temporary data files (this bucket starts with the characters "dataproc-"). Insert the name of your cluster, as shown below.
    CLUSTER_NAME=<my cluster>
    TEMP_DATA_BUCKET="$(gcloud dataproc clusters describe $CLUSTER_NAME | \
      grep configBucket | awk '{print $2}')"
    echo $TEMP_DATA_BUCKET
    
  2. Go to your project's Cloud Storage→Browser page, then click the TEMP_DATA_BUCKET name. Inside the tmp/ folder, you will see a folder whose name starts with "natality-" (the remaining characters represent the date and time the folder was created). This is the temporary input data folder to remove. Select the check box to the left of the natality- folder name, then select Delete at the top of the page to remove the folder from Cloud Storage.
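
Deleting the BigQuery dataset

The natality_regression dataset and its regression_input table created earlier in this tutorial remain in your project unless you delete the project itself. If you want to remove them separately, you can use the same google-cloud-bigquery client library. The snippet below is a sketch (not an original tutorial step) that assumes a client library version supporting the delete_contents argument, which deletes the table along with the dataset.

    from google.cloud import bigquery

    # Delete the tutorial dataset and any tables it contains.
    client = bigquery.Client()
    client.delete_dataset(client.dataset('natality_regression'), delete_contents=True)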