Il connettore spark-bigquery-connector viene utilizzato con Apache Spark per leggere e scrivere dati da e in BigQuery. Questo tutorial fornisce un codice di esempio che utilizza il connettore spark-bigquery-connector in un'applicazione Spark. Per istruzioni sulla creazione di un cluster, consulta Guide rapide di Dataproc.
Rendi il connettore disponibile per la tua applicazione
Puoi rendere disponibile il connettore spark-bigquery per la tua applicazione in uno dei seguenti modi:
Installa spark-bigquery-connector nella directory dei jar di Spark di ogni nodo utilizzando l'azione di inizializzazione dei connettori Dataproc quando crei il cluster.
Fornisci l'URI del connettore quando invii il job:
- Console Google Cloud: utilizza l'elemento Job Spark
Jars files
nella pagina Invia un job di Dataproc. - gcloud CLI: utilizza il flag
gcloud dataproc jobs submit spark --jars
. - API Dataproc: utilizza il metodo
Campo
SparkJob.jarFileUris
.
- Console Google Cloud: utilizza l'elemento Job Spark
Includi il file JAR nell'applicazione Scala o Java Spark come dipendenza (vedi Compilazione in base al connettore).
Come specificare l'URI del jar del connettore
Le versioni del connettore Spark-BigQuery sono elencate nel file GitHub Repository GoogleCloudDataproc/spark-bigquery-connector.
Specifica il file jar del connettore sostituendo le informazioni sulla versione di Scala e del connettore nella seguente stringa URI:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Utilizza Scala
2.12
con le versioni immagine di Dataproc1.5+
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
Esempio della gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Utilizza Scala
2.11
con le versioni immagine di Dataproc1.4
e precedenti:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
Esempio di interfaccia a riga di comando gcloud:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
Calcolo dei costi
In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Per generare una stima dei costi basata sull'utilizzo previsto,
utilizza il Calcolatore prezzi.
Lettura e scrittura di dati da BigQuery
Questo esempio legge i dati da BigQuery in un DataFrame Spark per eseguire il conteggio delle parole utilizzando l'API dell'origine dati standard.
Il connettore scrive i dati in BigQuery eseguendo prima il buffering di tutti i dati in una tabella temporanea di Cloud Storage. Quindi, copia tutti i dati in BigQuery in un'unica operazione. La
tenta di eliminare i file temporanei una volta che
dell'operazione di caricamento è riuscita e di nuovo al termine dell'applicazione Spark.
Se il job non riesce, rimuovi eventuali istanze temporanee rimanenti
di archiviazione dei file di Cloud Storage. In genere, BigQuery temporaneo
file si trovano in gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Configurazione della fatturazione
Per impostazione predefinita, al progetto associato alle credenziali o all'account di servizio viene addebitato l'utilizzo dell'API. Per fatturare un progetto diverso, imposta quanto segue
configurazione: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Può anche essere aggiunto a un'operazione di lettura/scrittura, come segue:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Esecuzione del codice
Prima di eseguire questo esempio, crea un set di dati denominato "wordcount_dataset" o modifica il set di dati di output nel codice in un set di dati BigQuery esistente nel tuo progetto Google Cloud.
Utilizza la
il comando bq per creare
wordcount_dataset
:
bq mk wordcount_dataset
Utilizza il comando Google Cloud CLI per creare un bucket Cloud Storage, che verrà utilizzato per l'esportazione in BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Esamina il codice e sostituisci il segnaposto [bucket] con il bucket Cloud Storage che hai creato in precedenza.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Esegui il codice sul tuo cluster
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
- Vai alla pagina Cluster Dataproc nella console Google Cloud, quindi fai clic sul nome del cluster
- Nella pagina >Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su
SSH
a destra del nome del nodo master del cluster
Si apre una finestra del browser nella home directory sul nodo masterConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
wordcount.scala
con l'elementovi
preinstallato,vim
onano
, quindi incolla il testo in Scala il codice Elenco di codice Scalanano wordcount.scala
- Avvia la REPL di
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Esegui wordcount.scala con il comando
:load wordcount.scala
per creare la tabellawordcount_output
di BigQuery. L'output l'elenco mostra 20 righe dall'output del conteggio parole.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Per visualizzare l'anteprima della tabella di output, apri laBigQuery
seleziona la tabellawordcount_output
e poi fai clic su Anteprima.
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
PySpark
- Esamina il codice e sostituisci il segnaposto [bucket] con
nel bucket Cloud Storage che hai creato in precedenza.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Esegui il codice sul tuo cluster
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
- Vai alla pagina Cluster Dataproc nella console Google Cloud, quindi fai clic sul nome del cluster
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su
SSH
a destra del nome del nodo principale del cluster
Si apre una finestra del browser nella home directory sul nodo principaleConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
wordcount.py
con l'elementovi
preinstallato, Editor di testovim
onano
, poi incollalo in PySpark il codice Scheda di codice PySparknano wordcount.py
- Esegui conteggio parole con
spark-submit
per creare BigQuery Tabellawordcount_output
. L'elenco di output mostra 20 righe dall'output del conteggio parole.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Per visualizzare l'anteprima della tabella di output, apri la paginaBigQuery
, seleziona la tabellawordcount_output
e poi fai clic su Anteprima.
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
Per ulteriori informazioni
- BigQuery Storage e Spark SQL - Python
- Creazione di un file di definizione della tabella per un'origine dati esterna
- Esecuzione di query sui dati partizionati esternamente
- Suggerimenti per l'ottimizzazione dei job Spark