Utilizza spark-bigquery-connector con Apache Spark per leggere e scrivere dati da e verso BigQuery.
Questo tutorial illustra un'applicazione PySpark che utilizza spark-bigquery-connector
.
Fornisci il connettore al tuo carico di lavoro
Puoi rendere disponibile il connettore per la tua applicazione in uno dei modi seguenti:
- Utilizza il parametro
jars
per indirizzare a un file jar del connettore quando invii il carico di lavoro batch di Dataproc Serverless per Spark. L'esempio seguente specifica un file jar del connettore (consulta il repository GoogleCloudDataproc/spark-bigquery-connector su GitHub per un elenco dei file jar del connettore disponibili).- Esempio di SDK:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-version.jar \ ... other args
- Esempio di SDK:
- Includi il file jar del connettore nella tua applicazione Spark come dipendenza (consulta la sezione Compilazione del connettore)
Calcola i costi
Questo tutorial utilizza componenti fatturabili di Google Cloud, tra cui:
- Dataproc serverless
- BigQuery
- Cloud Storage
Utilizza il Calcolatore prezzi per generare una stima dei costi in base all'utilizzo previsto. I nuovi utenti di Cloud Platform possono essere idonei per una prova gratuita.
I/O BigQuery
Questo esempio legge i dati da BigQuery in un Spark DataFrame per eseguire un conteggio delle parole utilizzando l'API standard di origine dati.
Il connettore scrive l'output del conteggio parole in BigQuery mediante:
Buffering dei dati in file temporanei nel bucket di Cloud Storage
Copia dei dati in un'operazione dal bucket Cloud Storage in BigQuery
Eliminazione dei file temporanei in Cloud Storage al termine dell'operazione di caricamento di BigQuery (i file temporanei vengono eliminati anche dopo la chiusura dell'applicazione Spark). Se l'eliminazione non va a buon fine, dovrai eliminare tutti i file temporanei di Cloud Storage indesiderati, che in genere vengono inseriti in
gs://your-bucket/.spark-bigquery-jobid-UUID
.
Configura fatturazione
Per impostazione predefinita, il progetto associato alle credenziali o all'account di servizio viene fatturato per l'utilizzo dell'API. Per fatturare un progetto diverso, imposta la configurazione seguente: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Può anche essere aggiunta a un'operazione di lettura/scrittura, come segue:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Inviare un carico di lavoro batch per il conteggio delle parole di PySpark
- Crea
wordcount_dataset
con lo strumento a riga di comando bq in un terminale locale o in Cloud Shell.bq mk wordcount_dataset
- Crea un bucket Cloud Storage con lo strumento a riga di comando gsutil in un terminale locale o in Cloud Shell.
gsutil mb gs://your-bucket
- Esamina il codice.
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[your-bucket-name]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Crea
wordcount.py
in locale in un editor di testo copiando il codice PySpark dall'elenco del codice PySpark, sostituisci il segnaposto [your-bucket] con il nome del bucket Cloud Storage che hai creato. - Invia il carico di lavoro in batch PySpark:
gcloud dataproc batches submit pyspark wordcount.py \ --region=region \ --deps-bucket=your-bucket
Esempio di output del terminale:... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Per visualizzare l'anteprima della tabella di output nella console, apri la pagina BigQuery del progetto, seleziona la tabellawordcount_output
, quindi fai clic su Anteprima.
Per ulteriori informazioni
- BigQuery Storage e AMP SQL - Python
- Creare un file di definizione della tabella per un'origine dati esterna
- Eseguire query su dati partizionati esternamente