Dataproc, BigQuery und Apache Spark ML für maschinelles Lernen verwenden

Mit dem BigQuery-Connector für Apache Spark können Data Scientists das Leistungspotenzial der nahtlos skalierbaren SQL-Engine von BigQuery mit den Funktionen für maschinelles Lernen von Apache Spark kombinieren. In dieser Anleitung wird gezeigt, wie Sie Dataproc, BigQuery und Apache Spark ML verwenden, um maschinelles Lernen mit einem Dataset durchzuführen.

Lernziele

Verwenden Sie die lineare Regression, um ein Modell zum Geburtsgewicht als Funktion mit fünf Faktoren zu erstellen:

  1. Schwangerschaftswochen
  2. Alter der Mutter
  3. Alter des Vaters
  4. Zunahme des Gewichts der Mutter während der Schwangerschaft
  5. Apgar-Score

Mit BigQuery wird die Tabelle der linearen Regression vorbereitet, die in Ihr Google Cloud Platform-Projekt geschrieben wird. Python wird verwendet, um Daten in BigQuery abzufragen und zu verwalten. Der Zugriff auf die resultierende Tabelle der linearen Regression erfolgt über Apache Spark. Das Erstellen und Bewerten des Modells erfolgt mithilfe von Spark ML. Zum Aufrufen von Spark ML-Funktionen wird ein Dataproc PySpark-Job verwendet.

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

  • Compute Engine
  • Dataproc
  • BigQuery

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweis

In Dataproc-Clustern sind die Spark-Komponenten einschließlich Spark ML installiert. Wenn Sie einen Dataproc-Cluster einrichten und den Code in diesem Beispiel auszuführen, müssen Sie folgende Aufgaben ausführen bzw. ausgeführt haben:

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Dataproc, BigQuery, Compute Engine APIs aktivieren.

    Aktivieren Sie die APIs

  4. Installieren und initialisieren Sie Google Cloud CLI.
  5. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  6. Dataproc, BigQuery, Compute Engine APIs aktivieren.

    Aktivieren Sie die APIs

  7. Installieren und initialisieren Sie Google Cloud CLI.
  8. Erstellen Sie einen Dataproc-Cluster in Ihrem Projekt. In Ihrem Cluster sollte eine Dataproc-Version mit Spark 2.0 oder höher ausgeführt werden (enthält ML-Bibliotheken).

Teilmenge der natality-Daten von BigQuery erstellen

In diesem Abschnitt erstellen Sie ein Dataset in Ihrem Projekt, dann eine Tabelle in diesem Dataset, in das Sie eine Teilmenge der Daten zu Geburtenraten aus dem öffentlich zugänglichen BigQuery-Dataset natality kopieren. Später in dieser Anleitung verwenden Sie die Teilmengendaten in dieser Tabelle, um das Geburtsgewicht in Abhängigkeit vom Alter der Mutter, des Vaters und von den Schwangerschaftswochen vorherzusagen.

Sie können die Datenteilmenge in der Google Cloud Console oder über ein Python-Skript auf Ihrem lokalen Computer erstellen.

Console

  1. Erstellen Sie ein Dataset in Ihrem Projekt.

    1. Rufen Sie die Web-UI von BigQuery auf.
    2. Klicken Sie im linken Navigationsbereich auf den Namen Ihres Projekts und anschließend auf DATASET ERSTELLEN.
    3. Führen Sie im Dialogfeld Dataset erstellen folgende Schritte aus:
      1. Geben Sie als Dataset ID (Dataset-ID) "natality_regression" ein.
      2. Wählen Sie unter Speicherort der Daten einen Standort für das Dataset aus. Der Standardwert für den Standort ist US multi-region. Nachdem ein Dataset erstellt wurde, kann der Standort nicht mehr geändert werden.
      3. Wählen Sie für Standard-Tabellenablauf eine der folgenden Optionen aus:
        • Nie: (Standardeinstellung) Sie müssen die Tabelle manuell löschen.
        • Anzahl Tage: Die Tabelle wird nach der angegebenen Anzahl von Tagen ab ihrer Erstellung gelöscht.
      4. Wählen Sie unter Verschlüsselung eine der folgenden Optionen aus:
      5. Klicken Sie auf Dataset erstellen.
  2. Führen Sie eine Abfrage für das öffentliche "natality"-Dataset aus und speichern Sie die Abfrageergebnisse in einer neuen Tabelle in Ihrem Dataset.

    1. Kopieren Sie die folgende Abfrage und fügen Sie sie in den Abfrageeditor ein. Klicken Sie dann auf „Ausführen“.
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. Klicken Sie nach etwa 1 Minute auf ERGEBNISSE SPEICHERN und wählen Sie die Option „Speichern“ aus, um die Ergebnisse als BigQuery-Tabelle im Dataset natality_regression im Projekt zu speichern.

