Usar o Dataproc, o BigQuery e o Apache Spark ML para machine learning

O BigQuery Connector para Apache Spark permite que cientistas de dados mesclem a força do mecanismo SQL escalonável e confiável do BigQuery com os recursos de machine learning do Apache Spark. Neste tutorial, mostramos como usar o Dataproc, o BigQuery e o Apache Spark ML para realizar machine learning em um conjunto de dados.

Objetivos

Usar a regressão linear para criar um modelo de peso de nascimento como uma função de cinco fatores:

  1. semanas de gestação
  2. idade da mãe
  3. idade do pai
  4. ganho de peso da mãe durante a gestação
  5. escala de Apgar

O BigQuery é usado para preparar a tabela de entrada de regressão linear, que é gravada no projeto do Google Cloud Platform. O Python é usado para consultar e gerenciar dados no BigQuery. A tabela de regressão linear resultante é acessada no Apache Spark, e o ML do Spark é usado para criar e avaliar o modelo. Um job do Dataproc PySpark é usado para invocar as funções do ML do Spark.

Custos

Neste tutorial, há componentes do Google Cloud Platform que são cobrados, entre eles:

  • Compute Engine
  • Dataproc
  • BigQuery

Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. Usuários novos do Cloud Platform podem ter direito a uma avaliação gratuita.

Antes de começar

Um cluster do Dataproc tem os componentes do Spark, inclusive o Spark ML, instalados. Para configurar um cluster do Dataproc e executar o código deste exemplo, você precisará fazer (ou ter feito) o seguinte:

  1. Faça login na sua conta do Google.

    Se você ainda não tiver uma, inscreva-se.

  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar a página do seletor de projetos

  3. Ative as APIs Dataproc, BigQuery, Compute Engine.

    Ative as APIs

  4. Instale e inicialize o SDK do Cloud..
  5. Crie um cluster do Dataproc no projeto. Seu cluster precisa executar uma versão do Dataproc com Spark 2.0 ou superior, incluindo bibliotecas de machine learning.

Criar um subconjunto de dados do BigQuery natality

Nesta seção, você cria um conjunto de dados no projeto e uma tabela no conjunto de dados para que queira copiar um subconjunto de dados da taxa de natalidade do conjunto de dados natality do BigQuery disponível publicamente. Posteriormente, neste tutorial, você usará os dados do subconjunto nesta tabela para prever o peso de nascimento como uma função da idade materna, da idade paterna e das semanas de gestação.

É possível criar o subconjunto de dados usando o Console do Google Cloud ou executando um script Python na máquina local.

Console

  1. Crie um conjunto de dados no projeto.

    1. Acesse a IU da Web do BigQuery.
    2. No painel de navegação esquerdo, clique no nome do projeto e, em seguida, clique em CRIAR CONJUNTO DE DADOS.
    3. Na caixa de diálogo Criar conjunto de dados:
      1. Em ID do conjunto de dados, digite "natality_regression".
      2. Em Local de dados, escolha um local para o conjunto de dados. O local do valor padrão é US multi-region. Depois que um conjunto de dados é criado, o local não pode ser alterado.
      3. Em Validade da tabela padrão, escolha uma das seguintes opções:
        • Nunca (padrão): você deve excluir a tabela manualmente.
        • Número de dias: a tabela será excluída após o número de dias especificado a partir da data de criação.
      4. Para Criptografia, escolha uma das seguintes opções:
      5. Clique em Criar conjunto de dados.
  2. Execute uma consulta no conjunto de dados de natalidade pública e salve os resultados da consulta em uma nova tabela no conjunto de dados.

    1. Copie e cole a seguinte consulta no Editor de Consultas e clique em Executar.
      SELECT
      weight_pounds,
      mother_age,
      father_age,
      gestation_weeks,
      weight_gain_pounds,
      apgar_5min
      FROM
      `bigquery-public-data.samples.natality`
      WHERE
      weight_pounds IS NOT NULL
      AND mother_age IS NOT NULL
      AND father_age IS NOT NULL
      AND gestation_weeks IS NOT NULL
      AND weight_gain_pounds IS NOT NULL
      AND apgar_5min IS NOT NULL
      
    2. Depois que a consulta for concluída (após aproximadamente um minuto), clique em SALVAR RESULTADOS e selecione as opções para salvar os resultados como uma tabela "regression_input" do BigQuery no conjunto de dados natality_regression em seu projeto.

