spark-bigquery-connector を Apache Spark とともに使用すると、BigQuery との間でデータの読み書きを行えます。このチュートリアルでは、Spark アプリケーション内で spark-bigquery-connector を使用するコード例を示します。クラスタの作成手順については、Dataproc のクイックスタートをご覧ください。
アプリケーションにコネクタを提供する
spark-bigquery-connector は、実行中にアプリケーションで使用できる必要があります。これは次のいずれかの方法で実現できます。
- クラスタの作成時に Dataproc コネクタ初期化アクションを使用して、すべてのノードの Spark の jars ディレクトリに spark-bigquery-connector をインストールします。
--jars
パラメータを使用して実行時にコネクタを追加する。このコネクタは、Dataproc API またはspark-submit
とともに使用されます。- Dataproc イメージ 1.5 を使用している場合は、次のパラメータを追加します。
--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
- Dataproc イメージ 1.4 かそれ未満を使用している場合は、次のパラメータを追加します。
--jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
- Dataproc イメージ 1.5 を使用している場合は、次のパラメータを追加します。
- Scala または Java Spark アプリケーションに依存関係として jar を含める(コネクタのコンパイルをご覧ください)。
実行時にコネクタが使用できない場合は、ClassNotFoundException
がスローされます。
費用の計算
このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。
- Dataproc
- BigQuery
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。Cloud Platform の新規ユーザーは無料トライアルをご利用いただけます。
BigQuery のデータの読み取りと書き込み
このサンプルでは、BigQuery から Spark DataFrame にデータを読み込み、標準データソース API を使用してワード数をカウントします。
コネクタは、最初にすべてのデータを Cloud Storage の一時テーブルにバッファリングしてから、1 回のオペレーションですべてのデータを BigQuery へコピーします。BigQuery の読み込みオペレーションが成功した直後と、Spark アプリケーションが終了する際に、コネクタは一時ファイルの削除を試みます。ジョブが失敗した場合、残っている Cloud Storage の一時ファイルの手動削除が必要となる場合があります。通常は、BigQuery からのエクスポートされたものは、一時的に gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
に保存されます。
課金の設定
デフォルトでは、認証情報またはサービス アカウントに関連付けられているプロジェクトには、API 使用量に対して課金されます。別のプロジェクトに請求するには、次のように構成します: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
また、読み取り/書き込みオペレーションに次のように追加することもできます: .option("parentProject", "<BILLED-GCP-PROJECT>")
。
コードの実行
このサンプルを実行する場合は、事前に「wordcount_dataset」という名前のデータセットを作成するか、コード内の出力データセットを Google Cloud プロジェクトの既存の BigQuery データセットに変更します。
次のように bq コマンドを使用して wordcount_dataset
を作成します。
bq mk wordcount_dataset
gsutil コマンドを使用して、Cloud Storage バケットを作成します。これは、BigQuery にエクスポートするために使われます。
gsutil mb gs://[bucket]
Scala
- コードを調べて、[bucket] プレースホルダを上記で作成した Cloud Storage バケットに置き換えます。
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache() wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save()
- クラスタ上でコードを実行します
- Dataproc クラスタのマスターノードに SSH で接続します。
- Cloud Console でプロジェクトの Dataproc クラスタ ページを開き、クラスタの名前をクリックします。
- クラスタの詳細ページで VM インスタンス タブを選択し、クラスタのマスターノード名の右側に表示される SSH 選択をクリックします。
ブラウザ ウィンドウがマスターノードのホーム ディレクトリで開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Cloud Console でプロジェクトの Dataproc クラスタ ページを開き、クラスタの名前をクリックします。
- 事前にインストールされた
vi
、vim
、nano
テキスト エディタでwordcount.scala
を作成し、Scala コードの一覧から Scala コードを貼り付けます。nano wordcount.scala
spark-shell
REPL を起動します。$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
:load wordcount.scala
コマンドで wordcount.scala を実行して、BigQuery のwordcount_output
テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
出力テーブルをプレビューするには、プロジェクトの BigQuery ページでwordcount_output
テーブルを選択し、[プレビュー] をクリックします。
- Dataproc クラスタのマスターノードに SSH で接続します。
PySpark
- コードを調べて、[bucket] プレースホルダを上記で作成した Cloud Storage バケットに置き換えます。
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- クラスタ上でコードを実行します
- Dataproc クラスタのマスターノードに SSH で接続します。
- Cloud Console でプロジェクトの Dataproc クラスタ ページを開き、クラスタの名前をクリックします。
- クラスタの詳細ページで VM インスタンス タブを選択し、クラスタのマスターノード名の右側に表示される SSH 選択をクリックします。
ブラウザ ウィンドウがマスターノードのホーム ディレクトリで開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Cloud Console でプロジェクトの Dataproc クラスタ ページを開き、クラスタの名前をクリックします。
- 事前にインストールされた
vi
、vim
、またはnano
テキスト エディタでwordcount.py
を作成し、PySpark のコード一覧から PySpark コードを貼り付けます。nano wordcount.py
spark-submit
で wordcount を実行し、BigQuery のwordcount_output
テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
出力テーブルをプレビューするには、プロジェクトの BigQuery ページでwordcount_output
テーブルを選択し、[プレビュー] をクリックします。
- Dataproc クラスタのマスターノードに SSH で接続します。
詳細情報
- BigQuery ストレージと Spark SQL - Python
- 外部データソースに対するテーブル定義ファイルの作成
- 外部でパーティションに分割されたデータのクエリ
- Spark ジョブ調整のヒント