머신러닝에 Dataproc, BigQuery 및 Apache Spark ML 사용

데이터 과학자는 Apache SparkBigQuery 커넥터를 사용하여 BigQuery의 원활하게 확장 가능한 SQL 엔진의 이점을 Apache Spark의 머신러닝 기능과 통합할 수 있습니다. 이 가이드에서는 Dataproc, BigQuery, Apache Spark ML을 사용하여 데이터 세트에서 머신러닝을 수행하는 방법에 대해 설명합니다.

목표

선형 회귀를 사용하면 다음 5개 요인에 대한 함수로 출생 시 체중 모델을 구축할 수 있습니다.

  1. 임신 주수
  2. 모체의 연령
  3. 부체의 연령
  4. 임신 기간 동안 모체의 체중 증가
  5. Apgar 점수

BigQuery는 Google Cloud Platform 프로젝트에 기록되는 선형 회귀 출력 테이블을 준비하는 데 사용됩니다. Python은 BigQuery에서 데이터를 쿼리하고 관리하는 데 사용됩니다. 그 결과로 나타나는 선형 회귀 테이블은 Apache Spark에서 액세스하고, Spark ML은 모델을 구축하고 평가하는 데 사용됩니다. Dataproc PySpark 작업은 Spark ML 함수를 호출하는 데 사용됩니다.

비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud Platform 구성요소를 사용합니다.

  • Compute Engine
  • Dataproc
  • BigQuery

가격 계산기를 사용하여 예상 사용량을 토대로 예상 비용을 산출합니다. 새 Cloud Platform 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

Dataproc 클러스터에는 Spark 구성요소(Spark ML 포함)가 설치되어 있습니다. 이 예에서 Dataproc 클러스터를 설정하고 코드를 실행하려면 다음을 수행해야 합니다.

  1. Google 계정으로 로그인합니다.

    아직 계정이 없으면 새 계정을 등록하세요.

  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기 페이지로 이동

  3. Dataproc, BigQuery, Compute Engine API를 사용 설정합니다.

    API 사용 설정

  4. Cloud SDK 설치 및 초기화
  5. 프로젝트에서 Dataproc 클러스터를 만듭니다. 클러스터는 Spark 2.0 이상을 갖춘 Dataproc 버전을 실행해야 합니다(머신러닝 라이브러리 포함).

BigQuery natality 데이터 하위 집합 만들기

이 섹션에서는 프로젝트에서 데이터세트를 만든 후에, 공개적으로 사용 가능한 natality BigQuery 데이터세트의 출생 비율 데이터 하위 집합을 복사할 테이블을 데이터세트 안에 만듭니다. 이 가이드의 뒷부분에서는 이 테이블에 있는 하위 집합 데이터를 사용하여 모친 연령, 부친 연령, 임신 주수를 기준으로 출생 시 체중을 예측할 것입니다.

Google Cloud Console을 사용하거나 로컬 머신에서 Python 스크립트를 실행하여 데이터 하위 집합을 만들 수 있습니다.

콘솔

  1. 프로젝트에서 데이터세트를 만듭니다.

    1. BigQuery 웹 UI로 이동합니다.
    2. 왼쪽 탐색 창에서 프로젝트 이름을 클릭한 다음 데이터 세트 만들기를 클릭합니다.
    3. 데이터 세트 만들기 대화상자에서 다음을 수행합니다.
      1. 데이터세트 ID에 'natality_regression'을 입력합니다.
      2. 데이터 위치에서 데이터 세트의 위치를 선택할 수 있습니다. 기본값 위치는 US multi-region입니다. 데이터세트를 만든 후에는 위치를 변경할 수 없습니다.
      3. 기본 테이블 만료 시간에서 다음 옵션 중 하나를 선택합니다.
        • 사용 안 함(기본값): 테이블을 수동으로 삭제해야 합니다.
        • 일수: 테이블이 생성 시간으로부터 지정된 일 수가 지난 후에 삭제됩니다.
      4. 암호화에서 다음 옵션 중 하나를 선택합니다.
      5. 데이터세트 만들기를 클릭합니다.
  2. 공개 출생률 데이터 세트에 대해 쿼리를 실행한 후에 쿼리 결과를 데이터 세트의 새 테이블에 저장합니다.

    1. 다음 쿼리를 복사하여 쿼리 편집기에 붙여넣은 후에 실행을 클릭합니다.
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. 쿼리가 완료된 후에(약 1분 후에) 결과 저장을 클릭한 다음 저장 옵션을 선택하여 결과를 프로젝트의 natality_regression 데이터 세트에 'regression_input' BigQuery 테이블로 저장합니다.