Python

  1. Consulte Como configurar um ambiente de desenvolvimento do Python para ver instruções sobre como instalar o Python e a biblioteca de cliente do Google Cloud para Python (necessária para executar o código). É recomendado instalar e usar um virtualenv do Python.

  2. Copie e cole o código natality_tutorial.py abaixo em um shell python na máquina local. Pressione o botão <return> no shell para executar o código para criar um conjunto de dados "natality_regression" do BigQuery no seu projeto padrão do Google Cloud com uma tabela "regression_input" preenchida com um subconjunto dos dados natality públicos.

    """Create a Google BigQuery linear regression input table.
    
    In the code below, the following actions are taken:
    * A new dataset is created "natality_regression."
    * A query is run against the public dataset,
        bigquery-public-data.samples.natality, selecting only the data of
        interest to the regression, the output of which is stored in a new
        "regression_input" table.
    * The output table is moved over the wire to the user's default project via
        the built-in BigQuery Connector for Spark that bridges BigQuery and
        Cloud Dataproc.
    """
    
    from google.cloud import bigquery
    
    # Create a new Google BigQuery client using Google Cloud Platform project
    # defaults.
    client = bigquery.Client()
    
    # Prepare a reference to a new dataset for storing the query results.
    dataset_id = "natality_regression"
    
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    
    # Create the new BigQuery dataset.
    dataset = client.create_dataset(dataset)
    
    # In the new BigQuery dataset, create a reference to a new table for
    # storing the query results.
    table_ref = dataset.table("regression_input")
    
    # Configure the query job.
    job_config = bigquery.QueryJobConfig()
    
    # Set the destination table to the table reference created above.
    job_config.destination = table_ref
    
    # Set up a query in Standard SQL, which is the default for the BigQuery
    # Python client library.
    # The query selects the fields of interest.
    query = """
        SELECT
            weight_pounds, mother_age, father_age, gestation_weeks,
            weight_gain_pounds, apgar_5min
        FROM
            `bigquery-public-data.samples.natality`
        WHERE
            weight_pounds IS NOT NULL
            AND mother_age IS NOT NULL
            AND father_age IS NOT NULL
            AND gestation_weeks IS NOT NULL
            AND weight_gain_pounds IS NOT NULL
            AND apgar_5min IS NOT NULL
    """
    
    # Run the query.
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # Waits for the query to finish
  3. Confirme a criação do conjunto de dados natality_regression e da tabela regression_input.

Executar uma regressão linear

Nesta seção, você executará uma regressão linear do PySpark enviando o job ao serviço do Dataproc por meio do Console do Google Cloud ou executando o comando gcloud a partir de um terminal local.

Console

  1. Copie e cole o seguinte código em um novo arquivo natality_sparkml.py na máquina local.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. Copie o arquivo natality_sparkml.py local para um bucket do Cloud Storage no seu projeto.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Execute a regressão na página Enviar um job do Dataproc.

    1. No campo Arquivo principal python, insira o URI gs:// do bucket do Cloud Storage no qual sua cópia do arquivo natality_sparkml.py está localizada.

    2. Selecione PySpark como o Tipo de job.

    3. Insira gs://spark-lib/bigquery/spark-bigquery-latest.jar no campo Arquivos Jar. Isso torna o conector spark-bigquery disponível para o aplicativo PySpark no ambiente de execução para permitir a leitura de dados do BigQuery em um Spark DataFrame.

    4. Preencha os campos ID do job, Região e Cluster.

    5. Clique em Enviar para executar o job no cluster.

Quando o job é concluído, o resumo do modelo de saída de regressão linear é exibido na janela de detalhes Job do Dataproc.

