機械学習で Cloud Dataproc、BigQuery、Apache Spark ML を使用する

Apache Spark 用の Google BigQuery コネクタを使用すると、データ サイエンティストは、BigQuery のシームレスでスケーラブルな SQL エンジンの能力と Apache Spark の機械学習機能を融合できます。このチュートリアルでは、Cloud Dataproc、BigQuery、Apache Spark ML を使用してデータセットで機械学習を実施する方法を示します。

目標

線形回帰を使用して次の 5 つの要素に基づく出生時体重のモデルを構築します。

  1. 妊娠週
  2. 母親の年齢
  3. 父親の年齢
  4. 母親の妊娠中の体重増加
  5. アプガースコア

Google Cloud Platform プロジェクトに書き込まれる線形回帰入力テーブルの作成には、BigQuery が使用されます。BigQuery でのデータのクエリと管理には、ローカルマシン上で動作している Python が使用されます。結果の線形回帰テーブルへのアクセスには Apache Spark が使用され、モデルの構築と評価には Spark ML が使用されます。Cloud Dataproc クラスタ内のマスター VM 上で動作している PySpark が Spark ML 関数の呼び出しに使用されます。

費用

このチュートリアルでは、以下を含む、Google Cloud Platform の課金対象となるコンポーネントを使用します。

  • Google Compute Engine
  • Google Cloud Dataproc
  • Google BigQuery

料金計算ツールを使用すると、想定される利用方法に基づいて費用の見積もりを作成できます。 Cloud Platform を初めて使用する方は、無料トライアルをご利用いただけます。

始める前に

Cloud Dataproc クラスタには、Spark ML などの Spark コンポーネントがインストールされています。Cloud Dataproc クラスタをセットアップしてこの例で示すコードを実行するには、以下を行う(またはすでに行っている)必要があります。

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP プロジェクトを選択または作成します。

    [リソースの管理] ページに移動

  3. Google Cloud Dataproc、BigQuery、Google Compute Engine API を有効にします。

    APIを有効にする

  4. Cloud SDK をインストールして初期化します。
  5. プロジェクト内で Cloud Dataproc クラスタを作成します。クラスタは、Spark 2.0 以降の Cloud Dataproc バージョン(機械学習ライブラリが含まれているバージョン)を実行している必要があります。
  6. Cloud Dataproc バージョン 1.3 以降(サポートされている Cloud Dataproc バージョンを参照)を使用している場合は、BigQuery コネクタをインストールします。

BigQuery natality データのサブセットを作成する

このセクションでは、プロジェクトにデータセットを作成し、そのデータセットにテーブルを作成して、公表されている出生率 BigQuery データセットから出生率データのサブセットをコピーします。このチュートリアルの後半では、このテーブルのサブセット データを使用して、母親の年齢、父親の年齢、妊娠週に基づいて出生体重を予測します。

Console

  1. プロジェクトにデータセットを作成します。
    1. BigQuery ウェブ UI に移動します
    2. 左側のナビゲーション ペインで、プロジェクト名の横の下矢印アイコン 下矢印アイコン をクリックし、[Create new dataset] をクリックします。
    3. [Create Dataset] ダイアログで、次の操作を行います。
      • [Dataset ID] に「natality_regression」と入力します。
      • [Data location] で、データセットのロケーションを選択できます。デフォルト値のロケーションは US multi-region です。データセットを作成した後にロケーションを変更することはできません。
      • [Default table expiration] で、以下のいずれかのオプションを選択します。
        • Never: (デフォルト)データセット内に作成されたテーブルが自動的に削除されることはありません。テーブルを削除する場合は、手動で削除する必要があります。
        • Number of days: データセット内に作成されたテーブルは、作成時点から指定した日数後に削除されます。この値が適用されるのは、テーブルの作成時にテーブル有効期限を設定しなかった場合です。 データセットを作成
    4. [OK] をクリックしてデータセットを作成します。
  2. 公表されている出生率データセットに対してクエリを実行し、クエリ結果をデータセット内の新しいテーブルに保存します。
    1. [COMPOSE QUERY] をクリックし、次のクエリをコピーして、クエリエディタに貼り付けます。
      SELECT
        weight_pounds,
        mother_age,
        father_age,
        gestation_weeks,
        weight_gain_pounds,
        apgar_5min
      FROM
        `bigquery-public-data.samples.natality`
      WHERE
        weight_pounds IS NOT NULL
        AND mother_age IS NOT NULL
        AND father_age IS NOT NULL
        AND gestation_weeks IS NOT NULL
        AND weight_gain_pounds IS NOT NULL
        AND apgar_5min IS NOT NULL
      
    2. [Show Options] をクリックし、標準 SQL を使用するために [Use Legacy SQL] チェックボックスをオフにします。
    3. [RUN QUERY] をクリックします。
    4. クエリが完了したら(約 1 分後)、[Query results] パネルで、[SAVE AS] → [Save as table] を選択します。
    5. [Save as table] ダイアログに必要事項を入力して、natality_regression データセット内の「regression_input」テーブルにクエリ結果を保存します。[Save] をクリックします。

