Use o spark-bigquery-connector
com o Apache Spark
para ler e gravar dados no e a partir do BigQuery.
Este tutorial demonstra um aplicativo PySpark que usa o
spark-bigquery-connector
.
Usar o conector do BigQuery com sua carga de trabalho
Consulte Dataproc Serverless for Spark releases para determinar a versão do conector do BigQuery instalada na versão do ambiente de execução da carga de trabalho em lote. Se o conector não estiver listado, consulte a próxima seção para instruções sobre como disponibilizar o conector para aplicativos.
Como usar o conector com o ambiente de execução do Spark versão 2.0
O conector do BigQuery não está instalado no ambiente de execução do Spark versão 2.0. Ao usar a versão 2.0 do ambiente de execução do Spark, é possível disponibilizar o conector para o aplicativo de uma das seguintes maneiras:
- Use o parâmetro
jars
para apontar para um arquivo jar de conector ao enviar sua carga de trabalho em lote do Dataproc Serverless para Spark. O exemplo a seguir especifica um arquivo jar de conector. Consulte o repositório GoogleCloudDataproc/spark-bigquery-connector no GitHub para conferir uma lista de arquivos jar de conector disponíveis.- Exemplo da Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Exemplo da Google Cloud CLI:
- Inclua o arquivo jar do conector no aplicativo Spark como uma dependência. Consulte Como compilar com o conector.
Calcular custos
Neste tutorial, há componentes faturáveis do Google Cloud, entre eles:
- Dataproc sem servidor
- BigQuery
- Cloud Storage
Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. É possível que novos usuários do Cloud Platform tenham direito a uma avaliação gratuita.
E/S do BigQuery
Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.
O conector grava a saída de contagem de palavras no BigQuery da seguinte maneira:
Armazenar os dados em arquivos temporários no bucket do Cloud Storage
Como copiar os dados em uma operação do bucket do Cloud Storage para o BigQuery
Exclusão dos arquivos temporários no Cloud Storage após a conclusão da operação de carregamento do BigQuery. Os arquivos temporários também são excluídos após o encerramento do aplicativo Spark. Se a exclusão falhar, será necessário excluir todos os arquivos temporários indesejados do Cloud Storage, que geralmente são colocados em
gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID
.
Configurar o faturamento
Por padrão, o projeto associado às credenciais ou à conta de serviço é
cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Também é possível adicionar a uma operação de leitura ou gravação, da seguinte maneira:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Enviar uma carga de trabalho de contagem de palavras do PySpark
Execute uma carga de trabalho em lote do Spark que conta o número de palavras em um conjunto de dados público.
- Abra um terminal local ou o Cloud Shell.
- Crie o
wordcount_dataset
com a ferramenta de linha de comando bq em um terminal local ou no Cloud Shell.bq mk wordcount_dataset
- Crie um bucket do Cloud Storage com a
Google Cloud CLI.
Substituagcloud storage buckets create gs://YOUR_BUCKET
YOUR_BUCKET
pelo nome do bucket do Cloud Storage criado. - Crie o arquivo
wordcount.py
localmente em um editor de texto copiando o código PySpark a seguir.#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "YOUR_BUCKET" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Envie a carga de trabalho em lote do PySpark:
Exemplo de saída do terminal:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=YOUR_BUCKET
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída no console do Google Cloud, abra a página do BigQuery do projeto, selecione a tabelawordcount_output
e clique em Visualizar.
Para mais informações
- Armazenamento do BigQuery e Spark SQL, Python
- Como criar um arquivo de definição de tabela para uma fonte de dados externa
- Usar dados particionados externamente