使用 Dataproc、BigQuery 和 Apache Spark ML 進行機器學習


Apache Spark 專用的 BigQuery 連接器,能夠讓資料科學家將 BigQuery 可靈活擴充的 SQL 引擎所具備的強大力量,與 Apache Spark 的機器學習功能相結合。在本教學課程中,我們將示範如何在資料集上使用 Dataproc、BigQuery 和 Apache Spark ML 執行機器學習。

目標

使用線性迴歸建構出生重量模型來做為五項因素的函式:

請使用下列工具:

  • BigQuery,用於準備寫入 Google Cloud 專案的線性迴歸輸入資料表
  • Python,用於查詢及管理 BigQuery 中的資料
  • Apache Spark,用於存取產生的線性迴歸資料表
  • Spark ML,用於建構及評估模型
  • Dataproc PySpark 工作,用於叫用 Spark ML 函式

費用

在本文件中,您會使用 Google Cloud的下列計費元件:

  • Compute Engine
  • Dataproc
  • BigQuery

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用資格。

事前準備

Dataproc 叢集具備 Spark 元件,其中包含已安裝的 Spark ML。 如要在這個範例中設定 Dataproc 叢集並執行程式碼,您將需要執行 (或已執行) 下列步驟:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Enable the APIs

  4. Install the Google Cloud CLI.

  5. 如果您使用外部識別資訊提供者 (IdP),請先 使用聯合身分登入 gcloud CLI

  6. 如要初始化 gcloud CLI,請執行下列指令:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Enable the APIs

  9. Install the Google Cloud CLI.

  10. 如果您使用外部識別資訊提供者 (IdP),請先 使用聯合身分登入 gcloud CLI

  11. 如要初始化 gcloud CLI,請執行下列指令:

    gcloud init
  12. 在專案中建立 Dataproc 叢集。您的叢集必須使用 Spark 2.0 以上版本 (包括機器學習程式庫),才能執行 Dataproc 版本
  13. 建立 BigQuery natality 資料的子集

    在本節中,您將會在專案中建立資料集,然後在資料集中建立資料表,再將出生率資料子集從開放大眾使用的出生率 BigQuery 資料集複製到該資料表中。稍後在本教學課程中,您將在這個資料表中使用子集資料,透過使用母親年齡、父親年齡與懷孕週數的函式來預測出生重量。

    您可以使用 Google Cloud 控制台建立資料子集,或在本機電腦上執行 Python 指令碼。

    控制台

    1. 在專案中建立資料集。

      1. 前往 BigQuery 網頁版 UI
      2. 在左側導覽面板中,按一下專案名稱,然後按一下「建立資料集」
      3. 在「建立資料集」對話方塊中:
        1. 針對「Dataset ID」,輸入「natality_regression」。
        2. 針對「Data location」(資料位置),您可以選擇該資料集的位置。預設值位置為 US multi-region。 資料集建立後即無法變更位置。
        3. 針對「Default table expiration」(預設資料表到期時間),選擇下列其中一個選項:
          • 永不過期 (預設):您必須手動刪除資料表。
          • 天數:資料表會在建立時間之後於指定天數內刪除。
        4. 在「Encryption」(加密) 部分,選擇下列其中一個選項:
        5. 按一下「建立資料集」
    2. 針對公開出生率資料集執行查詢,然後將查詢結果儲存在資料集的新資料表中。

      1. 複製下列查詢並貼到查詢編輯器,然後按一下「執行」。
        CREATE OR REPLACE TABLE natality_regression.regression_input as
        SELECT
        weight_pounds,
        mother_age,
        father_age,
        gestation_weeks,
        weight_gain_pounds,
        apgar_5min
        FROM
        `bigquery-public-data.samples.natality`
        WHERE
        weight_pounds IS NOT NULL
        AND mother_age IS NOT NULL
        AND father_age IS NOT NULL
        AND gestation_weeks IS NOT NULL
        AND weight_gain_pounds IS NOT NULL
        AND apgar_5min IS NOT NULL
        
      2. 查詢完成後 (大約一分鐘),結果會儲存為專案中 natality_regression 資料集的「regression_input」BigQuery 資料表。

    Python

    在試用這個範例之前,請先按照Python使用用戶端程式庫的 Dataproc 快速入門」中的設定說明操作。詳情請參閱 Dataproc Python API 參考說明文件

    如要向 Dataproc 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    1. 如需安裝 Python 和 Python 專用的 Google Cloud 用戶端程式庫 (需執行程式碼) 的操作說明,請參閱設定 Python 開發環境一文。建議安裝並使用 Python virtualenv

    2. 請複製以下 natality_tutorial.py 程式碼並貼到本機電腦上的 python 殼層。按一下殼層中的 <return> 鍵執行程式碼,以透過公開natality資料子集填入的「regression_input」資料表,在預設Google Cloud 專案中建立「natality_regression」 BigQuery 資料集。

      """Create a Google BigQuery linear regression input table.
      
      In the code below, the following actions are taken:
      * A new dataset is created "natality_regression."
      * A query is run against the public dataset,
          bigquery-public-data.samples.natality, selecting only the data of
          interest to the regression, the output of which is stored in a new
          "regression_input" table.
      * The output table is moved over the wire to the user's default project via
          the built-in BigQuery Connector for Spark that bridges BigQuery and
          Cloud Dataproc.
      """
      
      from google.cloud import bigquery
      
      # Create a new Google BigQuery client using Google Cloud Platform project
      # defaults.
      client = bigquery.Client()
      
      # Prepare a reference to a new dataset for storing the query results.
      dataset_id = "natality_regression"
      dataset_id_full = f"{client.project}.{dataset_id}"
      
      dataset = bigquery.Dataset(dataset_id_full)
      
      # Create the new BigQuery dataset.
      dataset = client.create_dataset(dataset)
      
      # Configure the query job.
      job_config = bigquery.QueryJobConfig()
      
      # Set the destination table to where you want to store query results.
      # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
      # used in place of a TableReference.
      job_config.destination = f"{dataset_id_full}.regression_input"
      
      # Set up a query in Standard SQL, which is the default for the BigQuery
      # Python client library.
      # The query selects the fields of interest.
      query = """
          SELECT
              weight_pounds, mother_age, father_age, gestation_weeks,
              weight_gain_pounds, apgar_5min
          FROM
              `bigquery-public-data.samples.natality`
          WHERE
              weight_pounds IS NOT NULL
              AND mother_age IS NOT NULL
              AND father_age IS NOT NULL
              AND gestation_weeks IS NOT NULL
              AND weight_gain_pounds IS NOT NULL
              AND apgar_5min IS NOT NULL
      """
      
      # Run the query.
      client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
    3. 確認已建立 natality_regression 資料集和 regression_input 資料表。

    執行線性迴歸

    在本節中,您將透過 Google Cloud 控制台將工作提交至 Dataproc 服務,或從本機終端機執行 gcloud 指令,藉此執行 PySpark 線性迴歸。

    控制台

    1. 複製下列程式碼,然後貼到本機電腦上的新 natality_sparkml.py 檔案中。

      """Run a linear regression using Apache Spark ML.
      
      In the following PySpark (Spark Python API) code, we take the following actions:
      
        * Load a previously created linear regression (BigQuery) input table
          into our Cloud Dataproc Spark cluster as an RDD (Resilient
          Distributed Dataset)
        * Transform the RDD into a Spark Dataframe
        * Vectorize the features on which the model will be trained
        * Compute a linear regression using Spark ML
      
      """
      from pyspark.context import SparkContext
      from pyspark.ml.linalg import Vectors
      from pyspark.ml.regression import LinearRegression
      from pyspark.sql.session import SparkSession
      # The imports, above, allow us to access SparkML features specific to linear
      # regression as well as the Vectors types.
      
      
      # Define a function that collects the features of interest
      # (mother_age, father_age, and gestation_weeks) into a vector.
      # Package the vector in a tuple containing the label (`weight_pounds`) for that
      # row.
      def vector_from_inputs(r):
        return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                  float(r["father_age"]),
                                                  float(r["gestation_weeks"]),
                                                  float(r["weight_gain_pounds"]),
                                                  float(r["apgar_5min"])))
      
      sc = SparkContext()
      spark = SparkSession(sc)
      
      # Read the data from BigQuery as a Spark Dataframe.
      natality_data = spark.read.format("bigquery").option(
          "table", "natality_regression.regression_input").load()
      # Create a view so that Spark SQL queries can be run against the data.
      natality_data.createOrReplaceTempView("natality")
      
      # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
      sql_query = """
      SELECT *
      from natality
      where weight_pounds is not null
      and mother_age is not null
      and father_age is not null
      and gestation_weeks is not null
      """
      clean_data = spark.sql(sql_query)
      
      # Create an input DataFrame for Spark ML using the above function.
      training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                   "features"])
      training_data.cache()
      
      # Construct a new LinearRegression object and fit the training data.
      lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
      model = lr.fit(training_data)
      # Print the model summary.
      print("Coefficients:" + str(model.coefficients))
      print("Intercept:" + str(model.intercept))
      print("R^2:" + str(model.summary.r2))
      model.summary.residuals.show()

    2. 將本機的 natality_sparkml.py 檔案複製到專案的 Cloud Storage bucket。

      gcloud storage cp natality_sparkml.py gs://bucket-name
      

    3. 從 Dataproc 的「Submit a job」(提交工作) 頁面執行迴歸。

      1. 在「Main python file」(主要 Python 檔案) 欄位中,插入 natality_sparkml.py 檔案副本所在 Cloud Storage 值區的 gs:// URI。

      2. 選取 PySpark 做為「Job type」(工作類型)

      3. 在「Jar files」(Jar 檔案) 欄位中插入 gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar。這會讓 spark-bigquery-connector 在執行階段可供 PySpark 應用程式使用,以便將 BigQuery 資料讀取至 Spark DataFrame。

      4. 填寫「Job ID」(工作 ID)、「Region」(區域) 和「Cluster」(叢集) 欄位。

      5. 按一下「提交」,即可在叢集上執行工作。

    工作完成後,線性迴歸輸出模型摘要會顯示在 Dataproc 工作詳細資料視窗中。

    gcloud

    1. 複製下列程式碼,然後貼到本機電腦上的新 natality_sparkml.py 檔案中。

      """Run a linear regression using Apache Spark ML.
      
      In the following PySpark (Spark Python API) code, we take the following actions:
      
        * Load a previously created linear regression (BigQuery) input table
          into our Cloud Dataproc Spark cluster as an RDD (Resilient
          Distributed Dataset)
        * Transform the RDD into a Spark Dataframe
        * Vectorize the features on which the model will be trained
        * Compute a linear regression using Spark ML
      
      """
      from pyspark.context import SparkContext
      from pyspark.ml.linalg import Vectors
      from pyspark.ml.regression import LinearRegression
      from pyspark.sql.session import SparkSession
      # The imports, above, allow us to access SparkML features specific to linear
      # regression as well as the Vectors types.
      
      
      # Define a function that collects the features of interest
      # (mother_age, father_age, and gestation_weeks) into a vector.
      # Package the vector in a tuple containing the label (`weight_pounds`) for that
      # row.
      def vector_from_inputs(r):
        return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                  float(r["father_age"]),
                                                  float(r["gestation_weeks"]),
                                                  float(r["weight_gain_pounds"]),
                                                  float(r["apgar_5min"])))
      
      sc = SparkContext()
      spark = SparkSession(sc)
      
      # Read the data from BigQuery as a Spark Dataframe.
      natality_data = spark.read.format("bigquery").option(
          "table", "natality_regression.regression_input").load()
      # Create a view so that Spark SQL queries can be run against the data.
      natality_data.createOrReplaceTempView("natality")
      
      # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
      sql_query = """
      SELECT *
      from natality
      where weight_pounds is not null
      and mother_age is not null
      and father_age is not null
      and gestation_weeks is not null
      """
      clean_data = spark.sql(sql_query)
      
      # Create an input DataFrame for Spark ML using the above function.
      training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                   "features"])
      training_data.cache()
      
      # Construct a new LinearRegression object and fit the training data.
      lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
      model = lr.fit(training_data)
      # Print the model summary.
      print("Coefficients:" + str(model.coefficients))
      print("Intercept:" + str(model.intercept))
      print("R^2:" + str(model.summary.r2))
      model.summary.residuals.show()

    2. 將本機的 natality_sparkml.py 檔案複製到專案的 Cloud Storage bucket。

      gcloud storage cp natality_sparkml.py gs://bucket-name
      

    3. 在本機電腦的終端機視窗中執行下列 gcloud 指令,將 Pyspark 工作提交至 Dataproc 服務。

      1. --jars 標記值會在執行階段提供 spark-bigquery-connector 給 PySpark 作業,讓作業將 BigQuery 資料讀取至 Spark DataFrame。
        gcloud dataproc jobs submit pyspark \
            gs://your-bucket/natality_sparkml.py \
            --cluster=cluster-name \
            --region=region \
            --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
        

    工作完成之後,線性迴歸輸出 (模型摘要) 會出現在終端機視窗中。

    <<< # Print the model summary.
    ... print "Coefficients:" + str(model.coefficients)
    Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
    <<< print "Intercept:" + str(model.intercept)
    Intercept:-2.26130330748
    <<< print "R^2:" + str(model.summary.r2)
    R^2:0.295200579035
    <<< model.summary.residuals.show()
    +--------------------+
    |           residuals|
    +--------------------+
    | -0.7234737533344147|
    |  -0.985466980630501|
    | -0.6669710598385468|
    |  1.4162434829714794|
    |-0.09373154375186754|
    |-0.15461747949235072|
    | 0.32659061654192545|
    |  1.5053877697929803|
    |  -0.640142797263989|
    |   1.229530260294963|
    |-0.03776160295256...|
    | -0.5160734239126814|
    | -1.5165972740062887|
    |  1.3269085258245008|
    |  1.7604670124710626|
    |  1.2348130901905972|
    |   2.318660276655887|
    |  1.0936947030883175|
    |  1.0169768511417363|
    | -1.7744915698181583|
    +--------------------+
    only showing top 20 rows.
    
      
    

    Clean up

    After you finish the tutorial, you can clean up the resources that you created so that they stop using quota and incurring charges. The following sections describe how to delete or turn off these resources.

    Delete the project

    The easiest way to eliminate billing is to delete the project that you created for the tutorial.

    To delete the project:

    1. In the Google Cloud console, go to the Manage resources page.

      Go to Manage resources

    2. In the project list, select the project that you want to delete, and then click Delete.
    3. In the dialog, type the project ID, and then click Shut down to delete the project.
    4. 刪除 Dataproc 叢集

      請參閱刪除叢集說明。

    後續步驟