Gunakan spark-bigquery-connector
dengan Apache Spark
untuk membaca dan menulis data dari dan ke BigQuery.
Tutorial ini menunjukkan aplikasi PySpark yang menggunakan
spark-bigquery-connector
.
Menggunakan konektor BigQuery dengan beban kerja Anda
Lihat Rilis runtime Dataproc Serverless untuk Spark untuk menentukan versi konektor BigQuery yang diinstal di versi runtime beban kerja batch Anda. Jika konektor tidak tercantum, lihat bagian berikutnya untuk mengetahui petunjuk tentang cara menyediakan konektor untuk aplikasi.
Cara menggunakan konektor dengan runtime Spark versi 2.0
Konektor BigQuery tidak diinstal di runtime Spark versi 2.0. Saat menggunakan runtime Spark versi 2.0, Anda dapat menyediakan konektor untuk aplikasi dengan salah satu cara berikut:
- Gunakan parameter
jars
untuk mengarah ke file jar konektor saat Anda mengirimkan Dataproc Serverless untuk workload batch Spark. Contoh berikut menentukan file jar konektor (lihat repositori GoogleCloudDataproc/spark-bigquery-connector di GitHub untuk mengetahui daftar file jar konektor yang tersedia).- Contoh Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Contoh Google Cloud CLI:
- Sertakan file jar konektor dalam aplikasi Spark Anda sebagai dependensi (lihat Mengompilasi terhadap konektor)
Menghitung biaya
Tutorial ini menggunakan komponen Google Cloud yang dapat ditagih, termasuk:
- Dataproc Serverless
- BigQuery
- Cloud Storage
Gunakan Kalkulator Harga untuk membuat perkiraan biaya berdasarkan penggunaan yang Anda proyeksikan. Pengguna Cloud Platform baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.
I/O BigQuery
Contoh ini membaca data dari BigQuery ke dalam DataFrame Spark untuk melakukan penghitungan kata menggunakan API sumber data standar.
Konektor menulis output jumlah kata ke BigQuery sebagai berikut:
Menyimpan data ke dalam file sementara di bucket Cloud Storage
Menyalin data dalam satu operasi dari bucket Cloud Storage ke BigQuery
Menghapus file sementara di Cloud Storage setelah operasi pemuatan BigQuery selesai (file sementara juga dihapus setelah aplikasi Spark dihentikan). Jika penghapusan gagal, Anda harus menghapus file Cloud Storage sementara yang tidak diinginkan, yang biasanya ditempatkan di
gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID
.
Mengonfigurasi penagihan
Secara default, project yang terkait dengan kredensial atau akun layanan akan ditagih untuk penggunaan API. Untuk menagih project lain, tetapkan konfigurasi
berikut: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Anda juga dapat menambahkan ke operasi baca atau tulis, sebagai berikut:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Mengirimkan workload batch jumlah kata PySpark
Jalankan beban kerja batch Spark yang menghitung jumlah kata dalam set data publik.
- Buka terminal lokal atau Cloud Shell
- Buat
wordcount_dataset
dengan alat command line bq di terminal lokal atau di Cloud Shell.bq mk wordcount_dataset
- Buat bucket Cloud Storage dengan Google Cloud CLI.
Gantigcloud storage buckets create gs://YOUR_BUCKET
YOUR_BUCKET
dengan nama bucket Cloud Storage yang Anda buat. - Buat file
wordcount.py
secara lokal di editor teks dengan menyalin kode PySpark berikut.#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "YOUR_BUCKET" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Kirimkan beban kerja batch PySpark:
Contoh output terminal:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=YOUR_BUCKET
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Untuk melihat pratinjau tabel output di konsol Google Cloud, buka halaman BigQuery project Anda, pilih tabelwordcount_output
, lalu klik Preview.
Untuk informasi selengkapnya
- BigQuery Storage & Spark SQL - Python
- Membuat file definisi tabel untuk sumber data eksternal
- Menggunakan data yang dipartisi secara eksternal