Usa Cloud Dataproc, BigQuery y Apache Spark ML para el aprendizaje automático

El Conector de BigQuery de Apache Spark permite a los científicos de datos combinar el poder del motor SQL perfectamente escalable de BigQuery con las capacidades del aprendizaje automático de Apache Spark. En este instructivo, mostramos cómo usar Dataproc, BigQuery y Apache Spark ML para realizar el aprendizaje automático en un conjunto de datos.

Objetivos

Usa la regresión lineal para compilar un modelo de peso de un recién nacido como una función de cinco factores:

  1. semanas de gestación
  2. edad de la madre
  3. edad del padre
  4. aumento de peso de la madre durante el embarazo
  5. puntuación de Apgar

BigQuery se usa para preparar la tabla de entrada de regresión lineal, que se escribe en tu proyecto de Google Cloud Platform. Python se usa para consultar y administrar datos en BigQuery. Se accede a la tabla de regresión lineal resultante en Apache Spark y se usa Spark ML para compilar y evaluar el modelo. Se usa un trabajo de PySpark en Dataproc para invocar las funciones de Spark ML.

Costos

En este instructivo, se usan componentes facturables de Google Cloud Platform, como los que se indican a continuación:

  • Compute Engine
  • Dataproc
  • BigQuery

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Cloud Platform pueden cumplir los requisitos para una prueba gratuita.

Antes de comenzar

Un clúster de Cloud Dataproc tiene los componentes de Spark, incluido Spark ML, instalados. Para configurar un clúster de Dataproc y ejecutar el código en este ejemplo, necesitarás hacer (o haber hecho) lo siguiente:

  1. Accede a tu Cuenta de Google.

    Si todavía no tienes una cuenta, regístrate para obtener una nueva.

  2. En la página de selección de proyectos de Cloud Console, selecciona o crea un proyecto de Cloud.

    Ir a la página Selector de proyectos

  3. Habilita las API de Dataproc, BigQuery, Compute Engine.

    Habilita las API

  4. Instala e inicializa el SDK de Cloud.
  5. Crear un clúster de Cloud Dataproc en tu proyecto. Tu clúster debe ejecutar una versión de Dataproc con Spark 2.0 o superior, (incluye las bibliotecas de aprendizaje automático).

Crear un subconjunto de datos de BigQuery natality

En esta sección, creas un conjunto de datos en tu proyecto y luego creas una tabla en el conjunto de datos al que copias el subconjunto de datos de tasa de natalidad del conjunto de datos de BigQuery de natalidad disponible públicamente. Más adelante en este instructivo, usarás los datos del subconjunto en esta tabla para predecir en peso del recién nacido en función de la edad de la madre, del padre y las semanas de gestación.

Puedes crear el subconjunto de datos con Google Cloud Console o ejecutar una secuencia de comandos de Python en tu máquina local.

Console

  1. Crea un conjunto de datos en tu proyecto.

    1. Ve a la IU web de BigQuery.
    2. En el panel de navegación izquierdo, haz clic en el nombre del proyecto y, luego, en CREAR UN CONJUNTO DE DATOS.
    3. En el diálogo Crear conjunto de datos, realice lo siguiente:
      1. Para el ID de conjunto de datos, ingresa "natality_regression".
      2. En Ubicación de los datos, puedes seleccionar una ubicación para el conjunto de datos. La ubicación predeterminada es US multi-region. Una vez que se crea un conjunto de datos, la ubicación no se puede cambiar.
      3. En Vencimiento predeterminado de la tabla, selecciona una de las siguientes opciones:
        • Nunca (predeterminado): Debes borrar la tabla manualmente.
        • Cantidad de días: La tabla se borrará después de la cantidad de días especificada a partir del momento de su creación.
      4. En Encriptación, elige una de las siguientes opciones:
      5. Haz clic en Create dataset.
  2. Ejecuta una consulta en el conjunto de datos público de natalidad y, luego, guarda los resultados de la consulta en una tabla nueva en tu conjunto de datos.

    1. Copia y pega la siguiente consulta en el Editor de consultas y, luego, haz clic en Ejecutar.
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. Cuando la consulta finalice (aproximadamente en 1 minuto), haz clic en GUARDAR RESULTADOS y, luego, selecciona la opción de guardar los resultados como una tabla "regression_input" de BigQuery en el conjunto de datos natality_regression de tu proyecto.

