使用 Dataproc、BigQuery 和 Apache Spark ML 进行机器学习

适用于 Apache SparkBigQuery 连接器让数据科学家能够将 BigQuery 的无缝可扩缩 SQL 引擎的强大功能与 Apache Spark 的机器学习功能相结合。在本教程中,我们将介绍如何使用 Dataproc、BigQuery 和 Apache Spark ML 在数据集上执行机器学习。

目标

使用线性回归构建基于五个因素的出生体重模型:

  1. 妊娠周数
  2. 母亲的年龄
  3. 父亲的年龄
  4. 母亲在怀孕期间增加的体重
  5. Apgar 得分

BigQuery 用于准备线性回归输入表,该表将写入到您的 Google Cloud Platform 项目中。Python 用于在 BigQuery 中查询和管理数据。生成的线性回归表可在 Apache Spark 中访问,而 Spark ML 用于构建和评估模型。Dataproc PySpark 作业用于调用 Spark ML 函数。

费用

本教程使用 Google Cloud Platform 的可计费组件,包括:

  • Compute Engine
  • Dataproc
  • BigQuery

请使用价格计算器根据您的预计使用情况来估算费用。Cloud Platform 新用户可能有资格免费试用

准备工作

Dataproc 集群安装了 Spark 组件,包括 Spark ML。要设置 Dataproc 集群并运行此示例中的代码,您需要执行(或已完成)以下各项:

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册一个新帐号

  2. 在 Cloud Console 的项目选择器页面上,选择或创建 Cloud 项目。

    转到项目选择器页面

  3. 启用 Dataproc, BigQuery, Compute Engine API。

    启用 API

  4. 安装并初始化 Cloud SDK
  5. 在项目中创建 Dataproc 集群。您的集群应在 Spark 2.0 或更高版本上运行 Dataproc 版本(包括机器学习库)。

创建 BigQuery natality 数据的子集

在本部分中,你在项目中创建一个数据集,然后在数据集中创建一个表,你可从公开可用的出生率 BigQuery 资料集中复制出生率数据子集至该表。在本教程的后半部,您将使用此表中的子集数据来预测出生体重与产妇年龄、父亲年龄和妊娠周数的函数关系。

您可以使用 Google Cloud Console 创建数据子集,也可以在本地机器上运行 Python 脚本。

控制台

  1. 在项目中创建数据集。

    1. 转到 BigQuery 网页界面
    2. 在左侧导航窗格中,点击您的项目名称,然后点击创建数据集
    3. 创建数据集对话框中执行以下操作:
      1. 对于数据集 ID,输入“natality_regression”。
      2. 对于数据位置,您可以选择数据集的位置。默认位置值为 US multi-region。 创建数据集后,就无法再更改此位置。
      3. 对于默认表到期时间,请选择以下任一选项:
        • 从不(默认值):您必须手动删除表格。
        • 天数:表格将在创建之时起的指定天数后删除。
      4. 对于加密,请选择以下任一选项:
      5. 使用 Web 界面创建数据集时,点击创建数据集
  2. 针对公共出生率数据集运行查询,然后将查询结果保存在数据集的新表中。

    1. 将以下查询复制并粘贴到查询编辑器中,然后点击“运行”。
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. 查询完成后(约需 1 分钟),点击保存结果,然后选择保存选项,将结果保存为项目的 natality_regression 数据集中的“regression_input”BigQuery 表。

Python

  1. 请参阅设置 Python 开发环境,了解关于安装 Python 和适用于 Python 的 Google Cloud 客户端库(运行代码所需的资源)的说明。建议您安装和使用 Python virtualenv

  2. 将以下 natality_tutorial.py 代码复制并粘贴到本地机器的 python Shell 中。 在 Shell 中按下 <return> 键运行代码,以在默认 Google Cloud 项目中创建“natality_regression”BigQuery 数据集,其中的“regression_input”表填充了公开 natality 数据的子集。

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # In the new BigQuery dataset, create a reference to a new table for
    # storing the query results.
    table_ref = dataset.table("regression_input")
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to the table reference created above.
    job_config.destination = table_ref
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish
  3. 确认创建 natality_regression 数据集和 regression_input 表。

运行线性回归

在本部分中,您将使用 Google Cloud Console 或通过从本地终端运行 gcloud 命令,将作业提交到 Dataproc 服务以运行 PySpark 线性回归。

控制台

  1. 将以下代码复制并粘贴到本地机器上的新 natality_sparkml.py 文件中。

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. 将本地 natality_sparkml.py 文件复制到项目中的 Cloud Storage 存储分区。

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. 从 Dataproc 提交作业页面运行回归。

    1. 主 Python 文件字段中,插入您的 natality_sparkml.py 文件副本所在的 Cloud Storage 存储分区的 gs:// URI。

    2. 选择 PySpark 作为作业类型

    3. Jar 文件字段中插入 gs://spark-lib/bigquery/spark-bigquery-latest.jar。这可让 PySpark 应用在运行时使用 spark-bigquery-connector 将 BigQuery 数据读取到 Spark DataFrame 中。

    4. 填写作业 ID区域集群字段。

    5. 点击创建以在集群上运行作业。

作业完成后,Dataproc 作业详情窗口将显示线性回归输出模型汇总。

gcloud

  1. 将以下代码复制并粘贴到本地机器上的新 natality_sparkml.py 文件中。

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. 将本地 natality_sparkml.py 文件复制到项目中的 Cloud Storage 存储分区。

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. 通过从本地机器的终端窗口中运行如下所示的 gcloud 命令,将 Pyspark 作业提交到 Dataproc 服务。

    1. 对于 --files 标志值,请插入您的 natality_sparkml.py 文件副本所在的 Cloud Storage 存储分区的名称。
    2. --jars 标志值可让 PySpark jobv 在运行时使用 spark-bigquery-connector 将 BigQuery 数据读取到 Spark DataFrame 中。
      gcloud dataproc jobs submit pyspark \
          --cluster=cluster-name \
          --region=region \
          --files=gs://your-bucket/natality_sparkml.py \
          --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      

作业完成时,线性回归输出(模型汇总)将显示在终端窗口中。

<<< # Print the model summary.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

清理

完成“使用 Dataproc、BigQuery 和 Apache Spark ML 进行机器学习”教程后,您可以清理在 Google Cloud 上创建的资源,避免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”页面

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除 Dataproc 集群

请参阅删除集群