gcloud

  1. Copie e cole o seguinte código em um novo arquivo natality_sparkml.py na máquina local.

    """Run a linear regression using Apache Spark ML.
    
    In the following PySpark (Spark Python API) code, we take the following actions:
    
      * Load a previously created linear regression (BigQuery) input table
        into our Cloud Dataproc Spark cluster as an RDD (Resilient
        Distributed Dataset)
      * Transform the RDD into a Spark Dataframe
      * Vectorize the features on which the model will be trained
      * Compute a linear regression using Spark ML
    
    """
    
    from __future__ import print_function
    from pyspark.context import SparkContext
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.session import SparkSession
    # The imports, above, allow us to access SparkML features specific to linear
    # regression as well as the Vectors types.
    
    # Define a function that collects the features of interest
    # (mother_age, father_age, and gestation_weeks) into a vector.
    # Package the vector in a tuple containing the label (`weight_pounds`) for that
    # row.
    def vector_from_inputs(r):
      return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                float(r["father_age"]),
                                                float(r["gestation_weeks"]),
                                                float(r["weight_gain_pounds"]),
                                                float(r["apgar_5min"])))
    
    sc = SparkContext()
    spark = SparkSession(sc)
    
    # Read the data from BigQuery as a Spark Dataframe.
    natality_data = spark.read.format("bigquery").option(
        "table", "natality_regression.regression_input").load()
    # Create a view so that Spark SQL queries can be run against the data.
    natality_data.createOrReplaceTempView("natality")
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """
    SELECT *
    from natality
    where weight_pounds is not null
    and mother_age is not null
    and father_age is not null
    and gestation_weeks is not null
    """
    clean_data = spark.sql(sql_query)
    
    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()
    
    # Construct a new LinearRegression object and fit the training data.
    lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
    model = lr.fit(training_data)
    # Print the model summary.
    print("Coefficients:" + str(model.coefficients))
    print("Intercept:" + str(model.intercept))
    print("R^2:" + str(model.summary.r2))
    model.summary.residuals.show()
    
    

  2. Copie o arquivo natality_sparkml.py local para um bucket do Cloud Storage no seu projeto.

    gsutil cp natality_sparkml.py gs://bucket-name
    

  3. Envie o job Pyspark para o serviço Dataproc executando o comando gcloud, mostrado abaixo, a partir de uma janela de terminal na máquina local.

    1. Para o valor da sinalização --files, insira o nome do bucket do Cloud Storage no qual sua cópia do arquivo natality_sparkml.py está localizada.
    2. O valor da sinalização --jars torna o conector spark-bigquery disponível para o jobv do PySpark no ambiente de execução para permitir a leitura de dados do BigQuery em um Spark DataFrame.
      gcloud dataproc jobs submit pyspark \
          --cluster=cluster-name \
          --region=region \
          --files=gs://your-bucket/natality_sparkml.py \
          --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      

A saída de regressão linear (resumo do modelo) é exibida na janela do terminal quando o job é concluído.

<<< # Print the model summary.
... print "Coefficients:" + str(model.coefficients)
Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
<<< print "Intercept:" + str(model.intercept)
Intercept:-2.26130330748
<<< print "R^2:" + str(model.summary.r2)
R^2:0.295200579035
<<< model.summary.residuals.show()
+--------------------+
|           residuals|
+--------------------+
| -0.7234737533344147|
|  -0.985466980630501|
| -0.6669710598385468|
|  1.4162434829714794|
|-0.09373154375186754|
|-0.15461747949235072|
| 0.32659061654192545|
|  1.5053877697929803|
|  -0.640142797263989|
|   1.229530260294963|
|-0.03776160295256...|
| -0.5160734239126814|
| -1.5165972740062887|
|  1.3269085258245008|
|  1.7604670124710626|
|  1.2348130901905972|
|   2.318660276655887|
|  1.0936947030883175|
|  1.0169768511417363|
| -1.7744915698181583|
+--------------------+
only showing top 20 rows.

Limpeza

Depois de concluir o tutorial "Usar Dataproc, BigQuery e Apache Spark para machine learning", limpe os recursos criados no Google Cloud para que eles não ocupem cota e você não receba cobranças no futuro. Veja como excluir e desativar esses recursos nas seções a seguir.

Como excluir o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.

Para excluir o projeto:

  1. No Console do Cloud, acesse a página Gerenciar recursos:

    Acessar a página "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Como excluir o cluster do Dataproc

Consulte Excluir um cluster.