使用 Dataproc、BigQuery 和 Apache Spark ML 进行机器学习


适用于 Apache SparkBigQuery 连接器让数据科学家能够将 BigQuery 的无缝可伸缩 SQL 引擎的强大功能与 Apache Spark 的机器学习功能相结合。在本教程中,我们将介绍如何使用 Dataproc、BigQuery 和 Apache Spark ML 在数据集上执行机器学习。

目标

使用线性回归来将出生体重模型建立成由五个因素组成的函数:

  • 妊娠周数
  • 母亲的年龄
  • 父亲的年龄
  • 母亲在怀孕期间增加的体重
  • Apgar 得分

使用以下工具:

  • BigQuery,用于准备线性回归输入表,该表将写入到您的 Google Cloud 项目
  • Python,用于在 BigQuery 中查询和管理数据
  • Apache Spark,用于访问生成的线性回归表
  • Spark ML,用于构建和评估模型
  • Dataproc PySpark 作业,用于调用 Spark ML 函数

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Compute Engine
  • Dataproc
  • BigQuery

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

Dataproc 集群安装了 Spark 组件,包括 Spark ML。要设置 Dataproc 集群并运行此示例中的代码,您需要执行(或已完成)以下各项:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Enable the APIs

  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  7. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Enable the APIs

  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. 在项目中创建 Dataproc 集群。您的集群应在 Spark 2.0 或更高版本上运行 Dataproc 版本(包括机器学习库)。

创建 BigQuery natality 数据的子集

在本部分中,你在项目中创建一个数据集,然后在数据集中创建一个表,你可从公开可用的出生率 BigQuery 资料集中复制出生率数据子集至该表。在本教程的后半部,您将使用此表中的子集数据来预测出生体重与产妇年龄、父亲年龄和妊娠周数的函数关系。

您可以使用 Google Cloud 控制台创建数据子集,也可以在本地机器上运行 Python 脚本。

控制台

  1. 在项目中创建数据集。

    1. 转到 BigQuery 网页界面
    2. 在左侧导航窗格中,点击您的项目名称,然后点击创建数据集
    3. 创建数据集对话框中执行以下操作:
      1. 对于数据集 ID,输入“natality_regression”。
      2. 对于数据位置,您可以选择数据集的位置。默认位置值为 US multi-region。 创建数据集后,就无法再更改此位置。
      3. 对于默认表到期时间,请选择以下任一选项:
        • 从不(默认值):您必须手动删除表格。
        • 天数:表格将在创建之时起的指定天数后删除。
      4. 对于加密,请选择以下任一选项:
      5. 使用 Web 界面创建数据集时,点击创建数据集
  2. 针对公共出生率数据集运行查询,然后将查询结果保存在数据集的新表中。

    1. 将以下查询复制并粘贴到查询编辑器中,然后点击“运行”。
      CREATE OR REPLACE TABLE natality_regression.regression_input as
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. 查询完成后(大约需要 1 分钟),结果会保存为项目的 natality_regression 数据集中的“regression_input”BigQuery 表。

Python

在尝试此示例之前,请按照 Dataproc 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Dataproc Python API 参考文档

如需向 Dataproc 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

  1. 请参阅设置 Python 开发环境,了解关于安装 Python 和适用于 Python 的 Google Cloud 客户端库(运行代码所需的资源)的说明。建议您安装和使用 Python virtualenv

  2. 将以下 natality_tutorial.py 代码复制并粘贴到本地机器的 python Shell 中。 在 Shell 中按下 <return> 键运行代码,以在默认的Google Cloud 项目中创建“natality_regression”BigQuery 数据集,其中的“regression_input”表填充了公开 natality 数据的子集。

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
  3. 确认创建 natality_regression 数据集和 regression_input 表。

运行线性回归

在本部分中,您将使用 Google Cloud 控制台或通过从本地终端运行 gcloud 命令,将作业提交到 Dataproc 服务以运行 PySpark 线性回归。

控制台

  1. 将以下代码复制并粘贴到本地机器上的新 natality_sparkml.py 文件中。

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. 将本地 natality_sparkml.py 文件复制到项目中的 Cloud Storage 存储分区。

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. 从 Dataproc 提交作业页面运行回归。

    1. 主 Python 文件字段中,插入您的 natality_sparkml.py 文件副本所在的 Cloud Storage 存储分区的 gs:// URI。

    2. 选择 PySpark 作为作业类型

    3. Jar 文件字段中插入 gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar。这可让 PySpark 应用在运行时使用 spark-bigquery-connector 将 BigQuery 数据读取到 Spark DataFrame 中。

    4. 填写作业 ID区域集群字段。

    5. 点击提交以在集群上运行作业。

作业完成后,Dataproc 作业详情窗口将显示线性回归输出模型汇总。

gcloud

  1. 将以下代码复制并粘贴到本地机器上的新 natality_sparkml.py 文件中。

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. 将本地 natality_sparkml.py 文件复制到项目中的 Cloud Storage 存储分区。

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. 通过从本地机器的终端窗口中运行如下所示的 gcloud 命令,将 Pyspark 作业提交到 Dataproc 服务。

    1. --jars 标志值可让 PySpark jobv 在运行时使用 spark-bigquery-connector 将 BigQuery 数据读取到 Spark DataFrame 中。
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

作业完成时,线性回归输出(模型汇总)将显示在终端窗口中。

<<< # Print the model summary.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

  

清理

完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除 Dataproc 集群

请参阅删除集群

后续步骤