Der spark-bigquery-connector wird mit Apache Spark verwendet, um Daten aus BigQuery zu lesen und zu schreiben. Diese Anleitung enthält Beispielcode, der den spark-bigquery-connector in einer Spark-Anwendung verwendet. Eine Anleitung zum Erstellen eines Clusters finden Sie in den Dataproc-Kurzanleitungen.
Connector für Ihre Anwendung verfügbar machen
Sie haben folgende Möglichkeiten, den Spark-BigQuery-Connector Ihrer Anwendung zur Verfügung zu stellen:
Installieren Sie den spark-bigquery-Connector im Verzeichnis der Spark-JAR-Dateien jedes Knotens. Verwenden Sie dazu die Initialisierungsaktion für Dataproc-Connectors, wenn Sie Ihren Cluster erstellen.
Geben Sie den Connector-URI beim Senden des Jobs an:
- Google Cloud Console: Verwenden Sie das Element
Jars files
des Spark-Jobs auf der Dataproc-Seite Job senden. - gcloud-Befehlszeile: Verwenden Sie das Flag
gcloud dataproc jobs submit spark --jars
. - Dataproc API:Verwenden Sie das Feld
SparkJob.jarFileUris
.
- Google Cloud Console: Verwenden Sie das Element
Fügen Sie die JAR-Datei in die Scala- oder Java Spark-Anwendung als Abhängigkeit ein (siehe Kompilieren gegen den Connector).
So legen Sie den URI der Connector-JAR-Datei fest
Versionen von Spark-BigQuery-Connectors werden im GitHub-Repository GoogleCloud Dataproc/spark-bigquery-connector aufgeführt.
Geben Sie die Connector-JAR-Datei an, indem Sie die Scala- und Connector-Versionsinformationen im folgenden URI-String ersetzen:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Scala
2.12
mit Dataproc-Image-Versionen1.5+
verwendengs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
Beispiel für die gcloud-Befehlszeile:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Verwenden Sie die Scala
2.11
mit den Dataproc-Image-Versionen1.4
und früher:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
Beispiel für die gcloud-Befehlszeile:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
Kosten berechnen
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Lese-/Schreibvorgänge in BigQuery
In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen-API einer Wortzählung unterzogen.
Der Connector schreibt die Daten in BigQuery, indem sie alle Daten zuerst in einer temporären Cloud Storage-Tabelle zwischenspeichern. Anschließend werden alle Daten in einem Vorgang in BigQuery kopiert. Sobald der BigQuery-Ladevorgang erfolgreich war, versucht der Connector, die temporären Dateien zu löschen.
Entfernen Sie alle verbleibenden temporären Cloud Storage-Dateien, wenn der Job fehlschlägt. In der Regel befinden sich temporäre BigQuery-Dateien in gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Abrechnung konfigurieren
Standardmäßig wird das Projekt, das den Anmeldedaten oder dem Dienstkonto zugeordnet ist, nach API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Sie kann auch folgendermaßen einem Lese-/Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Code ausführen
Bevor Sie dieses Beispiel ausführen, erstellen Sie ein Dataset mit dem Namen "wordcount_dataset" oder ändern Sie das Ausgabe-Dataset im Code in ein vorhandenes BigQuery-Dataset in Ihrem Google Cloud-Projekt.
Verwenden Sie den bq-Befehl zum Erstellen des wordcount_dataset
:
bq mk wordcount_dataset
Verwenden Sie den gsutil-Befehl zum Erstellen eines Cloud Storage-Buckets, der für den Export nach BigQuery verwendet wird:
gsutil mb gs://[bucket]
Scala
- Prüfen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Code auf dem Cluster ausführen
- Mit SSH eine Verbindung zum Dataproc-Cluster-Masterknoten herstellen
- Rufen Sie in der Google Cloud Console die Seite Dataproc Cluster auf und klicken Sie auf den Namen Ihres Clusters
- Wählen Sie auf der Seite >Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann
SSH
rechts neben dem Namen des Cluster-Master-Knotens
Ein Browserfenster wird auf Ihrem Basisverzeichnis auf dem Master-Knoten geöffnetConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstelle
wordcount.scala
mit dem vorinstallierten Texteditorvi
,vim
odernano
und füge dann den Scala-Code aus der Scala-Codeliste ein.nano wordcount.scala
- Starten Sie die REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Führen Sie wordcount.scala mit dem
:load wordcount.scala
-Befehl aus, um die BigQuery-wordcount_output
-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Wenn Sie die Vorschautabelle aufrufen möchten, öffnen Sie die SeiteBigQuery
, wählen Sie die Tabellewordcount_output
aus und klicken Sie auf Vorschau.
- Mit SSH eine Verbindung zum Dataproc-Cluster-Masterknoten herstellen
PySpark
- Prüfen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Führen Sie den Code auf Ihrem Cluster aus
- Mit SSH eine Verbindung zum Dataproc-Cluster-Masterknoten herstellen
- Rufen Sie in der Google Cloud Console die Seite Dataproc Cluster auf und klicken Sie auf den Namen Ihres Clusters
- Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann
SSH
rechts neben dem Namen des Cluster-Master-Knotens
Ein Browserfenster wird auf Ihrem Basisverzeichnis auf dem Master-Knoten geöffnetConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstelle
wordcount.py
mit dem vorinstallierten Texteditorvi
,vim
odernano
und füge dann den PySpark-Code aus der PySpark-Codeliste ein.nano wordcount.py
- Führen Sie Wordcount mit
spark-submit
aus, um die BigQuery-wordcount_output
-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Wenn Sie die Vorschautabelle aufrufen möchten, öffnen Sie die SeiteBigQuery
, wählen Sie die Tabellewordcount_output
aus und klicken Sie auf Vorschau.
- Mit SSH eine Verbindung zum Dataproc-Cluster-Masterknoten herstellen
Weitere Informationen
- BigQuery-Storage &Spark SQL – Python
- Tabellendefinitionsdatei für eine externe Datenquelle erstellen
- Extern partitionierte Daten abfragen
- Tipps zur Abstimmung von Spark-Jobs