Python

  1. Python および Python 用 Google Cloud クライアント ライブラリ(コードの実行に必要)をインストールする方法については、Python 開発環境の設定をご覧ください。Python virtualenv をインストールして使用することをおすすめします。
  2. 次の natality_tutorial.py コードをコピーして、ローカルマシンの python シェルに貼り付けます。シェルで <return> キーを押してコードを実行して、デフォルトの GCP プロジェクトに「natality_regression」BigQuery データセットを作成し、「regression_input」テーブルに公表されている natality データのサブセットを入力します。
    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_ref = client.dataset('natality_regression')
    dataset = bigquery.Dataset(dataset_ref)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # In the new BigQuery dataset, create a reference to a new table for
    # storing the query results.
    table_ref = dataset.table('regression_input')
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to the table reference created above.
    job_config.destination = table_ref
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish

線形回帰を実行する

このセクションでは、Google Cloud Platform Console または Cloud Dataproc クラスタのマスターノードから PySpark 線形回帰を実行します。

Console

  1. 次のコードをコピーして、ローカルマシン上の新しい natality_sparkml.py ファイルに貼り付けます。
    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from datetime import datetime
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    # Use Cloud Dataprocs automatically propagated configurations to get
    # the Cloud Storage bucket and Google Cloud Platform project for this
    # cluster.
    sc = SparkContext()
    spark = SparkSession(sc)
    bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
    project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")
    
    # Set an input directory for reading data from Bigquery.
    todays_date = datetime.strftime(datetime.today(), "%Y-%m-%d-%H-%M-%S")
    input_directory = "gs://{}/tmp/natality-{}".format(bucket, todays_date)
    
    # Set the configuration for importing data from BigQuery.
    # Specifically, make sure to set the project ID and bucket for Cloud Dataproc,
    # and the project ID, dataset, and table names for BigQuery.
    
    conf = {
        # Input Parameters
        "mapred.bq.project.id": project,
        "mapred.bq.gcs.bucket": bucket,
        "mapred.bq.temp.gcs.path": input_directory,
        "mapred.bq.input.project.id": project,
        "mapred.bq.input.dataset.id": "natality_regression",
        "mapred.bq.input.table.id": "regression_input",
    }
    
    # Read the data from BigQuery into Spark as an RDD.
    table_data = spark.sparkContext.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=conf)
    
    # Extract the JSON strings from the RDD.
    table_json = table_data.map(lambda x: x[1])
    
    # Load the JSON strings as a Spark Dataframe.
    natality_data = spark.read.json(table_json)
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print "Coefficients:" + str(model.coefficients)
    print "Intercept:" + str(model.intercept)
    print "R^2:" + str(model.summary.r2)
    model.summary.residuals.show()
    
        
  2. ローカルの natality_sparkml.py ファイルをプロジェクトの Cloud Storage バケットにコピーします。
    gsutil cp natality_sparkml.py gs://bucket-name
    
  3. Cloud Dataproc の [ジョブを送信] ページから回帰を行います。natality_sparkml.py が存在する Cloud Storage バケットの gs:// URI を挿入します。[ジョブタイプ] として [PySpark] を選択します。[送信] をクリックして、クラスタでジョブを実行します。
    ジョブが完了すると、線形回帰出力モデルの概要が Cloud Dataproc の [ジョブの詳細] ウィンドウに表示されます。

