Utilizzo di Dataproc, BigQuery e Apache Spark ML per il machine learning


Il connettore BigQuery per Apache Spark consente ai data scientist di combinare la potenza del motore SQL BigQuery, scalabile senza problemi, con le funzionalità di machine learning di Apache Spark. In questo tutorial viene mostrato come utilizzare Dataproc, BigQuery e Apache Spark ML per eseguire il machine learning su un set di dati.

Obiettivi

Utilizza la regressione lineare per creare un modello del peso alla nascita in funzione di cinque fattori:

  • settimane di gestazione
  • età della madre
  • età del padre
  • Aumento di peso della madre durante la gravidanza
  • Punteggio di Apgar

Utilizza i seguenti strumenti:

  • BigQuery, per preparare la tabella di input della regressione lineare, che viene scritta nel progetto Google Cloud
  • Python, per eseguire query e gestire i dati in BigQuery
  • Apache Spark, per accedere alla tabella di regressione lineare risultante
  • Spark ML, per creare e valutare il modello
  • Job PySpark Dataproc per richiamare le funzioni Spark ML

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Compute Engine
  • Dataproc
  • BigQuery

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Prima di iniziare

In un cluster Dataproc sono installati i componenti Spark, tra cui Spark ML. Per configurare un cluster Dataproc ed eseguire il codice in questo esempio, devi svolgere (o aver svolto) i seguenti passaggi:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Enable the APIs

  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  7. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Enable the APIs

  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Crea un cluster Dataproc nel tuo progetto. Nel cluster deve essere in esecuzione una versione di Dataproc con Spark 2.0 o versioni successive (incluse le librerie di machine learning).

Crea un sottoinsieme di dati natality BigQuery

In questa sezione crei un set di dati nel tuo progetto, quindi una tabella nel set di dati in cui copiare un sottoinsieme di dati sul tasso di natalità dal set di dati BigQuery natality disponibile pubblicamente. Più avanti in questo tutorial utilizzerai i dati del sottoinsieme di questa tabella per prevedere il peso alla nascita in funzione dell'età materna, dell'età paterna e delle settimane di gestazione.

Puoi creare il sottoinsieme di dati utilizzando la console Google Cloud o eseguendo uno script Python sulla tua macchina locale.

Console

  1. Crea un set di dati nel tuo progetto.

    1. Vai all'interfaccia utente web di BigQuery.
    2. Nel pannello di navigazione a sinistra, fai clic sul nome del progetto, quindi su CREA SET DI DATI.
    3. Nella finestra di dialogo Crea set di dati:
      1. In ID set di dati, inserisci "natality_regression".
      2. Per Località dei dati, puoi scegliere una località per il set di dati. La posizione del valore predefinito è US multi-region. Una volta creato un set di dati, la posizione non può essere modificata.
      3. Per Scadenza tabella predefinita, scegli una delle seguenti opzioni:
        • Mai (valore predefinito): devi eliminare la tabella manualmente.
        • Numero di giorni: la tabella verrà eliminata dopo il numero di giorni specificato dalla data di creazione.
      4. In Crittografia, scegli una delle seguenti opzioni:
      5. Fai clic su Crea set di dati.
  2. Esegui una query sul set di dati pubblico sulla natalità, quindi salva i risultati della query in una nuova tabella del set di dati.

    1. Copia e incolla la seguente query nell'Editor query, quindi fai clic su Esegui.
      CREATE OR REPLACE TABLE natality_regression.regression_input as
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. Al termine della query (in circa un minuto), i risultati vengono salvati come tabella BigQuery "regression_input" nel set di dati natality_regression del progetto.

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python riportate nella guida rapida all'utilizzo delle librerie client di Dataproc. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Dataproc Python.

Per autenticarti a Dataproc, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

  1. Consulta la sezione Configurazione di un ambiente di sviluppo Python per istruzioni su come installare Python e la libreria client Google Cloud per Python (necessaria per eseguire il codice). Ti consigliamo di installare e utilizzare un interprete Pythonvirtualenv.

  2. Copia e incolla il codice natality_tutorial.py riportato di seguito in una shell python sulla tua macchina locale. Premi il tasto <return> nella shell per eseguire il codice per creare un set di dati BigQuery "natality_regression" nel tuo progetto Google Cloud predefinito con una tabella "regression_input" compilata con un sottoinsieme dei dati pubblici natality.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
  3. Conferma la creazione del set di dati natality_regression e della tabella regression_input.

Eseguire una regressione lineare

In questa sezione eseguirai una regressione lineare PySpark inviando il job al servizio Dataproc utilizzando la console Google Cloud o eseguendo il comando gcloud da un terminale locale.

Console

  1. Copia e incolla il seguente codice in un nuovo natality_sparkml.py file sulla tua macchina locale.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. Copia il file natality_sparkml.py locale in un bucket Cloud Storage nel tuo progetto.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Esegui la regressione dalla pagina Dataproc Invia un job.

    1. Nel campo File Python principale, inserisci l'URI gs:// del bucket Cloud Storage in cui si trova la copia del file natality_sparkml.py.

    2. Seleziona PySpark come Tipo di job.

    3. Inserisci gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar nel campo File jar. In questo modo, il connettore spark-bigquery è disponibile per l'applicazione PySpark in fase di esecuzione per consentirle di leggere i dati BigQuery in un DataFrame Spark.

    4. Compila i campi Job ID, Regione e Cluster.

    5. Fai clic su Invia per eseguire il job sul cluster.

Al termine del job, il riepilogo del modello di output della regressione lineare viene visualizzato nella finestra dei dettagli del job Dataproc.

gcloud

  1. Copia e incolla il seguente codice in un nuovo natality_sparkml.py file sulla tua macchina locale.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()

  2. Copia il file natality_sparkml.py locale in un bucket Cloud Storage nel tuo progetto.

    gcloud storage cp natality_sparkml.py gs://bucket-name
    

  3. Invia il job Pyspark al servizio Dataproc eseguendo il comandogcloud, mostrato di seguito, da una finestra del terminale sulla tua macchina locale.

    1. Il valore del flag --jars rende disponibile il connettore spark-bigquery per il job PySpark in fase di esecuzione per consentirgli di leggere i dati di BigQuery in un DataFrame Spark.
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

L'output della regressione lineare (riepilogo del modello) viene visualizzato nella finestra del terminale al termine del job.

<<< # Stampa il riepilogo del modello.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
vengono mostrate solo le prime 20 righe.

  

Esegui la pulizia

Al termine del tutorial, puoi eliminare le risorse che hai creato in modo che smettano di utilizzare la quota e di generare addebiti. Le sezioni seguenti descrivono come eliminare o disattivare queste risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto che hai creato per il tutorial.

Per eliminare il progetto:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Eliminazione del cluster Dataproc

Vedi Eliminare un cluster.

Passaggi successivi