Use o conector do BigQuery com o Spark

O spark-bigquery-connector é usado com o Apache Spark para ler e gravar dados do e para o BigQuery. Neste tutorial, fornecemos um código de exemplo que usa o conector Spark-bigquery em um aplicativo Spark. Para instruções sobre como criar um cluster, consulte os Guias de início rápido do Dataproc.

Disponibilizar o conector para o app

É possível disponibilizar o spark-bigquery-connector para o aplicativo de uma das seguintes maneiras:

  1. Instale o spark-bigquery-connector no diretório jars do Spark de cada nó usando a ação de inicialização dos conectores do Dataproc ao criar o cluster.

  2. Informe o URI do conector ao enviar o job:

    1. Console do Google Cloud:use o item Jars files do job do Spark na página Enviar um job do Dataproc.
    2. CLI gcloud:use a flag gcloud dataproc jobs submit spark --jars.
    3. API Dataproc:use o campo SparkJob.jarFileUris.
  3. Inclua o jar no aplicativo Scala ou Java Spark como uma dependência. Consulte Como compilar com o conector.

Como especificar o URI do jar do conector

As versões do conector Spark-BigQuery estão listadas no repositório do GitHub GoogleCloudDataproc/spark-bigquery-connector.

Especifique o jar do conector substituindo as informações da versão do Scala e do conector na seguinte string de URI:

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • Usar o Scala 2.12 com as versões de imagem do Dataproc 1.5+

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    Exemplo da CLI gcloud:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • Use o Scala 2.11 com as versões de imagem do Dataproc 1.4 e anteriores:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    Exemplo da CLI gcloud:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

Cálculo de custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Como gravar e ler dados do BigQuery

Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.

O conector grava os dados no BigQuery primeiro armazenando-os em buffer em uma tabela temporária do Cloud Storage. Em seguida, ele copia todos os dados do BigQuery em uma única operação. O conector tenta excluir os arquivos temporários depois que a operação de carregamento do BigQuery for bem-sucedida e mais uma vez quando o aplicativo Spark é encerrado. Se o job falhar, remova todos os arquivos temporários do Cloud Storage restantes. Normalmente, os arquivos temporários do BigQuery estão localizados em gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Como configurar o faturamento

Por padrão, o projeto associado às credenciais ou à conta de serviço é cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>").

Como executar o código

Antes de executar este exemplo, crie um conjunto de dados chamado "wordcount_dataset" ou altere o conjunto de dados de saída no código para um conjunto de dados existente do BigQuery no projeto do Google Cloud.

Use o comando bq para criar o wordcount_dataset:

bq mk wordcount_dataset

Use o comando Google Cloud CLI para criar um bucket do Cloud Storage, que será usado para exportar para o BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado anteriormente.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
  2. Executar o código no seu cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
        Página de clusters do Dataproc no Console do Cloud.
      2. Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Página de detalhes do cluster do Dataproc no console do Cloud

        Uma janela do navegador é aberta no diretório principal do nó mestre.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.scala com o editor de texto vi, vim ou nano pré-instalado e cole o código da lista de códigos Scala
      nano wordcount.scala
        
    3. Inicie o REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Execute o wordcount.scala com o comando :load wordcount.scala para criar a tabela wordcount_output do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para visualizar a tabela de saída, abra a página BigQuery, selecione a tabela wordcount_output e clique em Visualizar.
      Visualização da tabela na página do BigQuery Explorer no console do Cloud.

PySpark

  1. Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado anteriormente.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  2. Execute o código no cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
        Página &quot;Clusters&quot; no console do Cloud.
      2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Selecione SSH na linha do nome do cluster na página &quot;Detalhes do cluster&quot; no Console do Cloud.

        Uma janela do navegador é aberta no diretório principal do nó mestre.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.py com o editor de texto vi, vim ou nano pré-instalado e cole o código PySpark da lista de códigos PySpark
      nano wordcount.py
      
    3. Execute a contagem de palavras com spark-submit para criar a tabela wordcount_output do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para visualizar a tabela de saída, abra a página BigQuery, selecione a tabela wordcount_output e clique em Visualizar.
      Visualização da tabela na página do BigQuery Explorer no console do Cloud.

Para mais informações