Menggunakan konektor BigQuery dengan Spark

spark-bigquery-connector digunakan bersama Apache Spark untuk membaca dan menulis data dari dan ke BigQuery. Tutorial ini menyediakan kode contoh yang menggunakan spark-bigquery-connector dalam aplikasi Spark. Untuk mendapatkan petunjuk tentang cara membuat cluster, baca Panduan Memulai Dataproc.

Sediakan konektor untuk aplikasi Anda

Anda dapat menyediakan spark-bigquery-connector untuk aplikasi Anda dengan salah satu cara berikut:

  1. Instal spark-bigquery-connector di direktori jars Spark di setiap node menggunakan tindakan inisialisasi konektor Dataproc saat Anda membuat cluster.

  2. Berikan URI konektor saat Anda mengirimkan tugas:

    1. Konsol Google Cloud: Gunakan item Jars files tugas Spark di halaman Submit a job Dataproc.
    2. gcloud CLI: Gunakan flag gcloud dataproc jobs submit spark --jars.
    3. Dataproc API: Gunakan kolom SparkJob.jarFileUris.
  3. Sertakan jar di aplikasi Scala atau Java Spark sebagai dependensi (lihat Mengompilasi konektor).

Cara menentukan URI jar konektor

Versi konektor Spark-BigQuery tercantum di repositori GoogleCloudDataproc/spark-bigquery-connector GitHub.

Tentukan jar konektor dengan mengganti informasi versi Scala dan konektor dalam string URI berikut:

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • Gunakan Scala 2.12 dengan versi image Dataproc 1.5+

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    contoh gcloud CLI:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • Gunakan Scala 2.11 dengan image Dataproc versi 1.4 dan yang lebih lama:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    contoh gcloud CLI:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

Menghitung biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

  • Dataproc
  • BigQuery
  • Cloud Storage

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Membaca dan menulis data dari BigQuery

Contoh ini membaca data dari BigQuery ke Spark DataFrame untuk melakukan jumlah kata menggunakan API sumber data standar.

Konektor menulis data ke BigQuery dengan terlebih dahulu melakukan buffering semua data ke tabel sementara Cloud Storage. Kemudian, fungsi ini menyalin semua data dari ke dalam BigQuery dalam satu operasi. Konektor mencoba menghapus file sementara setelah operasi pemuatan BigQuery berhasil dan sekali lagi saat aplikasi Spark dihentikan. Jika tugas gagal, hapus file Cloud Storage sementara yang tersisa. Biasanya, file BigQuery sementara berada di gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Mengonfigurasi penagihan

Secara default, project yang terkait dengan kredensial atau akun layanan ditagih untuk penggunaan API. Untuk menagih project yang berbeda, tetapkan konfigurasi berikut: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Ini juga dapat ditambahkan ke operasi baca/tulis, seperti berikut: .option("parentProject", "<BILLED-GCP-PROJECT>").

Menjalankan kode

Sebelum menjalankan contoh ini, buat set data dengan nama "wordcount_dataset" atau ubah set data output dalam kode menjadi set data BigQuery yang ada di project Google Cloud Anda.

Gunakan perintah bq untuk membuat wordcount_dataset:

bq mk wordcount_dataset

Gunakan perintah gsutil untuk membuat bucket Cloud Storage yang akan digunakan untuk mengekspor ke BigQuery:

gsutil mb gs://[bucket]

Scala

  1. Periksa kode dan ganti placeholder [bucket] dengan bucket Cloud Storage yang Anda buat sebelumnya.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
  2. Jalankan kode di cluster Anda
    1. Gunakan SSH untuk terhubung ke node master cluster Dataproc
      1. Buka halaman Clusters di Konsol Google Cloud, lalu klik nama cluster Anda
        Halaman cluster Dataproc di Cloud Console.
      2. Di halaman >Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster
        Halaman detail Cluster Dataproc di Cloud Console.

        Jendela browser akan terbuka di direktori beranda Anda pada node master
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Buat wordcount.scala dengan editor teks vi, vim, atau nano yang telah diinstal sebelumnya, lalu tempel kode Scala dari listingan kode Scala
      nano wordcount.scala
        
    3. Luncurkan spark-shell REPL.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Jalankan wordcount.scala dengan perintah :load wordcount.scala untuk membuat tabel wordcount_output BigQuery. Listingan output menampilkan 20 baris dari output jumlah kata.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Untuk melihat pratinjau tabel output, buka halaman BigQuery, pilih tabel wordcount_output, lalu klik Pratinjau.
      Pratinjau tabel di halaman BigQuery Explorer di Cloud Console.

PySpark

  1. Periksa kode dan ganti placeholder [bucket] dengan bucket Cloud Storage yang Anda buat sebelumnya.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  2. Jalankan kode pada cluster Anda
    1. Gunakan SSH untuk terhubung ke node master cluster Dataproc
      1. Buka halaman Clusters di Konsol Google Cloud, lalu klik nama cluster Anda
        Halaman cluster di Cloud Console.
      2. Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster
        Pilih SSH pada baris nama cluster di halaman detail Cluster di Cloud Console.

        Jendela browser akan terbuka di direktori beranda Anda pada node master
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Buat wordcount.py dengan editor teks vi, vim, atau nano yang telah diinstal sebelumnya, lalu tempel kode PySpark dari listingan kode PySpark
      nano wordcount.py
      
    3. Jalankan jumlah kata dengan spark-submit untuk membuat tabel wordcount_output BigQuery. Listingan output menampilkan 20 baris dari output jumlah kata.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Untuk melihat pratinjau tabel output, buka halaman BigQuery, pilih tabel wordcount_output, lalu klik Pratinjau.
      Pratinjau tabel di halaman BigQuery Explorer di Cloud Console.

Untuk informasi selengkapnya