머신러닝에 Dataproc, BigQuery 및 Apache Spark ML 사용


데이터 과학자는 Apache SparkBigQuery 커넥터를 사용하여 BigQuery의 원활하게 확장 가능한 SQL 엔진의 이점을 Apache Spark의 머신러닝 기능과 통합할 수 있습니다. 이 가이드에서는 Dataproc, BigQuery, Apache Spark ML을 사용하여 데이터 세트에서 머신러닝을 수행하는 방법에 대해 설명합니다.

목표

선형 회귀를 사용하면 다음 5개 요인에 대한 함수로 출생 시 체중 모델을 구축할 수 있습니다.

  • 임신 주수
  • 산모 연령
  • 부체의 연령
  • 임신 기간 동안 산모의 체중 증가
  • Apgar 점수

다음 도구를 사용하세요.

  • BigQuery: Google Cloud 프로젝트에 기록되는 선형 회귀 입력 테이블을 준비하는 데 사용합니다.
  • Python: BigQuery에서 데이터를 쿼리하고 관리하는 데 사용합니다.
  • Apache Spark: 결과 선형 회귀 테이블에 액세스하는 데 사용합니다.
  • Spark ML: 모델을 구축하고 평가하는 데 사용합니다.
  • Spark PySpark 작업: Spark ML 함수를 호출하는 데 사용됩니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Compute Engine
  • Dataproc
  • BigQuery

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

Dataproc 클러스터에는 Spark 구성요소(Spark ML 포함)가 설치되어 있습니다. 이 예에서 Dataproc 클러스터를 설정하고 코드를 실행하려면 다음을 수행해야 합니다.

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. API Dataproc, BigQuery, Compute Engine 사용 설정

    API 사용 설정

  4. Google Cloud CLI를 설치합니다.
  5. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  6. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  7. API Dataproc, BigQuery, Compute Engine 사용 설정

    API 사용 설정

  8. Google Cloud CLI를 설치합니다.
  9. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  10. 프로젝트에서 Dataproc 클러스터를 만듭니다. 클러스터는 Spark 2.0 이상을 갖춘 Dataproc 버전을 실행해야 합니다(머신러닝 라이브러리 포함).

BigQuery natality 데이터 하위 집합 만들기

이 섹션에서는 프로젝트에서 데이터 세트를 만든 후에, 공개적으로 사용 가능한 natality BigQuery 데이터 세트의 출생 비율 데이터 하위 집합을 복사할 테이블을 데이터 세트 안에 만듭니다. 이 가이드의 뒷부분에서는 이 테이블에 있는 하위 집합 데이터를 사용하여 모친 연령, 부친 연령, 임신 주수를 기준으로 출생 시 체중을 예측할 것입니다.

Google Cloud Console을 사용하거나 로컬 머신에서 Python 스크립트를 실행하여 데이터 하위 집합을 만들 수 있습니다.

Console

  1. 프로젝트에서 데이터세트를 만듭니다.

    1. BigQuery 웹 UI로 이동합니다.
    2. 왼쪽 탐색 창에서 프로젝트 이름을 클릭한 다음 데이터 세트 만들기를 클릭합니다.
    3. 데이터 세트 만들기 대화상자에서 다음을 수행합니다.
      1. 데이터 세트 ID에 'natality_regression'을 입력합니다.
      2. 데이터 위치에서 데이터 세트의 위치를 선택할 수 있습니다. 기본값 위치는 US multi-region입니다. 데이터 세트를 만든 후에는 위치를 변경할 수 없습니다.
      3. 기본 테이블 만료 시간에서 다음 옵션 중 하나를 선택합니다.
        • 사용 안 함(기본값): 테이블을 수동으로 삭제해야 합니다.
        • 일수: 테이블이 생성 시간으로부터 지정된 일 수가 지난 후에 삭제됩니다.
      4. 암호화에서 다음 옵션 중 하나를 선택합니다.
      5. 데이터 세트 만들기를 클릭합니다.
  2. 공개 출생률 데이터 세트에 대해 쿼리를 실행한 후에 쿼리 결과를 데이터 세트의 새 테이블에 저장합니다.

    1. 다음 쿼리를 복사하여 쿼리 편집기에 붙여넣은 후에 실행을 클릭합니다.
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. 쿼리가 완료된 후에(약 1분 후에) 결과 저장을 클릭한 다음 저장 옵션을 선택하여 결과를 프로젝트의 natality_regression 데이터 세트에 'regression_input' BigQuery 테이블로 저장합니다.