Python

  1. Python과 Python용 Google Cloud 클라이언트 라이브러리(코드를 실행하는 데 필요함)를 설치하는 방법은 Python 개발 환경 설정을 참조합니다. Python virtualenv를 설치하고 사용하는 것이 좋습니다.

  2. 아래의 natality_tutorial.py 코드를 복사하여 로컬 머신의 python 셸에 붙여넣습니다. 셸에 있는 <return> 키를 눌러 코드를 실행함으로써 기본 Google Cloud 프로젝트에서 'natality_regression' BigQuery 데이터 세트를 만들고, 'regression_input' 테이블이 공개 natality 데이터의 하위 집합으로 채워지도록 합니다.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # In the new BigQuery dataset, create a reference to a new table for
    # storing the query results.
    table_ref = dataset.table("regression_input")
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to the table reference created above.
    job_config.destination = table_ref
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish
  3. natality_regression 데이터 세트와 regression_input 테이블이 만들어졌는지 확인합니다.

선형 회귀 실행

이 섹션에서 Coogle Cloud Console을 사용하여 작업을 Dataproc 서비스에 제출하거나 로컬 터미널에서 gcloud 명령어를 실행하여 PySpark 선형 회귀를 실행합니다.

콘솔

  1. 다음 코드를 복사하여 로컬 머신의 새 natality_sparkml.py 파일에 붙여넣습니다.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. 로컬 natality_sparkml.py 파일을 프로젝트의 Cloud Storage 버킷에 복사합니다.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Dataproc 작업 제출 페이지에서 회귀를 실행합니다.

    1. 기본 python 파일 필드에 natality_sparkml.py 파일의 사본이 있는 Cloud Storage 버킷의 gs:// URI를 입력합니다.

    2. 작업 유형으로 PySpark를 선택합니다.

    3. JAR 파일필드에 gs://spark-lib/bigquery/spark-bigquery-latest.jar을 입력합니다. 이렇게 하면 BigQuery 데이터를 Spark DataFrame으로 읽을 수 있도록 런타임 시 PySpark 애플리케이션에서 Spark-bigquery 커넥터를 사용할 수 있습니다.

    4. 작업 ID, 리전, 클러스터 필드를 입력합니다.

    5. 만들기를 클릭하여 클러스터에서 작업을 실행합니다.

작업이 완료되면 Dataproc 작업 세부정보 창에 선형 회귀 출력 모델 요약이 나타납니다.

gcloud

  1. 다음 코드를 복사하여 로컬 머신의 새 natality_sparkml.py 파일에 붙여넣습니다.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. 로컬 natality_sparkml.py 파일을 프로젝트의 Cloud Storage 버킷에 복사합니다.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. 아래와 같이 로컬 머신의 터미널 창에서 gcloud 명령어를 실행하여 Pyspark 작업을 Dataproc 서비스에 제출합니다.

    1. --files 플래그 값의 경우, natality_sparkml.py 파일의 사본이 있는 Cloud Storage 버킷의 이름을 입력합니다.
    2. - jars 플래그 값은 BigQuery 데이터를 Spark DataFrame으로 읽을 수 있도록 런타임 시 PySpark jobv에서 Spark-bigquery 커넥터를 사용할 수 있도록 합니다.
      gcloud dataproc jobs submit pyspark \
          --cluster=cluster-name \
          --region=region \
          --files=gs://your-bucket/natality_sparkml.py \
          --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      

작업이 완료되면 선형 회귀 출력(모델 요약)이 터미널 창에 나타납니다.

<<< # Print the model summary.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

삭제

머신러닝에 Dataproc, BigQuery 및 Apache Spark ML 사용 가이드를 완료한 후에는 할당량을 차지하지 않고 이후에 요금이 청구되지 않도록 Google Cloud에서 만든 리소스를 삭제할 수 있습니다. 다음 섹션에서는 리소스를 삭제하거나 사용 중지하는 방법을 설명합니다.

프로젝트 삭제

비용이 청구되지 않도록 하는 가장 쉬운 방법은 가이드에서 만든 프로젝트를 삭제하는 것입니다.

프로젝트를 삭제하는 방법은 다음과 같습니다.

  1. Cloud Console에서 리소스 관리 페이지로 이동합니다.

    리소스 관리 페이지로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제 를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 다음 종료를 클릭하여 프로젝트를 삭제합니다.

Dataproc 클러스터 삭제

클러스터 삭제를 참조하세요.