Utiliser Cloud Dataproc, BigQuery et Apache Spark ML pour le machine learning

Le connecteur Google BigQuery pour Apache Spark permet aux data scientists d'associer la puissance du moteur SQL évolutif de BigQuery aux capacités de machine learning d'Apache Spark. Dans ce tutoriel, vous apprendrez à utiliser Cloud Dataproc, BigQuery et Apache Spark ML pour effectuer des opérations de machine learning sur un ensemble de données.

Objectifs

Utilisez la régression linéaire pour créer un modèle de poids de naissance en fonction des cinq facteurs suivants :

  1. Semaines de gestation
  2. Âge de la mère
  3. Âge du père
  4. Prise de poids de la mère pendant la grossesse
  5. Score d'Apgar

BigQuery permet de préparer la table d'entrée de régression linéaire, qui est écrite dans votre projet Google Cloud Platform. Python est utilisé pour interroger et gérer les données dans BigQuery. La table de régression linéaire obtenue est accessible dans Apache Spark. Spark ML permet de créer et d'évaluer le modèle. Une tâche Dataproc PySpark est utilisée pour appeler des fonctions Spark ML.

Coûts

Ce tutoriel fait appel à des composants payants de Google Cloud Platform, par exemple :

  • Google Compute Engine
  • Google Cloud Dataproc
  • Google BigQuery

Utilisez le Simulateur de coût pour générer une estimation des coûts en fonction de votre utilisation prévue. Les nouveaux utilisateurs de Cloud Platform peuvent bénéficier d'un essai gratuit.

Avant de commencer

