Use o conector do BigQuery com o Spark

O spark-bigquery-conector é usado com o Apache Spark para ler e gravar dados do e para o BigQuery. Neste tutorial, fornecemos um código de exemplo que usa o conector Spark-bigquery em um aplicativo Spark. Para instruções sobre como criar um cluster, consulte os Guias de início rápido do Dataproc .

Como fornecer o conector ao aplicativo

O spark-bigquery-connector precisa estar disponível para o aplicativo no ambiente de execução. Isso pode ser feito de uma das seguintes maneiras:

  • Instale o conector no diretório jars do Spark.
  • Adicione o conector no ambiente de execução usando o parâmetro --jars, que pode ser usado com a API Dataproc ou spark-submit.
    • Se você estiver usando a imagem 1.5 do Dataproc, adicione o seguinte parâmetro:
      --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
    • Se você estiver usando a imagem 1.4 ou anterior do Dataproc, adicione o seguinte parâmetro:
      --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
  • Inclua o jar no aplicativo Scala ou Java Spark como uma dependência. Consulte Como compilar com o conector.

Se o conector não estiver disponível no ambiente de execução, um ClassNotFoundException será gerado.

Cálculo de custos

Neste tutorial, há componentes faturáveis do Google Cloud, entre eles:

  • Dataproc
  • BigQuery
  • Cloud Storage

Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. É possível que novos usuários do Cloud Platform tenham direito a uma avaliação gratuita.

Como gravar e ler dados do BigQuery

Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.

O conector grava os dados no BigQuery primeiro armazenando-os em buffer em uma tabela temporária do Cloud Storage e, em seguida, copiando todos os dados do BigQuery em uma operação. O conector tenta excluir os arquivos temporários depois que a operação de carregamento do BigQuery for bem-sucedida e mais uma vez quando o aplicativo Spark é encerrado. Se o job falhar, talvez seja necessário remover manualmente os arquivos temporários restantes do Cloud Storage. Normalmente, você encontra exportações temporárias do BigQuery em gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Como configurar o faturamento

Por padrão. o projeto associado às credenciais ou à conta de serviço é cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>").

Como executar o código

Antes de executar este exemplo, crie um conjunto de dados chamado "wordcount_dataset" ou altere o conjunto de dados de saída no código para um conjunto de dados existente do BigQuery no projeto do Google Cloud.

Use o comando bq para criar o wordcount_dataset:

bq mk wordcount_dataset

Use o comando gsutil para criar um bucket do Cloud Storage, que será usado para exportar para o BigQuery:

gsutil mb gs://[bucket]

Scala

  1. Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado acima.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery.
    val wordsDF = spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache()
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save()
    
    
  2. Executar o código no seu cluster
    1. Incorpore o SSH ao nó mestre do cluster do Dataproc.
      1. Acesse a página Clusters do Dataproc do seu projeto no Console do Cloud e, em seguida, clique no nome do cluster
      2. Na página de detalhes do cluster, selecione a guia Instâncias de VM e clique na seleção SSH à direita do nome do nó mestre do cluster.

        Uma janela do navegador é aberta no diretório principal no nó mestre.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.scala com o editor de texto vi, vim ou nano pré-instalado e cole o código da lista de códigos Scala
      nano wordcount.scala
        
    3. Inicie o REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Execute o wordcount.scala com o comando :load wordcount.scala para criar a tabela wordcount_output do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para visualizar a tabela de saída, abra a página do BigQuery do projeto, selecione a tabela wordcount_output e clique em Visualizar.

PySpark

  1. Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado acima.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. Execute o código no cluster
    1. Incorpore o SSH ao nó mestre do cluster do Dataproc.
      1. Acesse a página Clusters do Dataproc do seu projeto no Console do Cloud e, em seguida, clique no nome do cluster
      2. Na página de detalhes do cluster, selecione a guia Instâncias de VM e clique na seleção SSH à direita do nome do nó mestre do cluster.

        Uma janela do navegador é aberta no diretório principal no nó mestre.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.py com o editor de texto vi, vim ou nano pré-instalado e cole o código PySpark da lista de códigos PySpark
      nano wordcount.py
      
    3. Execute a contagem de palavras com spark-submit para criar a tabela wordcount_output do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para visualizar a tabela de saída, abra a página do BigQuery do projeto, selecione a tabela wordcount_output e clique em Visualizar.

Para saber mais