Python

  1. Anleitungen zum Installieren von Python und der Google Cloud-Clientbibliothek für Python (zur Ausführung des Codes erforderlich) finden Sie unter Python-Entwicklungsumgebung einrichten. Es wird empfohlen, eine virtualenv von Python zu installieren und zu verwenden.

  2. Kopieren Sie den nachfolgenden natality_tutorial.py-Code und fügen Sie ihn in eine python-Shell auf Ihrem lokalen Computer ein. Drücken Sie in der Shell die Taste <return>, um den Code auszuführen und das BigQuery-Dataset "natality_regression" in Ihrem Google Cloud-Standardprojekt mit der Tabelle "regression_input" zu erstellen. Diese Tabelle wird mit einer Teilmenge der öffentlichen natality-Daten gefüllt.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    dataset_id_full = f"{client.project}.{dataset_id}"
    
    dataset = bigquery.Dataset(dataset_id_full)
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to where you want to store query results.
    # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
    # used in place of a TableReference.
    job_config.destination = f"{dataset_id_full}.regression_input"
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish
  3. Bestätigen Sie die Erstellung des natality_regression-Datasets und der regression_input-Tabelle.

Lineare Regression ausführen

In diesem Abschnitt führen Sie eine lineare PySpark-Regression durch. Dazu senden Sie den Job über die Google Cloud Console an den Dataproc-Dienst oder über einen lokalen Terminal mit dem Befehl gcloud.

Console

  1. Kopieren Sie den folgenden Code und fügen Sie ihn in eine neue natality_sparkml.py-Datei auf Ihrem lokalen Computer ein.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. Kopieren Sie die lokale natality_sparkml.py-Datei in einen Cloud Storage-Bucket in Ihrem Projekt.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Führen Sie die Regression über die Dataproc-Seite Job senden aus.

    1. Fügen Sie im Feld Python-Hauptdatei den gs://-URI des Cloud Storage-Buckets ein, in dem sich Ihre Kopie der natality_sparkml.py-Datei befindet.

    2. Wählen Sie PySpark als Jobtyp aus.

    3. Fügen Sie gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar in das Feld Jar-Dateien ein. Dadurch wird der Spark-BigQuery-Connector der PySpark-Anwendung zur Laufzeit zur Verfügung gestellt, damit sie BigQuery-Daten in einem Spark-DataFrame lesen kann.

    4. Füllen Sie die Felder Job-ID, Region und Cluster aus.

    5. Klicken Sie auf Submit (Senden), um den Job in Ihrem Cluster auszuführen.

Wenn der Job abgeschlossen ist, wird im Fenster mit den Dataproc-Jobdetails die Zusammenfassung der Ausgabe des linearen Regressionsmodells angezeigt.

gcloud

  1. Kopieren Sie den folgenden Code und fügen Sie ihn in eine neue natality_sparkml.py-Datei auf Ihrem lokalen Computer ein.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. Kopieren Sie die lokale natality_sparkml.py-Datei in einen Cloud Storage-Bucket in Ihrem Projekt.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Senden Sie den PySpark-Job an den Dataproc-Dienst. Führen Sie dazu den unten gezeigten gcloud-Befehl in einem Terminalfenster auf Ihrem lokalen Computer aus.

    1. Der Flag-Wert --jars stellt den Py-BigQuery-Connector der Laufzeit während der Laufzeit des PySpark-Jobs zur Verfügung, damit BigQuery-Daten in einen Spark DataFrame gelesen werden können.
      gcloud dataproc jobs submit pyspark \
          gs://your-bucket/natality_sparkml.py \
          --cluster=cluster-name \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      

Die Ausgabe der linearen Regression (Modellübersicht) wird im Terminalfenster angezeigt, wenn der Job abgeschlossen ist.

<<< # Print the model summary.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

Bereinigen

Nachdem Sie die Anleitung abgeschlossen haben, können Sie die erstellten Ressourcen bereinigen, damit sie keine Kontingente mehr nutzen und keine Gebühren mehr anfallen. In den folgenden Abschnitten erfahren Sie, wie Sie diese Ressourcen löschen oder deaktivieren.

Projekt löschen

Am einfachsten vermeiden Sie weitere Kosten, wenn Sie das zum Ausführen der Anleitung erstellte Projekt löschen.

So löschen Sie das Projekt:

  1. Wechseln Sie in der Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Dataproc-Cluster löschen

Siehe Cluster löschen.

Weitere Informationen