PySpark

  1. Cloud Dataproc クラスタのマスターノードに SSH で接続します。
    1. プロジェクトの [Cloud Dataproc] → [クラスタ] ページに移動します。
    2. クラスタ名をクリックして、[クラスタの詳細] ページを開きます。次に、[VM インスタンス] タブをクリックし、[SSH] → [ブラウザ ウィンドウで開く] を選択します。
    3. マスター インスタンスに接続されたターミナル ウィンドウがブラウザに表示されます。
  2. pyspark を実行して PySpark シェルを開きます。
  3. 次のコードをコピーして shell に貼り付け、Enter キーを押して Spark ML 線形回帰を実行します。
    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from datetime import datetime
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    # Use Cloud Dataprocs automatically propagated configurations to get
    # the Cloud Storage bucket and Google Cloud Platform project for this
    # cluster.
    sc = SparkContext()
    spark = SparkSession(sc)
    bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
    project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")
    
    # Set an input directory for reading data from Bigquery.
    todays_date = datetime.strftime(datetime.today(), "%Y-%m-%d-%H-%M-%S")
    input_directory = "gs://{}/tmp/natality-{}".format(bucket, todays_date)
    
    # Set the configuration for importing data from BigQuery.
    # Specifically, make sure to set the project ID and bucket for Cloud Dataproc,
    # and the project ID, dataset, and table names for BigQuery.
    
    conf = {
        # Input Parameters
        "mapred.bq.project.id": project,
        "mapred.bq.gcs.bucket": bucket,
        "mapred.bq.temp.gcs.path": input_directory,
        "mapred.bq.input.project.id": project,
        "mapred.bq.input.dataset.id": "natality_regression",
        "mapred.bq.input.table.id": "regression_input",
    }
    
    # Read the data from BigQuery into Spark as an RDD.
    table_data = spark.sparkContext.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=conf)
    
    # Extract the JSON strings from the RDD.
    table_json = table_data.map(lambda x: x[1])
    
    # Load the JSON strings as a Spark Dataframe.
    natality_data = spark.read.json(table_json)
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print "Coefficients:" + str(model.coefficients)
    print "Intercept:" + str(model.intercept)
    print "R^2:" + str(model.summary.r2)
    model.summary.residuals.show()
    
        
    ジョブが完了すると、線形回帰出力(モデル概要)がターミナル ウィンドウに表示されます。
    <<< # Print the model summary.
    ... print "Coefficients:" + str(model.coefficients)
    Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
    <<< print "Intercept:" + str(model.intercept)
    Intercept:-2.26130330748
    <<< print "R^2:" + str(model.summary.r2)
    R^2:0.295200579035
    <<< model.summary.residuals.show()
    +--------------------+
    |           residuals|
    +--------------------+
    | -0.7234737533344147|
    |  -0.985466980630501|
    | -0.6669710598385468|
    |  1.4162434829714794|
    |-0.09373154375186754|
    |-0.15461747949235072|
    | 0.32659061654192545|
    |  1.5053877697929803|
    |  -0.640142797263989|
    |   1.229530260294963|
    |-0.03776160295256...|
    | -0.5160734239126814|
    | -1.5165972740062887|
    |  1.3269085258245008|
    |  1.7604670124710626|
    |  1.2348130901905972|
    |   2.318660276655887|
    |  1.0936947030883175|
    |  1.0169768511417363|
    | -1.7744915698181583|
    +--------------------+
    only showing top 20 rows.
    

クリーンアップ

「機械学習で Cloud Dataproc、BigQuery、Spark ML を使用する」のチュートリアルが終了したら、GCP で作成したリソースをクリーンアップして、それらのリソースが占める割り当て量によって今後料金が発生しないようにします。以下のセクションで、このようなリソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金を停止する最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. GCP Console で [プロジェクト] ページに移動します。

    プロジェクト ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

Cloud Dataproc クラスタの削除

クラスタの削除をご覧ください。

Cloud Storage データの削除

プロジェクトの Cloud Storage バケットから一時的な BigQuery 入力データファイルを削除するには:

  1. 次のコマンドを実行して、一時データファイルの保存に使用されたプロジェクト内のバケットの名前を取得します(このバケットは「dataproc-」という文字列で始まります)。次のように、クラスタの名前を挿入します。
    CLUSTER_NAME=<my cluster>
    TEMP_DATA_BUCKET="$(gcloud dataproc clusters describe $CLUSTER_NAME| \
    grep configBucket|awk '{print $2}')"
    echo $TEMP_DATA_BUCKET
    
  2. プロジェクトの [Cloud Dataproc] → [ブラウザ] ページに移動してから、TEMP_DATA_BUCKET 名をクリックします。tmp/ フォルダ内で、名前が「natality-」で始まるフォルダが表示されます(残りの文字は、そのフォルダが作成された日時を表します)。これが削除する一時入力データフォルダです。natality- フォルダ名の左側にあるチェックボックスをオンにしてから、ページの一番上にある [削除] を選択して Cloud Storage からフォルダを削除します。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Dataproc ドキュメント
ご不明な点がありましたら、Google のサポートページをご覧ください。