Utilisez le connecteur spark-bigquery-connector
avec Apache Spark
pour lire et écrire des données depuis et vers BigQuery.
Ce tutoriel présente une application PySpark qui utilise le
spark-bigquery-connector
Utiliser le connecteur BigQuery avec votre charge de travail
Consultez la page Dataproc sans serveur pour les versions de l'environnement d'exécution Spark. pour déterminer la version du connecteur BigQuery qui est installée la version d'exécution de votre charge de travail par lot. Si le connecteur n'est pas répertorié, consultez la section suivante pour savoir comment mettre le connecteur à la disposition de applications.
Utiliser le connecteur avec la version d'exécution 2.0 de Spark
Le connecteur BigQuery n'est pas installé dans la version 2.0 de l'environnement d'exécution Spark. Lorsque vous utilisez Dans la version d'exécution 2.0 de Spark, vous pouvez mettre le connecteur à la disposition de votre application de l'une des manières suivantes:
- Utilisez le paramètre
jars
pour pointer vers un fichier JAR de connecteur lorsque vous envoyez votre charge de travail par lot Dataproc sans serveur pour Spark L'exemple suivant spécifie un fichier JAR de connecteur (voir les GoogleCloudDataproc/spark-bigquery-connector sur GitHub pour obtenir la liste des fichiers JAR du connecteur disponibles).- Exemple de Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Exemple de Google Cloud CLI:
- Inclure le fichier JAR du connecteur dans votre application Spark en tant que dépendance (voir Compilation avec le connecteur)
Calculer les coûts
Ce tutoriel utilise des composants facturables de Google Cloud, dont :
- Dataproc sans serveur
- BigQuery
- Cloud Storage
Utilisez le Simulateur de coût pour générer une estimation des coûts en fonction de votre utilisation prévue. Les nouveaux utilisateurs de Cloud Platform peuvent bénéficier d'un essai gratuit.
E/S BigQuery
Cet exemple lit les données de BigQuery dans un DataFrame Spark pour effectuer un décompte de mots à l'aide de l'API de source de données standard.
Le connecteur écrit la sortie de décompte de mots dans BigQuery de l'une des manières suivantes:
Mettre en mémoire tampon les données dans des fichiers temporaires dans votre bucket Cloud Storage
Copier les données de votre bucket Cloud Storage en une seule opération BigQuery
Supprimer les fichiers temporaires dans Cloud Storage après BigQuery l'opération de chargement est terminée (les fichiers temporaires sont également supprimés après l'application Spark s'arrête). Si la suppression échoue, vous devez supprimer tous les fichiers Cloud Storage temporaires indésirables, qui sont généralement placés dans
gs://your-bucket/.spark-bigquery-jobid-UUID
.
Configurer la facturation
Par défaut. le projet qui est facturé pour l'utilisation de l'API est le projet associé aux identifiants ou au compte de service. Pour facturer un autre projet, définissez la configuration suivante : spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Le projet à facturer peut également être défini au niveau des opérations de lecture/écriture, comme ceci : .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Envoyer une charge de travail par lot PySpark pour le nombre de mots
- Créer le
wordcount_dataset
avec l'outil de ligne de commande bq dans un terminal local ou dans Cloud Shell :bq mk wordcount_dataset
- Créez un bucket Cloud Storage avec
Google Cloud CLI dans un environnement
dans un terminal ou dans
Cloud Shell :
gcloud storage buckets create gs://your-bucket
- Examinez le code.
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[your-bucket-name]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Créer
wordcount.py
localement dans un éditeur de texte en le copiant le code PySpark Liste de code PySpark, remplacez la espace réservé [your-bucket] avec le nom du bucket Cloud Storage que vous avez créé. - Envoyez la charge de travail par lot PySpark:
gcloud dataproc batches submit pyspark wordcount.py \ --region=region \ --deps-bucket=your-bucket
Exemple de sortie du terminal:... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Pour prévisualiser la table de sortie dans la console Google Cloud, ouvrez le fichier BigQuery sélectionnez le tableauwordcount_output
, puis cliquez sur Aperçu :
Pour en savoir plus
- Stockage BigQuery et Spark SQL - Python
- Créer un fichier de définition de table pour une source de données externe
- Interroger des données partitionnées en externe