Utilizza spark-bigquery-connector
con Apache Spark
per leggere e scrivere dati da e in BigQuery.
Questo tutorial illustra un'applicazione PySpark che utilizza
spark-bigquery-connector
.
Utilizza il connettore BigQuery con il tuo carico di lavoro
Consulta Dataproc Serverless per release di runtime Spark per determinare la versione del connettore BigQuery installato la versione del runtime del carico di lavoro batch. Se il connettore non è nell'elenco, consulta la sezione successiva per istruzioni su come rendere il connettore disponibile diverse applicazioni.
Come utilizzare il connettore con il runtime Spark versione 2.0
Il connettore BigQuery non è installato nel runtime Spark versione 2.0. Quando si utilizza Versione 2.0 del runtime Spark, puoi rendere il connettore disponibile per la tua applicazione in uno dei seguenti modi:
- Utilizza il parametro
jars
per puntare a un file jar del connettore quando devi inviare il tuo carico di lavoro batch Dataproc serverless per Spark L'esempio seguente specifica un file jar del connettore (vedi il GoogleCloudDataproc/spark-bigquery-connector su GitHub per un elenco dei file jar del connettore disponibili).- Esempio di Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Esempio di Google Cloud CLI:
- Includi il file jar del connettore nell'applicazione Spark come dipendenza (vedi Compilazione rispetto al connettore)
Calcolare i costi
Questo tutorial utilizza i componenti fatturabili di Google Cloud, tra cui:
- Dataproc Serverless
- BigQuery
- Cloud Storage
Utilizza il Calcolatore prezzi per generare una stima dei costi in base all'utilizzo previsto. I nuovi utenti della piattaforma Cloud potrebbero hai diritto a una prova gratuita.
BigQuery I/O
Questo esempio legge i dati da BigQuery in un DataFrame Spark per eseguire un conteggio delle parole utilizzando l'API dell'origine dati standard.
Il connettore scrive l'output del conteggio parole in BigQuery come segue:
Buffering dei dati in file temporanei nel bucket Cloud Storage
Copiare i dati in un'operazione dal bucket Cloud Storage in BigQuery
Eliminazione dei file temporanei in Cloud Storage dopo BigQuery l'operazione di caricamento è stata completata (i file temporanei vengono eliminati termina l'applicazione Spark). Se l'eliminazione non va a buon fine, dovrai eliminare file temporanei di Cloud Storage indesiderati, che in genere vengono posizionati nel seguente paese:
gs://your-bucket/.spark-bigquery-jobid-UUID
.
Configura la fatturazione
Per impostazione predefinita. il progetto associato alle credenziali o all'account di servizio
addebitati in base all'utilizzo dell'API. Per fatturare un progetto diverso, imposta quanto segue
configurazione: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Può anche essere aggiunto a un'operazione di lettura/scrittura, come segue:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Invia un carico di lavoro batch per il conteggio di parole PySpark
- Crea
wordcount_dataset
con lo strumento a riga di comando bq presso un terminal locale Cloud Shell.bq mk wordcount_dataset
- Crea un bucket Cloud Storage con
Google Cloud CLI in un ambiente
nel terminale o all'interno
Cloud Shell.
gcloud storage buckets create gs://your-bucket
- Esamina il codice.
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[your-bucket-name]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Crea
wordcount.py
localmente in un editor di testo copiando il codice PySpark Scheda di codice PySpark, sostituisci placeholder [your-bucket] con il nome del il bucket Cloud Storage che hai creato. - Invia il carico di lavoro batch PySpark:
Esempio di output del terminale:gcloud dataproc batches submit pyspark wordcount.py \ --region=region \ --deps-bucket=your-bucket
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Per visualizzare l'anteprima della tabella di output nella console Google Cloud, apri BigQuery pagina, seleziona la tabellawordcount_output
e fai clic su Anteprima.
Per ulteriori informazioni
- Archiviazione e archiviazione BigQuery Spark SQL - Python
- Creazione di un file di definizione della tabella per un'origine dati esterna
- Esecuzione di query su dati partizionati esternamente