Le connecteur spark-bigquery-connector est utilisé avec Apache Spark pour lire et écrire des données depuis et vers BigQuery. Ce tutoriel fournit un exemple de code utilisant le connecteur spark-bigquery dans une application Spark. Pour savoir comment créer un cluster, consultez la Guides de démarrage rapide de Dataproc.
Mettre le connecteur à la disposition de votre application
Vous pouvez mettre le connecteur spark-bigquery à la disposition de votre application de l'une des manières suivantes:
Installez le connecteur spark-bigquery-connector dans le répertoire JAR Spark de chaque à l'aide de la commande Action d'initialisation des connecteurs Dataproc lorsque vous créez votre cluster.
Indiquez l'URI du connecteur lorsque vous envoyez votre job:
- Console Google Cloud:utilisez l'élément
Jars files
du job Spark sur la page Dataproc Envoyer une tâche. - gcloud CLI:utilisez l'option
gcloud dataproc jobs submit spark --jars
. - API Dataproc:utilisez le
Champ
SparkJob.jarFileUris
.
- Console Google Cloud:utilisez l'élément
Inclure le fichier JAR dans votre application Scala ou Java Spark en tant que dépendance (voir Compilation avec le connecteur).
Spécifier l'URI du fichier JAR du connecteur
Les versions du connecteur Spark-BigQuery sont répertoriées dans le dépôt GoogleCloudDataproc/spark-bigquery-connector GitHub.
Spécifiez le fichier JAR du connecteur en remplaçant la version de Scala et celle du connecteur
informations dans la chaîne d'URI suivante:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Utiliser Scala
2.12
avec les versions d'image Dataproc1.5+
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
Exemple de gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Utilisez Scala
2.11
avec les versions d'image Dataproc1.4
et antérieures:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
Exemple de gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
Calculer les coûts
Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :
- Dataproc
- BigQuery
- Cloud Storage
Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.
Lire et écrire des données depuis BigQuery
Cet exemple lit les données de BigQuery dans un DataFrame Spark pour effectuer un décompte de mots à l'aide de l'API de source de données standard.
Le connecteur écrit les données dans BigQuery
en mettant d'abord toutes les données en mémoire tampon
dans une table Cloud Storage temporaire. Il
copie toutes les données dans BigQuery en une seule opération. Le connecteur tente de supprimer les fichiers temporaires une fois l'opération de chargement BigQuery terminée, puis effectue une nouvelle tentative lorsque l'application Spark se termine.
Si le job échoue, supprimez les éventuelles autorisations temporaires restantes
Fichiers Cloud Storage. En général, les requêtes BigQuery temporaires
fichiers se trouvent dans gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Configurer la facturation
Par défaut, le projet associé aux identifiants ou au compte de service est
facturés pour l'utilisation de l'API. Pour facturer un autre projet, définissez la configuration suivante : spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Le projet à facturer peut également être défini au niveau des opérations de lecture/écriture, comme ceci : .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Exécuter le code
Avant d'exécuter cet exemple, créez un ensemble de données nommé "WordCount_dataset" ou remplacez l'ensemble de données de sortie dans le code par un ensemble de données BigQuery existant dans votre projet Google Cloud.
Utilisez la commande bq pour créer l'élément wordcount_dataset
:
bq mk wordcount_dataset
Utiliser la commande Google Cloud CLI pour créer un bucket Cloud Storage qui servira à exporter vers BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Examinez le code et remplacez l'espace réservé [bucket] par
dans le bucket Cloud Storage créé précédemment.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Exécutez le code sur votre cluster.
- Utiliser SSH pour se connecter au nœud maître du cluster Dataproc
<ph type="x-smartling-placeholder">
- </ph>
- Accédez au Clusters Dataproc de la console Google Cloud, puis cliquez sur le nom de votre cluster
- Sur la page >Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSH
à droite du nom du nœud maître du cluster
Une fenêtre de navigateur s'ouvre dans votre répertoire d'accueil sur le nœud maîtreConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Créez
wordcount.scala
avec le fichiervi
préinstallé.vim
ounano
, puis collez-les dans Scala du code source Liste avec le code Scalanano wordcount.scala
- Lancez la REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Exécutez "WordCount.scala" avec la commande
:load wordcount.scala
pour créer la table BigQuerywordcount_output
. La liste de sortie affiche 10 lignes du résultat de la commande "wordcount".:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Pour prévisualiser la table de sortie, ouvrez laBigQuery
sélectionnez la tablewordcount_output
, puis cliquez sur Aperçu.
- Utiliser SSH pour se connecter au nœud maître du cluster Dataproc
<ph type="x-smartling-placeholder">
PySpark
- Examinez le code et remplacez l'espace réservé [bucket] par
dans le bucket Cloud Storage créé précédemment.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Exécuter le code sur votre cluster
- Utiliser SSH pour se connecter au nœud maître du cluster Dataproc
<ph type="x-smartling-placeholder">
- </ph>
- Accédez au Clusters Dataproc de la console Google Cloud, puis cliquez sur le nom de votre cluster
- Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSH
à droite du nom du nœud maître du cluster
Une fenêtre de navigateur s'ouvre dans votre répertoire d'accueil sur le nœud maîtreConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Créez
wordcount.py
avec le fichiervi
préinstallé.vim
ounano
, puis collez-y le code PySpark du code source Fiche de code PySparknano wordcount.py
- Exécutez "wordcount" avec
spark-submit
pour créer la table BigQuerywordcount_output
. La liste de sortie affiche 10 lignes du résultat de la commande "wordcount".spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Pour prévisualiser la table de sortie, ouvrez laBigQuery
sélectionnez la tablewordcount_output
, puis cliquez sur Aperçu.
- Utiliser SSH pour se connecter au nœud maître du cluster Dataproc
<ph type="x-smartling-placeholder">
Pour en savoir plus
- Stockage BigQuery et Spark SQL - Python
- Créer un fichier de définition de table pour une source de données externe
- Interroger des données partitionnées en externe
- Conseils pour régler des tâches Spark