BigQuery コネクタを Spark と使用する

spark-bigquery-connectorApache Spark とともに使用すると、BigQuery との間でデータの読み書きを行えます。このチュートリアルでは、Spark アプリケーション内で spark-bigquery-connector を使用するコード例を示します。クラスタの作成手順については、Dataproc のクイックスタートをご覧ください。

アプリケーションでコネクタを利用できるようにする

spark-bigquery-connector をアプリケーションで使用できるようにするには、次のいずれかの方法を使用します。

  1. クラスタの作成時に Dataproc コネクタ初期化アクションを使用して、すべてのノードの Spark の jars ディレクトリに spark-bigquery-connector をインストールします。

  2. ジョブを送信するときにコネクタの URI を指定します。

    1. コンソール: Dataproc の [ジョブの送信] ページにある Spark ジョブの Jars files 項目を使用します。
    2. gcloud CLI: gcloud dataproc jobs submit spark --jars フラグを使用します。
    3. Dataproc API: SparkJob.jarFileUris フィールドを使用します。
  3. Scala または Java Spark アプリケーションに依存関係として jar を含めます(コネクタのコンパイルをご覧ください)。

コネクタ jar URI の指定方法

Spark-BigQuery コネクタのバージョンは、GitHub の GoogleCloudDataproc/spark-bigquery-connector リポジトリに記載されています。

コネクタ jar は、次の URI 文字列の Scala とコネクタのバージョン情報を置き換えて指定します。

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • Dataproc イメージ バージョン 1.5+ で Scala 2.12 を使用します。

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    gcloud CLI の例:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • Dataproc イメージ バージョン 1.4 以前には Scala 2.11 を使用します。

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    gcloud CLI の例:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

費用の計算

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

  • Dataproc
  • BigQuery
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

BigQuery のデータの読み取りと書き込み

このサンプルでは、BigQuery から Spark DataFrame にデータを読み込み、標準データソース API を使用してワード数をカウントします。

コネクタは、最初にすべてのデータを Cloud Storage の一時テーブルにバッファリングして、BigQuery にデータを書き込みます。その後、1 回のオペレーションですべてのデータを BigQuery にコピーします。BigQuery の読み込みオペレーションが成功した直後と、Spark アプリケーションが終了する際に、コネクタは一時ファイルの削除を試みます。ジョブが失敗した場合、残っている Cloud Storage の一時ファイルを削除します。通常、BigQuery の一時ファイルは gs://[bucket]/.spark-bigquery-[jobid]-[UUID] にあります。

課金の設定

デフォルトでは、認証情報またはサービス アカウントに関連付けられているプロジェクトには、API 使用量に対して課金されます。別のプロジェクトに請求するには、次のように構成します: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")

また、読み取り/書き込みオペレーションに次のように追加することもできます: .option("parentProject", "<BILLED-GCP-PROJECT>")

コードの実行

このサンプルを実行する場合は、事前に「wordcount_dataset」という名前のデータセットを作成するか、コード内の出力データセットを Google Cloud プロジェクトの既存の BigQuery データセットに変更します。

次のように bq コマンドを使用して wordcount_dataset を作成します。

bq mk wordcount_dataset

gsutil コマンドを使用して、Cloud Storage バケットを作成します。これは、BigQuery にエクスポートするために使われます。

gsutil mb gs://[bucket]

Scala

  1. コードを調べて、[bucket] プレースホルダを以前に作成した Cloud Storage バケットに置き換えます。
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
    
    
  2. クラスタ上でコードを実行します
    1. SSH を使用して Dataproc クラスタのマスターノードに接続する
      1. コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
        Cloud コンソールの Dataproc [クラスタ] ページ。
      2. > [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノードの名前の右側にある SSH をクリックします。
        Cloud コンソールの Dataproc [クラスタの詳細] ページ。

        マスターノード上のホーム ディレクトリでブラウザ・ウィンドウが開きます。
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 事前にインストールされた vivimnano テキスト エディタで wordcount.scala を作成し、Scala コードの一覧から Scala コードを貼り付けます。
      nano wordcount.scala
        
    3. spark-shell REPL を起動します。
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. :load wordcount.scala コマンドで wordcount.scala を実行して、BigQuery の wordcount_output テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      出力テーブルをプレビューするには、BigQueryページを開いてwordcount_outputテーブルを選択し、[プレビュー]をクリックします。
      Cloud コンソールの BigQuery Explorer ページでテーブルをプレビューする。

PySpark

  1. コードを調べて、[bucket] プレースホルダを以前に作成した Cloud Storage バケットに置き換えます。
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. クラスタ上でコードを実行する
    1. SSH を使用して Dataproc クラスタのマスターノードに接続する
      1. コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
        Cloud コンソールの [クラスタ] ページ。
      2. [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノードの名前の右側にある SSH をクリックします。
        Cloud コンソールの [クラスタの詳細] ページで、クラスタ名の行の [SSH] を選択します。

        マスターノード上のホーム ディレクトリでブラウザ・ウィンドウが開きます。
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 事前にインストールされた vivim、または nano テキスト エディタで wordcount.py を作成し、PySpark のコード一覧から PySpark コードを貼り付けます。
      nano wordcount.py
      
    3. spark-submit で wordcount を実行し、BigQuery の wordcount_output テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      出力テーブルをプレビューするには、BigQueryページを開いてwordcount_outputテーブルを見つけ、[プレビュー]をクリックします。
      Cloud コンソールの BigQuery Explorer ページでテーブルをプレビューする。

詳細情報