O spark-bigquery-conector é usado com o Apache Spark para ler e gravar dados do e para o BigQuery. Neste tutorial, fornecemos um código de exemplo que usa o conector Spark-bigquery em um aplicativo Spark. Para instruções sobre como criar um cluster, consulte os Guias de início rápido do Dataproc .
Como fornecer o conector ao aplicativo
O spark-bigquery-connector precisa estar disponível para o aplicativo no ambiente de execução. Isso pode ser feito de uma das seguintes maneiras:
- Instale o spark-bigquery-connector no diretório jars do Spark de cada nó usando a ação de inicialização dos conectores do Dataproc ao criar o cluster.
- Adicione o conector no ambiente de execução usando o parâmetro
--jars
, que pode ser usado com a API Dataproc ouspark-submit
.- Se você estiver usando a imagem 1.5 do Dataproc, adicione o seguinte parâmetro:
--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
- Se você estiver usando a imagem 1.4 ou anterior do Dataproc, adicione o seguinte parâmetro:
--jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
- Se você estiver usando a imagem 1.5 do Dataproc, adicione o seguinte parâmetro:
- Inclua o jar no aplicativo Scala ou Java Spark como uma dependência. Consulte Como compilar com o conector.
Se o conector não estiver disponível no ambiente de execução, um ClassNotFoundException
será gerado.
Cálculo de custos
Neste tutorial, há componentes faturáveis do Google Cloud, entre eles:
- Dataproc
- BigQuery
- Cloud Storage
Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. É possível que novos usuários do Cloud Platform tenham direito a uma avaliação gratuita.
Como gravar e ler dados do BigQuery
Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.
O conector grava os dados no BigQuery primeiro armazenando-os em buffer em uma tabela temporária do Cloud Storage e, em seguida, copiando todos os dados do BigQuery em uma operação. O conector tenta excluir os arquivos temporários depois que a operação de carregamento do BigQuery for bem-sucedida e mais uma vez quando o aplicativo Spark é encerrado.
Se o job falhar, talvez seja necessário remover manualmente os arquivos temporários restantes do Cloud Storage. Normalmente, você encontra exportações temporárias do BigQuery em gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Como configurar o faturamento
Por padrão. o projeto associado às credenciais ou à conta de serviço é cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Como executar o código
Antes de executar este exemplo, crie um conjunto de dados chamado "wordcount_dataset" ou altere o conjunto de dados de saída no código para um conjunto de dados existente do BigQuery no projeto do Google Cloud.
Use o comando bq para criar o wordcount_dataset
:
bq mk wordcount_dataset
Use o comando gsutil para criar um bucket do Cloud Storage, que será usado para exportar para o BigQuery:
gsutil mb gs://[bucket]
Scala
- Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado acima.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache() wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save()
- Executar o código no seu cluster
- Incorpore o SSH ao nó mestre do cluster do Dataproc.
- Acesse a página Clusters do Dataproc do seu projeto no Console do Cloud e, em seguida, clique no nome do cluster
- Na página de detalhes do cluster, selecione a guia Instâncias de VM e clique na seleção SSH à direita do nome do nó mestre do cluster.
Uma janela do navegador é aberta no diretório principal no nó mestre.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Acesse a página Clusters do Dataproc do seu projeto no Console do Cloud e, em seguida, clique no nome do cluster
- Crie
wordcount.scala
com o editor de textovi
,vim
ounano
pré-instalado e cole o código da lista de códigos Scalanano wordcount.scala
- Inicie o REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Execute o wordcount.scala com o comando
:load wordcount.scala
para criar a tabelawordcount_output
do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída, abra a página do BigQuery do projeto, selecione a tabelawordcount_output
e clique em Visualizar.
- Incorpore o SSH ao nó mestre do cluster do Dataproc.
PySpark
- Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado acima.
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Execute o código no cluster
- Incorpore o SSH ao nó mestre do cluster do Dataproc.
- Acesse a página Clusters do Dataproc do seu projeto no Console do Cloud e, em seguida, clique no nome do cluster
- Na página de detalhes do cluster, selecione a guia Instâncias de VM e clique na seleção SSH à direita do nome do nó mestre do cluster.
Uma janela do navegador é aberta no diretório principal no nó mestre.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Acesse a página Clusters do Dataproc do seu projeto no Console do Cloud e, em seguida, clique no nome do cluster
- Crie
wordcount.py
com o editor de textovi
,vim
ounano
pré-instalado e cole o código PySpark da lista de códigos PySparknano wordcount.py
- Execute a contagem de palavras com
spark-submit
para criar a tabelawordcount_output
do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída, abra a página do BigQuery do projeto, selecione a tabelawordcount_output
e clique em Visualizar.
- Incorpore o SSH ao nó mestre do cluster do Dataproc.
Para saber mais
- Armazenamento do BigQuery e Spark SQL, Python
- Como criar um arquivo de definição de tabela para uma fonte de dados externa
- Como consultar dados particionados externamente
- Dicas de ajuste de jobs do Spark