Usa Cloud Dataproc, BigQuery y Apache Spark ML para el aprendizaje automático

El Conector de Google BigQuery para Apache Spark permite que los científicos de datos combinen el poder del motor SQL perfectamente escalable de BigQuery con las capacidades del aprendizaje automático de Apache Spark. En este instructivo, mostramos cómo usar Cloud Dataproc, BigQuery y Apache Spark ML para realizar el aprendizaje automático en un conjunto de datos.

Objetivos

Usa la regresión lineal para compilar un modelo de peso de un recién nacido como una función de cinco factores:

  1. semanas de gestación
  2. edad de la madre
  3. edad del padre
  4. aumento de peso de la madre durante el embarazo
  5. puntuación de Apgar

BigQuery se usa para preparar la tabla de entrada de regresión lineal, que se escribe en tu proyecto de Google Cloud Platform. La ejecución de Python en tu máquina local se usa para consultar y administrar datos en BigQuery. Se accede a la tabla de regresión lineal resultante en Apache Spark y se usa Spark ML para compilar y evaluar el modelo. Se usa la ejecución de PySpark en la VM principal en tu clúster de Cloud Dataproc para invocar funciones de Spark ML.

Costos

En este instructivo, se usan componentes facturables de Google Cloud Platform, que incluyen:

  • Google Compute Engine
  • Google Cloud Dataproc
  • Google BigQuery

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Cloud Platform pueden optar por una prueba gratuita.

Antes de comenzar

Un clúster de Cloud Dataproc tiene los componentes de Spark, incluido Spark ML, instalados. Para configurar un clúster de Cloud Dataproc y ejecutar el código en este ejemplo, necesitarás hacer (o haber hecho) lo siguiente:

  1. Accede a tu Cuenta de Google.

    Si todavía no tienes una cuenta, regístrate para obtener una nueva.

  2. Selecciona o crea un proyecto de GCP.

    Ir a la página Administrar recursos

  3. Habilita las Google Cloud Dataproc, BigQuery, Google Compute Engine API necesarias.

    Habilita las API

  4. Realiza la instalación y la inicialización del SDK de Cloud.
  5. Crear un clúster de Cloud Dataproc en tu proyecto. Tu clúster debe ejecutar una versión de Cloud Dataproc con Spark 2.0 o superior, (incluye las bibliotecas de aprendizaje automático).
  6. Si estás usando la versión de Cloud Dataproc 1.3 o superior (consulta Versiones de Cloud Dataproc admitidas), instala el conector de BigQuery.

Crea un subconjunto de datos de natality de BigQuery

En esta sección, creas un conjunto de datos en tu proyecto y luego creas una tabla en el conjunto de datos al que copias el subconjunto de datos de tasa de natalidad del conjunto de datos de BigQuery de natalidad disponible públicamente. Más adelante en este instructivo, usarás los datos del subconjunto en esta tabla para predecir en peso del recién nacido en función de la edad de la madre, del padre y las semanas de gestación.

Console

  1. Crea un conjunto de datos en tu proyecto.
    1. Ve a la IU web de BigQuery.
    2. En el panel de navegación izquierdo, haz clic en el ícono de flecha hacia abajo ícono de flecha hacia abajo junto al nombre de tu proyecto y luego haz clic en Crear nuevo conjunto de datos (Create new dataset).
    3. En el diálogo Crear conjunto de datos (Create Dataset):
      • Para el ID de conjunto de datos (Dataset ID), ingresa "natality_regression".
      • Para Ubicación de los datos (Data location), puedes seleccionar una ubicación para el conjunto de datos. La ubicación del valor predeterminada es US multi-region. Después de crear un conjunto de datos, la ubicación no se puede cambiar.
      • Para Vencimiento de la tabla predeterminada (Default table expiration), selecciona una de las opciones siguientes:
        • Nunca (Never): (predeterminada) las tablas que se crearon en el conjunto de datos no se borran automáticamente. Debes borrarlas manualmente.
        • Cantidad de días (Number of days): cualquier tabla creada en el conjunto de datos se borra después de la cantidad de días especificada a partir del momento de su creación. Este valor se aplica si no estableces un vencimiento en la tabla cuando la creas. Crear conjunto de datos
    4. Haz clic en Crear conjunto de datos (Create dataset).
  2. Ejecuta una consulta en el conjunto de datos de natalidad y luego guarda los resultados de la consulta en una tabla nueva en tu conjunto de datos.
    1. Copia y pega la siguiente consulta en el Editor de consultas y luego haz clic en Ejecutar consulta (Run Query).
      SELECT
        weight_pounds,
        mother_age,
        father_age,
        gestation_weeks,
        weight_gain_pounds,
        apgar_5min
      FROM
        `bigquery-public-data.samples.natality`
      WHERE
        weight_pounds IS NOT NULL
        AND mother_age IS NOT NULL
        AND father_age IS NOT NULL
        AND gestation_weeks IS NOT NULL
        AND weight_gain_pounds IS NOT NULL
        AND apgar_5min IS NOT NULL
      
    2. Después de que se completa la consulta (luego de aproximadamente 1 minuto), en el panel Resultados de la consulta (Query results), selecciona GUARDAR COMO →Guardar como tabla (SAVE AS →Save as table).
    3. Completa el diálogo Guardar como tabla (Save as table) para guardar los resultados de la consulta en una tabla "regression_input" en tu conjunto de datos natality_regression. Haz clic en Save (Guardar).

