Der spark-bigquery-connector wird mit Apache Spark verwendet, um Daten aus BigQuery zu lesen und zu schreiben. Diese Anleitung enthält Beispielcode, der den spark-bigquery-connector in einer Spark-Anwendung verwendet. Eine Anleitung zum Erstellen eines Clusters finden Sie in den Dataproc-Kurzanleitungen.
Connector für Ihre Anwendung verfügbar machen
Sie haben folgende Möglichkeiten, den spark-bigquery-Connector für Ihre Anwendung verfügbar zu machen:
Installieren Sie den Spark-bigquery-Connector im Spark-JAR-Verzeichnis jedes Knotens. Verwenden Sie dazu beim Erstellen des Clusters die Initialisierungsaktion für Dataproc-Connectors.
Geben Sie den Connector-URI an, wenn Sie den Job einreichen:
- Google Cloud Console:Verwenden Sie den Spark-Job
Jars files
auf der Dataproc-Seite Job senden. - gcloud CLI:Verwenden Sie das Flag
gcloud dataproc jobs submit spark --jars
. - Dataproc API:Verwenden Sie das Feld
SparkJob.jarFileUris
.
- Google Cloud Console:Verwenden Sie den Spark-Job
Schließen Sie die JAR-Datei in Ihre Scala- oder Java Spark-Anwendung als Abhängigkeit ein (siehe Für den Connector kompilieren).
JAR-URI des Connectors angeben
Spark-BigQuery-Connector-Versionen sind im GitHub-Repository GoogleCloudDataproc/spark-bigquery-connector aufgeführt.
Geben Sie die JAR-Datei des Connectors an, indem Sie die Informationen zur Scala- und Connector-Version im folgenden URI-String ersetzen:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Scala
2.12
mit Dataproc-Image-Versionen1.5+
verwendengs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
Beispiel für die gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Scala
2.11
mit Dataproc-Image-Versionen1.4
und niedriger verwenden:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
Beispiel für die gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
Kosten berechnen
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Lese-/Schreibvorgänge in BigQuery
In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen-API einer Wortzählung unterzogen.
Der Connector schreibt die Daten zuerst in BigQuery, indem er zuerst alle Daten in eine temporäre Tabelle von Cloud Storage zwischenspeichert. Anschließend werden alle Daten in einem Vorgang in BigQuery kopiert. Sobald der BigQuery-Ladevorgang erfolgreich war, versucht der Connector, die temporären Dateien zu löschen.
Wenn der Job fehlschlägt, entfernen Sie alle verbleibenden temporären Cloud Storage-Dateien. In der Regel befinden sich temporäre BigQuery-Dateien in gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Abrechnung konfigurieren
Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Servicekonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Sie kann auch folgendermaßen einem Lese-/Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Code ausführen
Bevor Sie dieses Beispiel ausführen, erstellen Sie ein Dataset mit dem Namen „wordcount_dataset“ oder ändern Sie das Ausgabe-Dataset im Code in ein vorhandenes BigQuery-Dataset in IhremGoogle Cloud -Projekt.
Verwenden Sie den bq-Befehl zum Erstellen des wordcount_dataset
:
bq mk wordcount_dataset
Verwenden Sie den Befehl der Google Cloud CLI, um einen Cloud Storage-Bucket zu erstellen, der für den Export nach BigQuery verwendet wird:
gcloud storage buckets create gs://[bucket]
Scala
- Untersuchen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Code in einem Cluster ausführen
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
- Rufen Sie in der Google Cloud -Konsole die Seite Dataproc Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite > Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.
Ein Browserfenster wird in Ihrem Basisverzeichnis auf dem Masterknoten geöffnet.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstellen Sie
wordcount.scala
mit dem vorinstallierten Texteditorvi
,vim
odernano
und fügen Sie dann den Scala-Code aus der Scala-Code-Liste ein.nano wordcount.scala
- Starten Sie die
spark-shell
-REPL.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Führen Sie wordcount.scala mit dem
:load wordcount.scala
-Befehl aus, um die BigQuery-wordcount_output
-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Wenn Sie eine Vorschau der Ausgabetabelle aufrufen möchten, öffnen Sie die SeiteBigQuery
, wählen Sie die Tabellewordcount_output
aus und klicken Sie dann auf Vorschau.
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
PySpark
- Untersuchen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Code in Ihrem Cluster ausführen
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
- Rufen Sie in der Google Cloud -Konsole die Seite Dataproc Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.
Ein Browserfenster wird in Ihrem Basisverzeichnis auf dem Masterknoten geöffnet.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstellen Sie
wordcount.py
mit dem vorinstallierten Texteditorvi
,vim
odernano
und fügen Sie dann den PySpark-Code aus der PySpark-Codeliste ein.nano wordcount.py
- Führen Sie Wordcount mit
spark-submit
aus, um die BigQuery-wordcount_output
-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Wenn Sie eine Vorschau der Ausgabetabelle aufrufen möchten, öffnen Sie die SeiteBigQuery
, wählen Sie die Tabellewordcount_output
aus und klicken Sie dann auf Vorschau.
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
Weitere Informationen
- BigQuery Storage und Spark SQL – Python
- Tabellendefinitionsdatei für eine externe Datenquelle erstellen
- Extern partitionierte Daten abfragen
- Tipps zur Abstimmung von Spark-Jobs