Python

  1. Consulta la documentación sobre cómo configurar el entorno de desarrollo de Python para obtener instrucciones sobre cómo instalar Python y la Biblioteca cliente de Google Cloud para Python (necesaria para ejecutar el código). Se recomienda instalar y usar un virtualenv de Python.

  2. Copia y pega el código natality_tutorial.py que aparece a continuación en un shell python en tu máquina local. Presiona la tecla <return> en el shell para ejecutar el código y crear un conjunto de datos de BigQuery "natality_regression" en tu proyecto predeterminado de Google Cloud con una tabla "regression_input" que se propaga con un subconjunto de los datos natality públicos.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # In the new BigQuery dataset, create a reference to a new table for
    # storing the query results.
    table_ref = dataset.table("regression_input")
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to the table reference created above.
    job_config.destination = table_ref
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish
  3. Confirma la creación del conjunto de datos natality_regression y la tabla regression_input.

Ejecuta una regresión lineal

En esta sección, ejecutarás una regresión lineal de PySpark mediante el envío del trabajo al servicio de Dataproc con Google Cloud Console o a través del comando gcloud desde una terminal local.

Console

  1. Copia y pega el siguiente código en un archivo natality_sparkml.py nuevo en tu máquina local.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. Copia el archivo natality_sparkml.py local a un depósito de Cloud Storage en tu proyecto.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Ejecuta la regresión desde la página Enviar un trabajo de Dataproc.

    1. En el campo Archivo principal de Python, inserta la URI gs:// del depósito de Cloud Storage donde se encuentra la copia del archivo natality_sparkml.py.

    2. Selecciona PySpark como el Tipo de trabajo.

    3. Inserta gs://spark-lib/bigquery/spark-bigquery-latest.jar en el campo Archivos jar. Esto hace que spark-bigquery-connector esté disponible para la aplicación PySpark en el tiempo de ejecución a fin de permitirle leer datos de BigQuery en un DataFrame de Spark.

    4. Completa los campos ID de trabajo, Región y Clúster.

    5. Haz clic en Enviar para ejecutar el trabajo en tu clúster.

Cuando el trabajo finalice, el resumen del modelo del resultado de la regresión lineal aparecerá en la ventana de detalles del trabajo de Dataproc.

gcloud

  1. Copia y pega el siguiente código en un archivo natality_sparkml.py nuevo en tu máquina local.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. Copia el archivo natality_sparkml.py local a un depósito de Cloud Storage en tu proyecto.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Envía el trabajo de Pyspark al servicio de Dataproc mediante la ejecución del comando gcloud, que se muestra a continuación, desde una ventana de la terminal en tu máquina local.

    1. Para el valor de la marca --files, ingresa el nombre del depósito de Cloud Storage donde se encuentra la copia del archivo natality_sparkml.py.
    2. El valor de la marca --jars hace que spark-bigquery-connector esté disponible para el trabajo de PySpark en el entorno de ejecución a fin de que pueda leer datos de BigQuery en un DataFrame de Spark.
      gcloud dataproc jobs submit pyspark \
          --cluster=cluster-name \
          --region=region \
          --files=gs://your-bucket/natality_sparkml.py \
          --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      

Cuando finaliza el trabajo, el resultado de la regresión lineal (resumen del modelo) aparece en la ventana de la terminal.

<<< # Print the model summary.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

Realice una limpieza

Una vez que termines el instructivo Uso de Dataproc, BigQuery y Apache Spark ML para el aprendizaje automático, puedes limpiar los recursos que creaste en Google Cloud para que no consuman la cuota y no que se facturarán en el futuro. En las siguientes secciones, se describe cómo borrar o desactivar estos recursos.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, sigue estos pasos:

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a la página Administrar recursos

  2. En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar .
  3. En el cuadro de diálogo, escribe el ID del proyecto y haz clic en Cerrar para borrar el proyecto.

Cómo borrar el clúster de Dataproc

Consulta Borra un clúster.