Python

  1. Consulta Cómo configurar el entorno de desarrollo de Python para obtener instrucciones sobre cómo instalar Python y la Biblioteca cliente de Google Cloud para Python (necesaria para ejecutar el código). Se recomienda la instalación y el uso de un virtualenv de Python.
  2. Copia y pega el código natality_tutorial.py, a continuación, en una shell python en tu máquina local. Presiona la tecla <return> en la shell para ejecutar el código y así crear un conjunto de datos de BigQuery "natality_regression" en tu proyecto predeterminado de GCP con una tabla "regression_input" que se propaga con un subconjunto de los datos de natality públicos.
    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_ref = client.dataset('natality_regression')
    dataset = bigquery.Dataset(dataset_ref)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # In the new BigQuery dataset, create a reference to a new table for
    # storing the query results.
    table_ref = dataset.table('regression_input')
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to the table reference created above.
    job_config.destination = table_ref
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish

Ejecuta una regresión lineal

En esta sección, ejecutarás una regresión lineal de PySpark desde Google Cloud Platform Console o desde el nodo principal del clúster de Cloud Dataproc.

Console

  1. Copia y pega el siguiente código en un archivo natality_sparkml.py nuevo en tu máquina local.
    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from datetime import datetime
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    # Use Cloud Dataprocs automatically propagated configurations to get
    # the Cloud Storage bucket and Google Cloud Platform project for this
    # cluster.
    sc = SparkContext()
    spark = SparkSession(sc)
    bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
    project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")
    
    # Set an input directory for reading data from Bigquery.
    todays_date = datetime.strftime(datetime.today(), "%Y-%m-%d-%H-%M-%S")
    input_directory = "gs://{}/tmp/natality-{}".format(bucket, todays_date)
    
    # Set the configuration for importing data from BigQuery.
    # Specifically, make sure to set the project ID and bucket for Cloud Dataproc,
    # and the project ID, dataset, and table names for BigQuery.
    
    conf = {
        # Input Parameters
        "mapred.bq.project.id": project,
        "mapred.bq.gcs.bucket": bucket,
        "mapred.bq.temp.gcs.path": input_directory,
        "mapred.bq.input.project.id": project,
        "mapred.bq.input.dataset.id": "natality_regression",
        "mapred.bq.input.table.id": "regression_input",
    }
    
    # Read the data from BigQuery into Spark as an RDD.
    table_data = spark.sparkContext.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=conf)
    
    # Extract the JSON strings from the RDD.
    table_json = table_data.map(lambda x: x[1])
    
    # Load the JSON strings as a Spark Dataframe.
    natality_data = spark.read.json(table_json)
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print "Coefficients:" + str(model.coefficients)
    print "Intercept:" + str(model.intercept)
    print "R^2:" + str(model.summary.r2)
    model.summary.residuals.show()
    
        
  2. Copia el archivo natality_sparkml.py local al depósito de Cloud Storage en tu proyecto.
    gsutil cp natality_sparkml.py gs://bucket-name
    
  3. Ejecuta la regresión desde la página Enviar un trabajo (Submit a job) de Cloud Dataproc. Inserta el URI gs:// del depósito de Cloud Storage donde se ubica tu natality_sparkml.py. Selecciona PySpark como el Job type (Tipo de trabajo). Haz clic en Enviar (Submit) para ejecutar el trabajo en tu clúster.
    Cuando finaliza el trabajo, el resumen del modelo del resultado de la regresión lineal aparece en la ventana de detalles del trabajo de Cloud Dataproc.

