Use o Dataproc, o BigQuery e o Apache Spark ML para aprendizagem automática


O conetor do BigQuery para Apache Spark permite que os cientistas de dados combinem o poder do motor SQL perfeitamente escalável do BigQuery com as capacidades de aprendizagem automática do Apache Spark. Neste tutorial, mostramos como usar o Dataproc, o BigQuery e o Apache Spark ML para realizar a aprendizagem automática num conjunto de dados.

Objetivos

Use a regressão linear para criar um modelo do peso ao nascer como função de cinco fatores:

  • semanas de gestação
  • Idade da mãe
  • Idade do pai
  • aumento de peso da mãe durante a gravidez
  • Índice de Apgar

Use as seguintes ferramentas:

  • BigQuery, para preparar a tabela de entrada de regressão linear, que é escrita no seu Google Cloud projeto
  • Python, para consultar e gerir dados no BigQuery
  • Apache Spark, para aceder à tabela de regressão linear resultante
  • Spark ML, para criar e avaliar o modelo
  • Tarefa PySpark do Dataproc para invocar funções do Spark ML

Custos

Neste documento, usa os seguintes componentes faturáveis do Google Cloud:

  • Compute Engine
  • Dataproc
  • BigQuery

Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Antes de começar

Um cluster do Dataproc tem os componentes do Spark, incluindo o Spark ML, instalados. Para configurar um cluster do Dataproc e executar o código neste exemplo, tem de fazer (ou ter feito) o seguinte:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  4. Install the Google Cloud CLI.

  5. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  6. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  8. Enable the Dataproc, BigQuery, Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  9. Install the Google Cloud CLI.

  10. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  11. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  12. Crie um cluster do Dataproc no seu projeto. O cluster deve estar a executar uma versão do Dataproc com o Spark 2.0 ou superior (inclui bibliotecas de aprendizagem automática).
  13. Crie um subconjunto de dados de natalidade do BigQuery

    Nesta secção, cria um conjunto de dados no seu projeto e, em seguida, cria uma tabela no conjunto de dados para o qual copia um subconjunto de dados da taxa de natalidade do conjunto de dados do BigQuery natality disponível publicamente. Mais adiante neste tutorial, vai usar os dados do subconjunto nesta tabela para prever o peso ao nascer como uma função da idade materna, da idade paterna e das semanas de gestação.

    Pode criar o subconjunto de dados através da Google Cloud consola ou executando um script Python no seu computador local.

    Consola

    1. Crie um conjunto de dados no seu projeto.

      1. Aceda à IU Web do BigQuery.
      2. No painel de navegação do lado esquerdo, clique no nome do projeto e, de seguida, em CRIAR CONJUNTO DE DADOS.
      3. Na caixa de diálogo Criar conjunto de dados:
        1. Para ID do conjunto de dados, introduza "natality_regression".
        2. Para Localização de dados, pode escolher uma localização para o conjunto de dados. A localização do valor predefinido é US multi-region. Após a criação de um conjunto de dados, não é possível alterar a localização.
        3. Em Validade predefinida da tabela, escolha uma das seguintes opções:
          • Nunca (predefinição): tem de eliminar a tabela manualmente.
          • Número de dias: a tabela é eliminada após o número de dias especificado a partir da hora de criação.
        4. Para Encriptação, escolha uma das seguintes opções:
        5. Clique em Criar conjunto de dados.
    2. Execute uma consulta no conjunto de dados público de natalidade e, em seguida, guarde os resultados da consulta numa nova tabela no seu conjunto de dados.

      1. Copie e cole a seguinte consulta no editor de consultas e, de seguida, clique em Executar.
        CREATE OR REPLACE TABLE natality_regression.regression_input as
        SELECT
        weight_pounds,
        mother_age,
        father_age,
        gestation_weeks,
        weight_gain_pounds,
        apgar_5min
        FROM
        `bigquery-public-data.samples.natality`
        WHERE
        weight_pounds IS NOT NULL
        AND mother_age IS NOT NULL
        AND father_age IS NOT NULL
        AND gestation_weeks IS NOT NULL
        AND weight_gain_pounds IS NOT NULL
        AND apgar_5min IS NOT NULL
        
      2. Após a conclusão da consulta (em aproximadamente um minuto), os resultados são guardados como a tabela do BigQuery "regression_input" no conjunto de dados natality_regression no seu projeto.

    Python

    Antes de experimentar este exemplo, siga as Pythoninstruções de configuração no início rápido do Dataproc com as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python Dataproc.

    Para se autenticar no Dataproc, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

    1. Consulte o artigo Configurar um ambiente de desenvolvimento Python para ver instruções sobre a instalação do Python e da biblioteca cliente Google Cloud para Python (necessária para executar o código). Recomendamos a instalação e a utilização de um Python virtualenv.

    2. Copie e cole o código natality_tutorial.py abaixo numa shell python na sua máquina local. Prima a tecla <return> na shell para executar o código para criar um conjunto de dados do BigQuery "natality_regression" no seu projetoGoogle Cloud predefinido com uma tabela "regression_input" preenchida com um subconjunto dos dados natality públicos.

      """Create a Google BigQuery linear regression input table.
      
      In the code below, the following actions are taken:
      * A new dataset is created "natality_regression."
      * A query is run against the public dataset,
          bigquery-public-data.samples.natality, selecting only the data of
          interest to the regression, the output of which is stored in a new
          "regression_input" table.
      * The output table is moved over the wire to the user's default project via
          the built-in BigQuery Connector for Spark that bridges BigQuery and
          Cloud Dataproc.
      """
      
      from google.cloud import bigquery
      
      # Create a new Google BigQuery client using Google Cloud Platform project
      # defaults.
      client = bigquery.Client()
      
      # Prepare a reference to a new dataset for storing the query results.
      dataset_id = "natality_regression"
      dataset_id_full = f"{client.project}.{dataset_id}"
      
      dataset = bigquery.Dataset(dataset_id_full)
      
      # Create the new BigQuery dataset.
      dataset = client.create_dataset(dataset)
      
      # Configure the query job.
      job_config = bigquery.QueryJobConfig()
      
      # Set the destination table to where you want to store query results.
      # As of google-cloud-bigquery 1.11.0, a fully qualified table ID can be
      # used in place of a TableReference.
      job_config.destination = f"{dataset_id_full}.regression_input"
      
      # Set up a query in Standard SQL, which is the default for the BigQuery
      # Python client library.
      # The query selects the fields of interest.
      query = """
          SELECT
              weight_pounds, mother_age, father_age, gestation_weeks,
              weight_gain_pounds, apgar_5min
          FROM
              `bigquery-public-data.samples.natality`
          WHERE
              weight_pounds IS NOT NULL
              AND mother_age IS NOT NULL
              AND father_age IS NOT NULL
              AND gestation_weeks IS NOT NULL
              AND weight_gain_pounds IS NOT NULL
              AND apgar_5min IS NOT NULL
      """
      
      # Run the query.
      client.query_and_wait(query, job_config=job_config)  # Waits for the query to finish
    3. Confirme a criação do conjunto de dados natality_regression e da tabela regression_input.

    Execute uma regressão linear

    Nesta secção, vai executar uma regressão linear do PySpark enviando a tarefa para o serviço Dataproc através da Google Cloud consola ou executando o comando gcloud a partir de um terminal local.

    Consola

    1. Copie e cole o seguinte código num novo natality_sparkml.py ficheiro no seu computador local.

      """Run a linear regression using Apache Spark ML.
      
      In the following PySpark (Spark Python API) code, we take the following actions:
      
        * Load a previously created linear regression (BigQuery) input table
          into our Cloud Dataproc Spark cluster as an RDD (Resilient
          Distributed Dataset)
        * Transform the RDD into a Spark Dataframe
        * Vectorize the features on which the model will be trained
        * Compute a linear regression using Spark ML
      
      """
      from pyspark.context import SparkContext
      from pyspark.ml.linalg import Vectors
      from pyspark.ml.regression import LinearRegression
      from pyspark.sql.session import SparkSession
      # The imports, above, allow us to access SparkML features specific to linear
      # regression as well as the Vectors types.
      
      
      # Define a function that collects the features of interest
      # (mother_age, father_age, and gestation_weeks) into a vector.
      # Package the vector in a tuple containing the label (`weight_pounds`) for that
      # row.
      def vector_from_inputs(r):
        return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                  float(r["father_age"]),
                                                  float(r["gestation_weeks"]),
                                                  float(r["weight_gain_pounds"]),
                                                  float(r["apgar_5min"])))
      
      sc = SparkContext()
      spark = SparkSession(sc)
      
      # Read the data from BigQuery as a Spark Dataframe.
      natality_data = spark.read.format("bigquery").option(
          "table", "natality_regression.regression_input").load()
      # Create a view so that Spark SQL queries can be run against the data.
      natality_data.createOrReplaceTempView("natality")
      
      # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
      sql_query = """
      SELECT *
      from natality
      where weight_pounds is not null
      and mother_age is not null
      and father_age is not null
      and gestation_weeks is not null
      """
      clean_data = spark.sql(sql_query)
      
      # Create an input DataFrame for Spark ML using the above function.
      training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                   "features"])
      training_data.cache()
      
      # Construct a new LinearRegression object and fit the training data.
      lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
      model = lr.fit(training_data)
      # Print the model summary.
      print("Coefficients:" + str(model.coefficients))
      print("Intercept:" + str(model.intercept))
      print("R^2:" + str(model.summary.r2))
      model.summary.residuals.show()

    2. Copie o ficheiro natality_sparkml.py local para um contentor do Cloud Storage no seu projeto.

      gcloud storage cp natality_sparkml.py gs://bucket-name
      

    3. Execute a regressão a partir da página Enviar uma tarefa do Dataproc.

      1. No campo Ficheiro Python principal, insira o URI do contentor do Cloud Storage onde se encontra a sua cópia do ficheiro natality_sparkml.py.gs://

      2. Selecione PySpark como o Tipo de serviço.

      3. Insira gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar no campo Ficheiros JAR. Isto torna o spark-bigquery-connector disponível para a aplicação PySpark no momento da execução, o que lhe permite ler dados do BigQuery num DataFrame do Spark.

      4. Preencha os campos ID da tarefa, Região e Cluster.

      5. Clique em Enviar para executar a tarefa no seu cluster.

    Quando a tarefa estiver concluída, o resumo do modelo de saída de regressão linear é apresentado na janela de detalhes da tarefa do Dataproc.

    gcloud

    1. Copie e cole o seguinte código num novo natality_sparkml.py ficheiro no seu computador local.

      """Run a linear regression using Apache Spark ML.
      
      In the following PySpark (Spark Python API) code, we take the following actions:
      
        * Load a previously created linear regression (BigQuery) input table
          into our Cloud Dataproc Spark cluster as an RDD (Resilient
          Distributed Dataset)
        * Transform the RDD into a Spark Dataframe
        * Vectorize the features on which the model will be trained
        * Compute a linear regression using Spark ML
      
      """
      from pyspark.context import SparkContext
      from pyspark.ml.linalg import Vectors
      from pyspark.ml.regression import LinearRegression
      from pyspark.sql.session import SparkSession
      # The imports, above, allow us to access SparkML features specific to linear
      # regression as well as the Vectors types.
      
      
      # Define a function that collects the features of interest
      # (mother_age, father_age, and gestation_weeks) into a vector.
      # Package the vector in a tuple containing the label (`weight_pounds`) for that
      # row.
      def vector_from_inputs(r):
        return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                                  float(r["father_age"]),
                                                  float(r["gestation_weeks"]),
                                                  float(r["weight_gain_pounds"]),
                                                  float(r["apgar_5min"])))
      
      sc = SparkContext()
      spark = SparkSession(sc)
      
      # Read the data from BigQuery as a Spark Dataframe.
      natality_data = spark.read.format("bigquery").option(
          "table", "natality_regression.regression_input").load()
      # Create a view so that Spark SQL queries can be run against the data.
      natality_data.createOrReplaceTempView("natality")
      
      # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
      sql_query = """
      SELECT *
      from natality
      where weight_pounds is not null
      and mother_age is not null
      and father_age is not null
      and gestation_weeks is not null
      """
      clean_data = spark.sql(sql_query)
      
      # Create an input DataFrame for Spark ML using the above function.
      training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                   "features"])
      training_data.cache()
      
      # Construct a new LinearRegression object and fit the training data.
      lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
      model = lr.fit(training_data)
      # Print the model summary.
      print("Coefficients:" + str(model.coefficients))
      print("Intercept:" + str(model.intercept))
      print("R^2:" + str(model.summary.r2))
      model.summary.residuals.show()

    2. Copie o ficheiro natality_sparkml.py local para um contentor do Cloud Storage no seu projeto.

      gcloud storage cp natality_sparkml.py gs://bucket-name
      

    3. Envie a tarefa do Pyspark para o serviço Dataproc executando o comando gcloud, apresentado abaixo, a partir de uma janela de terminal na sua máquina local.

      1. O valor da flag --jars torna o spark-bigquery-connector disponível para o trabalho do PySpark em tempo de execução para lhe permitir ler dados do BigQuery num DataFrame do Spark.
        gcloud dataproc jobs submit pyspark \
            gs://your-bucket/natality_sparkml.py \
            --cluster=cluster-name \
            --region=region \
            --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
        

    O resultado da regressão linear (resumo do modelo) é apresentado na janela de terminal quando a tarefa é concluída.

    <<< # Print the model summary.
    ... print "Coefficients:" + str(model.coefficients)
    Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587]
    <<< print "Intercept:" + str(model.intercept)
    Intercept:-2.26130330748
    <<< print "R^2:" + str(model.summary.r2)
    R^2:0.295200579035
    <<< model.summary.residuals.show()
    +--------------------+
    |           residuals|
    +--------------------+
    | -0.7234737533344147|
    |  -0.985466980630501|
    | -0.6669710598385468|
    |  1.4162434829714794|
    |-0.09373154375186754|
    |-0.15461747949235072|
    | 0.32659061654192545|
    |  1.5053877697929803|
    |  -0.640142797263989|
    |   1.229530260294963|
    |-0.03776160295256...|
    | -0.5160734239126814|
    | -1.5165972740062887|
    |  1.3269085258245008|
    |  1.7604670124710626|
    |  1.2348130901905972|
    |   2.318660276655887|
    |  1.0936947030883175|
    |  1.0169768511417363|
    | -1.7744915698181583|
    +--------------------+
    only showing top 20 rows.
    
      
    

    Clean up

    After you finish the tutorial, you can clean up the resources that you created so that they stop using quota and incurring charges. The following sections describe how to delete or turn off these resources.

    Delete the project

    The easiest way to eliminate billing is to delete the project that you created for the tutorial.

    To delete the project:

    1. In the Google Cloud console, go to the Manage resources page.

      Go to Manage resources

    2. In the project list, select the project that you want to delete, and then click Delete.
    3. In the dialog, type the project ID, and then click Shut down to delete the project.
    4. Elimine o cluster do Dataproc

      Consulte o artigo Elimine um grupo.

    O que se segue?