Verwenden Sie den spark-bigquery-connector mit Apache Spark, um Daten in BigQuery zu lesen und zu schreiben.
In dieser Anleitung wird eine PySpark-Anwendung mit der spark-bigquery-connector
veranschaulicht.
BigQuery-Connector mit Ihrer Arbeitslast verwenden
Siehe Dataproc Serverless für Spark-Laufzeit-Releases die Version des BigQuery-Connectors, die Laufzeitversion der Batcharbeitslast. Ist der Connector nicht aufgeführt, finden Sie im nächsten Abschnitt eine Anleitung, wie Sie den Connector für alle Anwendungen.
Connector mit der Spark-Laufzeitversion 2.0 verwenden
Der BigQuery-Connector ist in der Spark-Laufzeitversion 2.0 nicht installiert. Bei Verwendung Spark-Laufzeitversion 2.0 können Sie den Connector für Ihre Anwendung verfügbar machen, auf eine der folgenden Arten:
- Verwenden Sie den Parameter
jars
, um auf eine JAR-Datei des Connectors zu verweisen, wenn Sie senden Ihre Dataproc Serverless for Spark-Batcharbeitslast Im folgenden Beispiel wird eine Connector-JAR-Datei angegeben (siehe GoogleCloudDataproc/spark-bigquery-connector Repository auf GitHub, um eine Liste der verfügbaren Connector-JAR-Dateien zu erhalten.- Beispiel für die Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Beispiel für die Google Cloud CLI:
- Schließen Sie die JAR-Datei des Connectors als Abhängigkeit in Ihre Spark-Anwendung ein (siehe Für den Connector kompilieren).
Kosten berechnen
In dieser Anleitung werden kostenpflichtige Komponenten von Google Cloud verwendet, darunter:
- Dataproc Serverless
- BigQuery
- Cloud Storage
Der Preisrechner kann eine Kostenschätzung anhand Ihrer voraussichtlichen Nutzung generieren. Neuen Cloud Platform-Nutzern steht gegebenenfalls eine kostenlose Testversion zur Verfügung.
BigQuery-E/A
In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen-API einer Wortzählung unterzogen.
Der Connector schreibt die Wordcount-Ausgabe in BigQuery so:
Daten in temporären Dateien in Ihrem Cloud Storage-Bucket puffern
Kopieren Sie die Daten in einem Vorgang aus Ihrem Cloud Storage-Bucket in BigQuery
Die temporären Dateien in Cloud Storage werden nach Abschluss des BigQuery-Ladevorgangs gelöscht. Sie werden auch nach dem Beenden der Spark-Anwendung gelöscht. Wenn der Löschvorgang fehlschlägt, müssen Sie unerwünschte temporäre Cloud Storage-Dateien, die in der Regel in „
gs://your-bucket/.spark-bigquery-jobid-UUID
“.
Abrechnung konfigurieren
Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Servicekonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Sie kann auch folgendermaßen einem Lese-/Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
PySpark-Batcharbeitslast für Wortzählung einreichen
-
wordcount_dataset
erstellen mit dem bq-Befehlszeilentool oder in einem lokalen Terminal Cloud Shell:bq mk wordcount_dataset
- Cloud Storage-Bucket erstellen mit der
Google Cloud CLI in einem lokalen
oder in
Cloud Shell
gcloud storage buckets create gs://your-bucket
- Sehen Sie sich den Code an.
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[your-bucket-name]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
-
wordcount.py
durch Kopieren lokal in einem Texteditor erstellen den PySpark-Code aus dem PySpark-Codeeintrag. Ersetzen Sie den Parameter [your-bucket] mit dem Namen der Cloud Storage-Bucket, den Sie erstellt haben. - Senden Sie die PySpark-Batcharbeitslast:
Beispielausgabe im Terminal:gcloud dataproc batches submit pyspark wordcount.py \ --region=region \ --deps-bucket=your-bucket
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Wenn Sie sich eine Vorschau der Ausgabetabelle in der Google Cloud Console ansehen möchten, öffnen Sie das BigQuery die Tabellewordcount_output
aus und klicken Sie auf Vorschau.
Weitere Informationen
- BigQuery Storage und Spark SQL – Python
- Tabellendefinitionsdatei für eine externe Datenquelle erstellen
- Extern partitionierte Daten abfragen