Les composants Spark, y compris Spark ML, sont installés sur un cluster Cloud Dataproc. Pour configurer un cluster Cloud Dataproc et exécuter le code dans cet exemple, vous devez effectuer (ou faire en sorte que s'effectuent) les opérations suivantes :

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Dans Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Cloud.

    Accéder à la page de sélection du projet

  3. Activer les API Google Cloud Dataproc, BigQuery, Google Compute Engine.

    Activer les API

  4. Installez et initialisez le SDK Cloud.
  5. Créez un cluster Cloud Dataproc dans le projet. Le cluster doit exécuter une version Dataproc avec Spark 2.0 ou une version ultérieure (y compris les bibliothèques de machine learning).

Créer un sous-ensemble de données natality BigQuery

Dans cette section, vous créez un ensemble de données dans votre projet, puis une table dans cet ensemble de données. Copiez un sous-ensemble de données dans la table regroupant des données de taux de natalité, issues de l'ensemble de données publiques natality BigQuery. Dans la suite de ce tutoriel, vous utiliserez les données du sous-ensemble de cette table pour prédire le poids de naissance en fonction de l'âge de la mère, de l'âge du père et du nombre de semaines de gestation.

Vous pouvez créer le sous-ensemble de données à l'aide de Google Cloud Console ou en exécutant un script Python sur votre machine locale.

Console

  1. Créez un ensemble de données dans votre projet.

    1. Accédez à l'interface utilisateur Web de BigQuery.
    2. Dans le volet de navigation de gauche, cliquez sur le nom du projet, puis sur CRÉER UN ENSEMBLE DE DONNÉES.
    3. Dans la boîte de dialogue Create Dataset (Créer un ensemble de données) :
      1. Dans le champ Dataset ID (ID de l'ensemble de données), saisissez "natality_regression".
      2. Dans le champ Emplacement des données, sélectionnez un emplacement pour l'ensemble de données. La valeur par défaut est US multi-region. Une fois l'ensemble de données créé, l'emplacement ne peut plus être modifié.
      3. Pour indiquer le délai d'expiration par défaut de la table, choisissez l'une des options suivantes :
        • Jamais (par défaut) : vous devez supprimer la table manuellement.
        • Nombre de jours : la table est supprimée une fois le nombre de jours spécifié écoulé, à compter de sa date de création.
      4. Dans le champ Chiffrement, choisissez l'une des options suivantes :
      5. Cliquez sur Créer un ensemble de données.
  2. Exécutez une requête sur l'ensemble de données public "natality", puis enregistrez les résultats de la requête dans une nouvelle table de l'ensemble de données.

    1. Copiez et collez la requête suivante dans l'éditeur de requête, puis cliquez sur "Exécuter".
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. Une fois la requête terminée (au bout d'une minute environ), cliquez sur ENREGISTRER LES RÉSULTATS, puis sélectionnez des options permettant d'enregistrer les résultats sous la forme d'une table BigQuery "regression_input" dans l'ensemble de données natality_regression de votre projet.

Python

  1. Consultez la page Configurer un environnement de développement Python pour obtenir des instructions sur l'installation de Python et de la bibliothèque cliente Google Cloud pour Python (nécessaire à l'exécution du code). Il est recommandé d'installer et d'utiliser un environnement virtualenv Python.

  2. Copiez et collez le code natality_tutorial.py ci-dessous dans une interface système python sur votre machine locale. Appuyez sur la touche <return> dans l'interface système pour exécuter le code permettant de créer un ensemble de données BigQuery "natality_regression" dans votre projet Google Cloud par défaut avec une table "regression_input", laquelle est remplie avec un sous-ensemble de données natality publiques.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = 'natality_regression'
    
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # In the new BigQuery dataset, create a reference to a new table for
    # storing the query results.
    table_ref = dataset.table('regression_input')
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to the table reference created above.
    job_config.destination = table_ref
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish
  3. Confirmez la création de l'ensemble de données natality_regression et de la table regression_input.

Exécuter une régression linéaire

Dans cette section, vous allez exécuter une régression linéaire PySpark en envoyant la tâche au service Dataproc à l'aide de Google Cloud Console ou en exécutant la commande gcloud à partir d'un terminal local.

Console

  1. Copiez et collez le code suivant dans un nouveau fichier natality_sparkml.py sur votre machine locale.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. Copiez le fichier natality_sparkml.py local dans un bucket Cloud Storage de votre projet.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Exécutez la régression depuis la page Dataproc Envoyer une tâche.

    1. Dans le champ Fichier Python principal, insérez l'URI gs:// du bucket Cloud Storage dans lequel se trouve votre copie du fichier natality_sparkml.py.

    2. Sélectionnez PySpark comme type de tâche.

    3. Insérez gs://spark-lib/bigquery/spark-bigquery-latest.jar dans le champ Fichiers JAR. Cela met ainsi le connecteur spark-bigquery à la disposition de l'application PySpark au moment de l'exécution, de sorte qu'elle puisse lire les données BigQuery dans un DataFrame Spark.

    4. Renseignez les champs ID de tâche, Région et Cluster.

    5. Cliquez sur Envoyer pour exécuter la tâche sur votre cluster.

Une fois la tâche terminée, le résumé du modèle de résultats de la régression linéaire s'affiche dans la fenêtre "Informations sur la tâche" de Dataproc.

Commande gcloud

  1. Copiez et collez le code suivant dans un nouveau fichier natality_sparkml.py sur votre machine locale.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. Copiez le fichier natality_sparkml.py local dans un bucket Cloud Storage de votre projet.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Envoyez la tâche Pyspark au service Dataproc en exécutant la commande gcloud, illustrée ci-dessous, à partir d'une fenêtre de terminal sur votre machine locale.

    1. Pour la valeur de l'option --files, insérez le nom du bucket Cloud Storage dans lequel se trouve votre copie du fichier natality_sparkml.py.
    2. La valeur de l'option --jars met le connecteur spark-bigquery à la disposition de la tâche PySpark au moment de l'exécution, de sorte qu'elle puisse lire les données BigQuery dans un DataFrame Spark.
      gcloud dataproc jobs submit pyspark \  
          --cluster=cluster-name \  
          --region=cluster-region \  
          --files=gs://your-bucket/natality_sparkml.py \  
          --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      

Les résultats de la régression linéaire (résumé du modèle) s'affichent dans la fenêtre du terminal une fois la tâche terminée.

<<< # Print the model summary.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

Nettoyer

Une fois que vous avez terminé le tutoriel Utiliser Cloud Dataproc, BigQuery et Spark ML pour le machine learning, vous pouvez nettoyer les ressources que vous avez créées sur Google Cloud afin qu'elles ne prennent pas de quota et qu'elles ne vous soient pas facturées à l'avenir. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.

Supprimer le projet

Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel.

Pour supprimer le projet :

  1. Dans Cloud Console, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer .
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Supprimer le cluster Cloud Dataproc

Consultez la section Supprimer un cluster.