spark-bigquery-connector digunakan dengan Apache Spark untuk membaca dan menulis data dari dan ke BigQuery. Tutorial ini memberikan contoh kode yang menggunakan spark-bigquery-connector dalam aplikasi Spark. Untuk petunjuk cara membuat cluster, lihat Panduan Memulai Dataproc.
Menyediakan konektor untuk aplikasi Anda
Anda dapat menyediakan spark-bigquery-connector untuk aplikasi Anda dengan salah satu cara berikut:
Instal spark-bigquery-connector di direktori jar Spark di setiap node menggunakan tindakan inisialisasi konektor Dataproc saat Anda membuat cluster.
Berikan URI konektor saat Anda mengirimkan tugas:
- Konsol Google Cloud: Gunakan item
Jars files
tugas Spark di halaman Kirim tugas Dataproc. - gcloud CLI: Gunakan flag
gcloud dataproc jobs submit spark --jars
. - Dataproc API: Gunakan
kolom
SparkJob.jarFileUris
.
- Konsol Google Cloud: Gunakan item
Sertakan jar dalam aplikasi Scala atau Java Spark Anda sebagai dependensi (lihat Mengompilasi terhadap konektor).
Cara menentukan URI jar konektor
Versi konektor Spark-BigQuery tercantum di repositori GoogleCloudDataproc/spark-bigquery-connector GitHub.
Tentukan jar konektor dengan mengganti informasi versi Scala dan konektor
dalam string URI berikut:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Menggunakan Scala
2.12
dengan versi image Dataproc1.5+
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
Contoh gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Gunakan Scala
2.11
dengan versi image Dataproc1.4
dan yang lebih lama:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
Contoh gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
Menghitung biaya
Dalam dokumen ini, Anda akan menggunakan komponen Google Cloud yang dapat ditagih berikut:
- Dataproc
- BigQuery
- Cloud Storage
Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda,
gunakan kalkulator harga.
Membaca dan menulis data dari BigQuery
Contoh ini membaca data dari BigQuery ke dalam DataFrame Spark untuk melakukan penghitungan kata menggunakan API sumber data standar.
Konektor menulis data ke BigQuery dengan
melakukan buffering semua data ke tabel sementara Cloud Storage terlebih dahulu. Kemudian, alat ini
akan menyalin semua data dari ke BigQuery dalam satu operasi. Konektor
akan mencoba menghapus file sementara setelah operasi pemuatan
BigQuery berhasil dan sekali lagi saat aplikasi Spark dihentikan.
Jika tugas gagal, hapus file Cloud Storage sementara yang tersisa. Biasanya, file BigQuery sementara berada di gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Mengonfigurasi penagihan
Secara default, project yang terkait dengan kredensial atau akun layanan akan ditagih untuk penggunaan API. Untuk menagih project lain, tetapkan konfigurasi
berikut: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Ini juga dapat ditambahkan ke operasi baca/tulis, sebagai berikut:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Menjalankan kode
Sebelum menjalankan contoh ini, buat set data bernama "wordcount_dataset" atau ubah set data output dalam kode ke set data BigQuery yang ada di project Google Cloud Anda.
Gunakan perintah bq untuk membuat wordcount_dataset
:
bq mk wordcount_dataset
Gunakan perintah Google Cloud CLI untuk membuat bucket Cloud Storage, yang akan digunakan untuk mengekspor ke BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Periksa kode dan ganti placeholder [bucket] dengan bucket Cloud Storage yang Anda buat sebelumnya.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Jalankan kode di cluster Anda
- Gunakan SSH untuk terhubung ke node master cluster Dataproc
- Buka halaman Cluster Dataproc di konsol Google Cloud, lalu klik nama cluster Anda
- Di halaman >Cluster details, pilih tab VM Instances. Kemudian, klik
SSH
di sebelah kanan nama node master cluster
Jendela browser akan terbuka di direktori utama Anda di node masterConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Buat
wordcount.scala
dengan editor teksvi
,vim
, ataunano
bawaan, lalu tempel kode Scala dari listingan kode Scalanano wordcount.scala
- Luncurkan REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Jalankan wordcount.scala dengan perintah
:load wordcount.scala
untuk membuat tabelwordcount_output
BigQuery. Listingan output menampilkan 20 baris dari output jumlah kata.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Untuk melihat pratinjau tabel output, buka halamanBigQuery
, pilih tabelwordcount_output
, lalu klik Preview.
- Gunakan SSH untuk terhubung ke node master cluster Dataproc
PySpark
- Periksa kode dan ganti placeholder [bucket] dengan bucket Cloud Storage yang Anda buat sebelumnya.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Jalankan kode di cluster Anda
- Gunakan SSH untuk terhubung ke node master cluster Dataproc
- Buka halaman Cluster Dataproc di konsol Google Cloud, lalu klik nama cluster Anda
- Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik
SSH
di sebelah kanan nama node master cluster
Jendela browser akan terbuka di direktori utama Anda di node masterConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Buat
wordcount.py
dengan editor teksvi
,vim
, ataunano
yang telah diinstal sebelumnya, lalu tempel kode PySpark dari listingan kode PySparknano wordcount.py
- Jalankan wordcount dengan
spark-submit
untuk membuat tabelwordcount_output
BigQuery. Listingan output menampilkan 20 baris dari output jumlah kata.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Untuk melihat pratinjau tabel output, buka halamanBigQuery
, pilih tabelwordcount_output
, lalu klik Preview.
- Gunakan SSH untuk terhubung ke node master cluster Dataproc
Untuk informasi selengkapnya
- BigQuery Storage & Spark SQL - Python
- Membuat file definisi tabel untuk sumber data eksternal
- Membuat kueri data yang dipartisi secara eksternal
- Tips penyesuaian tugas Spark