PySpark

  1. Establece una conexión SSH a tu nodo principal del clúster de Cloud Dataproc
    1. Ve a la página de tu proyecto Cloud Dataproc→Clústeres (Clusters).
    2. Haz clic en el nombre del clúster para abrir la página Detalles del clúster (Cluster details). Luego, haz clic en la pestaña Instancias de VM (VM instances) y selecciona SSH→Abrir en la ventana del navegador (Open in browser window).
    3. Se abre una ventana de la terminal que está conectada a la instancia principal en tu navegador.
  2. Ejecuta pyspark para abrir una shell de PySpark.
  3. Copia y pega el siguiente código en la shell y luego presiona para ejecutar la regresión lineal de Spark ML.
    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from datetime import datetime
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    # Use Cloud Dataprocs automatically propagated configurations to get
    # the Cloud Storage bucket and Google Cloud Platform project for this
    # cluster.
    sc = SparkContext()
    spark = SparkSession(sc)
    bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
    project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")
    
    # Set an input directory for reading data from Bigquery.
    todays_date = datetime.strftime(datetime.today(), "%Y-%m-%d-%H-%M-%S")
    input_directory = "gs://{}/tmp/natality-{}".format(bucket, todays_date)
    
    # Set the configuration for importing data from BigQuery.
    # Specifically, make sure to set the project ID and bucket for Cloud Dataproc,
    # and the project ID, dataset, and table names for BigQuery.
    
    conf = {
        # Input Parameters
        "mapred.bq.project.id": project,
        "mapred.bq.gcs.bucket": bucket,
        "mapred.bq.temp.gcs.path": input_directory,
        "mapred.bq.input.project.id": project,
        "mapred.bq.input.dataset.id": "natality_regression",
        "mapred.bq.input.table.id": "regression_input",
    }
    
    # Read the data from BigQuery into Spark as an RDD.
    table_data = spark.sparkContext.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=conf)
    
    # Extract the JSON strings from the RDD.
    table_json = table_data.map(lambda x: x[1])
    
    # Load the JSON strings as a Spark Dataframe.
    natality_data = spark.read.json(table_json)
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print "Coefficients:" + str(model.coefficients)
    print "Intercept:" + str(model.intercept)
    print "R^2:" + str(model.summary.r2)
    model.summary.residuals.show()
    
        
    Cuando finaliza el trabajo, el resultado de la regresión lineal (resumen del modelo) aparece en la ventana de la terminal.
    <<< # Print the model summary.
    ... print "Coefficients:" + str(model.coefficients)
    Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
    <<< print "Intercept:" + str(model.intercept)
    Intercept:-2.26130330748
    <<< print "R^2:" + str(model.summary.r2)
    R^2:0.295200579035
    <<< model.summary.residuals.show()
    +--------------------+
    |           residuals|
    +--------------------+
    | -0.7234737533344147|
    |  -0.985466980630501|
    | -0.6669710598385468|
    |  1.4162434829714794|
    |-0.09373154375186754|
    |-0.15461747949235072|
    | 0.32659061654192545|
    |  1.5053877697929803|
    |  -0.640142797263989|
    |   1.229530260294963|
    |-0.03776160295256...|
    | -0.5160734239126814|
    | -1.5165972740062887|
    |  1.3269085258245008|
    |  1.7604670124710626|
    |  1.2348130901905972|
    |   2.318660276655887|
    |  1.0936947030883175|
    |  1.0169768511417363|
    | -1.7744915698181583|
    +--------------------+
    only showing top 20 rows.
    

Limpiar

Después de que terminaste el instructivo de Uso de Cloud Dataproc, BigQuery y Spark ML para el aprendizaje automático, puedes limpiar los recursos que creaste en Google Cloud Platform para que no se te facture por ellos en el futuro. La siguiente sección describe cómo borrar o desactivar estos recursos.

Cómo borrar el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, haz lo siguiente:

  1. En la GCP Console, dirígete a la página Proyectos.

    Ir a la página Proyectos

  2. En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar.
  3. En el cuadro de diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Cómo borrar el clúster de Cloud Dataproc

Consulta Borra un clúster.

Cómo borrar datos de Cloud Storage

Para limpiar los archivos de datos de entrada de BigQuery temporales del depósito de Cloud Storage de tu proyecto:

  1. Ejecuta los siguientes comandos para obtener el nombre del depósito en tu proyecto que se usó para almacenar los archivos de datos temporarios (este depósito comienza con los caracteres "dataproc-"). Inserta el nombre de tu clúster, como se muestra a continuación.
    CLUSTER_NAME=<my cluster>
    TEMP_DATA_BUCKET="$(gcloud dataproc clusters describe $CLUSTER_NAME| \
    grep configBucket|awk '{print $2}')"
    echo $TEMP_DATA_BUCKET
    
  2. Ve a la página Cloud Dataproc→Navegador de tu proyecto y luego haz clic en el nombre TEMP_DATA_BUCKET. Dentro de la carpeta tmp/, verás una carpeta que comienza con el nombre "natality-" (los caracteres restantes representan a fecha y la hora que se creó la carpeta). Esta es una carpeta de datos de entrada temporaria para quitar. Selecciona la casilla de verificación a la izquierda del nombre de la carpeta natality- y luego selecciona Quitar en la parte superior de la página para quitar la carpeta de Cloud Storage.
¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Documentación de Cloud Dataproc
Si necesitas ayuda, visita nuestra página de asistencia.