Use o spark-bigquery-connector
com o Apache Spark
para ler e gravar dados no e a partir do BigQuery.
Este tutorial demonstra um aplicativo PySpark que usa a
spark-bigquery-connector
:
Usar o conector do BigQuery com sua carga de trabalho
Consulte Dataproc Serverless for Spark releases para determinar a versão do conector do BigQuery instalada na versão do ambiente de execução da carga de trabalho em lote. Se o conector não estiver listado, consulte a próxima seção para instruções sobre como disponibilizar o conector para aplicativos conteinerizados.
Como usar o conector com o ambiente de execução do Spark versão 2.0
O conector do BigQuery não está instalado na versão 2.0 do ambiente de execução do Spark. Ao usar A versão 2.0 do ambiente de execução do Spark, é possível disponibilizar o conector para o aplicativo de uma das seguintes maneiras:
- Use o parâmetro
jars
para apontar para um arquivo jar de conector ao enviar sua carga de trabalho em lote do Dataproc Serverless para Spark. O exemplo a seguir especifica um arquivo jar de conector. Consulte o repositório GoogleCloudDataproc/spark-bigquery-connector no GitHub para conferir uma lista de arquivos jar de conector disponíveis.- Exemplo da Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Exemplo da Google Cloud CLI:
- Inclua o arquivo jar do conector no aplicativo Spark como uma dependência. Consulte Como compilar com o conector.
Calcular custos
Neste tutorial, há componentes faturáveis do Google Cloud, entre eles:
- Dataproc sem servidor
- BigQuery
- Cloud Storage
Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. É possível que novos usuários do Cloud Platform tenham direito a uma avaliação gratuita.
E/S do BigQuery
Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.
O conector grava a saída de contagem de palavras no BigQuery da seguinte maneira:
armazenar os dados em buffer em arquivos temporários no bucket do Cloud Storage;
Como copiar os dados em uma operação do bucket do Cloud Storage para o BigQuery
Excluir os arquivos temporários no Cloud Storage após o BigQuery a operação de carregamento do arquivo for concluída (os arquivos temporários também serão excluídos o aplicativo Spark é encerrado). Se a exclusão falhar, será necessário excluir todos os arquivos temporários indesejados do Cloud Storage, que geralmente são colocados em
gs://your-bucket/.spark-bigquery-jobid-UUID
.
Configurar faturamento
Por padrão. o projeto associado às credenciais ou à conta de serviço é cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Enviar uma carga de trabalho de contagem de palavras do PySpark
- Crie o
wordcount_dataset
com a ferramenta de linha de comando bq em um terminal local ou no Cloud Shell.bq mk wordcount_dataset
- Crie um bucket do Cloud Storage com a
CLI do Google Cloud em um terminal local ou no
Cloud Shell.
gcloud storage buckets create gs://your-bucket
- Examine o código.
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[your-bucket-name]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Crie
wordcount.py
localmente em um editor de texto copiando o código PySpark da lista de códigos do PySpark. Substitua o marcador de posição [seu-bucket] pelo nome do bucket do Cloud Storage criado. - Envie a carga de trabalho em lote do PySpark:
Exemplo de saída do terminal:gcloud dataproc batches submit pyspark wordcount.py \ --region=region \ --deps-bucket=your-bucket
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída no console do Google Cloud, abra o BigQuery selecione a tabelawordcount_output
e clique em Visualização.
Para mais informações
- Armazenamento do BigQuery e Spark SQL, Python
- Como criar um arquivo de definição de tabela para uma fonte de dados externa
- Como consultar dados particionados externamente