O spark-bigquery-connector é usado com o Apache Spark para ler e gravar dados do e para o BigQuery. Neste tutorial, fornecemos um código de exemplo que usa o conector Spark-bigquery em um aplicativo Spark. Para instruções sobre como criar um cluster, consulte os Guias de início rápido do Dataproc.
Disponibilizar o conector para o app
É possível disponibilizar o spark-bigquery-connector para o aplicativo de uma das seguintes maneiras:
Instale o spark-bigquery-connector no diretório jars do Spark de cada nó usando a ação de inicialização dos conectores do Dataproc ao criar o cluster.
Informe o URI do conector ao enviar o job:
- Console do Google Cloud:use o item
Jars files
do job do Spark na página Enviar um job do Dataproc. - CLI gcloud:use a flag
gcloud dataproc jobs submit spark --jars
. - API Dataproc:use o campo
SparkJob.jarFileUris
.
- Console do Google Cloud:use o item
Inclua o jar no aplicativo Scala ou Java Spark como uma dependência. Consulte Como compilar com o conector.
Como especificar o URI do jar do conector
As versões do conector Spark-BigQuery estão listadas no repositório do GitHub GoogleCloudDataproc/spark-bigquery-connector.
Especifique o jar do conector substituindo as informações da versão do Scala e do conector
na seguinte string de URI:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Usar o Scala
2.12
com as versões de imagem do Dataproc1.5+
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
Exemplo da CLI gcloud:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Use o Scala
2.11
com as versões de imagem do Dataproc1.4
e anteriores:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
Exemplo da CLI gcloud:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
Cálculo de custos
Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Como gravar e ler dados do BigQuery
Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.
O conector grava os dados no BigQuery primeiro armazenando-os em buffer em uma tabela temporária do Cloud Storage. Em seguida, ele copia todos os dados do BigQuery em uma única operação. O conector tenta excluir os arquivos temporários depois que a operação de carregamento do BigQuery for bem-sucedida e mais uma vez quando o aplicativo Spark é encerrado.
Se o job falhar, remova todos os arquivos temporários
do Cloud Storage. Normalmente, os arquivos temporários do BigQuery estão localizados em gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Como configurar o faturamento
Por padrão, o projeto associado às credenciais ou à conta de serviço é
cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Como executar o código
Antes de executar este exemplo, crie um conjunto de dados chamado "wordcount_dataset" ou altere o conjunto de dados de saída no código para um conjunto de dados existente do BigQuery no projeto do Google Cloud.
Use o comando bq para criar o wordcount_dataset
:
bq mk wordcount_dataset
Use o comando Google Cloud CLI para criar um bucket do Cloud Storage, que será usado para exportar para o BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado anteriormente.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Executar o código no seu cluster
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
- Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
- Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster
Uma janela do navegador é aberta no diretório principal do nó mestre.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie
wordcount.scala
com o editor de textovi
,vim
ounano
pré-instalado e cole o código da lista de códigos Scalanano wordcount.scala
- Inicie o REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Execute o wordcount.scala com o comando
:load wordcount.scala
para criar a tabelawordcount_output
do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída, abra a páginaBigQuery
, selecione a tabelawordcount_output
e clique em Visualizar.
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
PySpark
- Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado anteriormente.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Execute o código no cluster
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
- Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
- Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster
Uma janela do navegador é aberta no diretório principal do nó mestre.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie
wordcount.py
com o editor de textovi
,vim
ounano
pré-instalado e cole o código PySpark da lista de códigos PySparknano wordcount.py
- Execute a contagem de palavras com
spark-submit
para criar a tabelawordcount_output
do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída, abra a páginaBigQuery
, selecione a tabelawordcount_output
e clique em Visualizar.
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
Para mais informações
- Armazenamento do BigQuery e Spark SQL, Python
- Como criar um arquivo de definição de tabela para uma fonte de dados externa
- Como consultar dados particionados externamente
- Dicas de ajuste de jobs do Spark