BigQuery コネクタを Spark と使用する

spark-bigquery-connectorApache Spark とともに使用すると、BigQuery との間でデータの読み書きを行えます。このチュートリアルでは、Spark アプリケーション内で spark-bigquery-connector を使用するコード例を示します。クラスタの作成手順については、Dataproc のクイックスタートをご覧ください。

アプリケーションにコネクタを提供する

spark-bigquery-connector は、実行中にアプリケーションで使用できる必要があります。これは次のいずれかの方法で実現できます。

  • Spark の jars ディレクトリにコネクタをインストールする。
  • --jars パラメータを使用して実行時にコネクタを追加する。このコネクタは、Dataproc API または spark-submit とともに使用されます。
    • Dataproc イメージ 1.5 を使用している場合は、次のパラメータを追加します。
      --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
    • Dataproc イメージ 1.4 かそれ未満を使用している場合は、次のパラメータを追加します。
      --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
  • Scala または Java Spark アプリケーションに依存関係として jar を含める(コネクタのコンパイルをご覧ください)。

実行時にコネクタが使用できない場合は、ClassNotFoundException がスローされます。

費用の計算

このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。

  • Dataproc
  • BigQuery
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。Cloud Platform の新規ユーザーは無料トライアルをご利用いただけます。

BigQuery のデータの読み取りと書き込み

このサンプルでは、BigQuery から Spark DataFrame にデータを読み込み、標準データソース API を使用してワード数をカウントします。

コネクタは、最初にすべてのデータを Cloud Storage の一時テーブルにバッファリングしてから、1 回のオペレーションですべてのデータを BigQuery へコピーします。BigQuery の読み込みオペレーションが成功した直後と、Spark アプリケーションが終了する際に、コネクタは一時ファイルの削除を試みます。ジョブが失敗した場合、残っている Cloud Storage の一時ファイルの手動削除が必要となる場合があります。通常は、BigQuery からのエクスポートされたものは、一時的に gs://[bucket]/.spark-bigquery-[jobid]-[UUID] に保存されます。

課金の設定

デフォルトでは、認証情報またはサービス アカウントに関連付けられているプロジェクトには、API 使用量に対して課金されます。別のプロジェクトに請求するには、次の構成 spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>") を設定します。

また、読み取り/書き込みオペレーションに次のように追加することもできます: .option("parentProject", "<BILLED-GCP-PROJECT>")

コードの実行

このサンプルを実行する場合は、事前に「wordcount_dataset」という名前のデータセットを作成するか、コード内の出力データセットを Google Cloud プロジェクトの既存の BigQuery データセットに変更します。

次のように bq コマンドを使用して wordcount_dataset を作成します。

bq mk wordcount_dataset

gsutil コマンドを使用して、Cloud Storage バケットを作成します。これは、BigQuery にエクスポートするために使われます。

gsutil mb gs://[bucket]

Scala

  1. コードを調べて、[bucket] プレースホルダを上記で作成した Cloud Storage バケットに置き換えます。
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery.
    val wordsDF = spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache()
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save()
    
    
  2. クラスタ上でコードを実行します
    1. Dataproc クラスタのマスターノードに SSH で接続します。
      1. Cloud Console でプロジェクトの Dataproc クラスタ ページを開き、クラスタの名前をクリックします。
      2. クラスタの詳細ページで VM インスタンス タブを選択し、クラスタのマスターノード名の右側に表示される SSH 選択をクリックします。

        ブラウザ ウィンドウがマスターノードのホーム ディレクトリで開きます。
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 事前にインストールされた vivimnano テキスト エディタで wordcount.scala を作成し、Scala コードの一覧から Scala コードを貼り付けます。
      nano wordcount.scala
        
    3. spark-shell REPL を起動します。
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. :load wordcount.scala コマンドで wordcount.scala を実行して、BigQuery の wordcount_output テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      出力テーブルをプレビューするには、プロジェクトの BigQuery ページで wordcount_output テーブルを選択し、[プレビュー] をクリックします。

PySpark

  1. コードを調べて、[bucket] プレースホルダを上記で作成した Cloud Storage バケットに置き換えます。
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. クラスタ上でコードを実行します
    1. Dataproc クラスタのマスターノードに SSH で接続します。
      1. Cloud Console でプロジェクトの Dataproc クラスタ ページを開き、クラスタの名前をクリックします。
      2. クラスタの詳細ページで VM インスタンス タブを選択し、クラスタのマスターノード名の右側に表示される SSH 選択をクリックします。

        ブラウザ ウィンドウがマスターノードのホーム ディレクトリで開きます。
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 事前にインストールされた vivim、または nano テキスト エディタで wordcount.py を作成し、PySpark のコード一覧から PySpark コードを貼り付けます。
      nano wordcount.py
      
    3. spark-submit で wordcount を実行し、BigQuery の wordcount_output テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      出力テーブルをプレビューするには、プロジェクトの BigQuery ページで wordcount_output テーブルを選択し、[プレビュー] をクリックします。

詳細情報