Python

  1. Python과 Python용 Google Cloud 클라이언트 라이브러리(코드를 실행하는 데 필요함)를 설치하는 방법은 Python 개발 환경 설정을 참조합니다. Python virtualenv를 설치하고 사용하는 것이 좋습니다.

  2. 아래의 natality_tutorial.py 코드를 복사하여 로컬 머신의 python 셸에 붙여넣습니다. 셸에 있는 <return> 키를 눌러 코드를 실행함으로써 기본 Google Cloud 프로젝트에서 'natality_regression' BigQuery 데이터 세트를 만들고, 'regression_input' 테이블이 공개 natality 데이터의 하위 집합으로 채워지도록 합니다.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish
  3. natality_regression 데이터 세트와 regression_input 테이블이 만들어졌는지 확인합니다.

선형 회귀 실행

이 섹션에서 Coogle Cloud Console을 사용하여 작업을 Dataproc 서비스에 제출하거나 로컬 터미널에서 gcloud 명령어를 실행하여 PySpark 선형 회귀를 실행합니다.

Console

  1. 다음 코드를 복사하여 로컬 머신의 새 natality_sparkml.py 파일에 붙여넣습니다.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. 로컬 natality_sparkml.py 파일을 프로젝트의 Cloud Storage 버킷에 복사합니다.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Dataproc 작업 제출 페이지에서 회귀를 실행합니다.

    1. 기본 python 파일 필드에 natality_sparkml.py 파일의 사본이 있는 Cloud Storage 버킷의 gs:// URI를 입력합니다.

    2. 작업 유형으로 PySpark를 선택합니다.

    3. JAR 파일필드에 gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar을 입력합니다. 이렇게 하면 BigQuery 데이터를 Spark DataFrame으로 읽을 수 있도록 런타임 시 PySpark 애플리케이션에서 Spark-bigquery 커넥터를 사용할 수 있습니다.

    4. 작업 ID, 리전, 클러스터 필드를 입력합니다.

    5. 제출을 클릭하여 클러스터에서 작업을 실행합니다.

작업이 완료되면 Dataproc 작업 세부정보 창에 선형 회귀 출력 모델 요약이 나타납니다.

gcloud

  1. 다음 코드를 복사하여 로컬 머신의 새 natality_sparkml.py 파일에 붙여넣습니다.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. 로컬 natality_sparkml.py 파일을 프로젝트의 Cloud Storage 버킷에 복사합니다.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. 아래와 같이 로컬 머신의 터미널 창에서 gcloud 명령어를 실행하여 Pyspark 작업을 Dataproc 서비스에 제출합니다.

    1. - jars 플래그 값은 BigQuery 데이터를 Spark DataFrame으로 읽을 수 있도록 런타임 시 PySpark jobv에서 Spark-bigquery 커넥터를 사용할 수 있도록 합니다.
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

작업이 완료되면 선형 회귀 출력(모델 요약)이 터미널 창에 나타납니다.

<<< # Print the model summary.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

삭제

튜토리얼을 완료한 후에는 만든 리소스를 삭제하여 할당량 사용을 중지하고 요금이 청구되지 않도록 할 수 있습니다. 다음 섹션은 이러한 리소스를 삭제하거나 사용 중지하는 방법을 설명합니다.

프로젝트 삭제

비용이 청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 프로젝트를 삭제하는 것입니다.

프로젝트를 삭제하는 방법은 다음과 같습니다.

  1. Google Cloud 콘솔에서 리소스 관리 페이지로 이동합니다.

    리소스 관리로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.

Dataproc 클러스터 삭제

클러스터 삭제를 참조하세요.

다음 단계