spark-bigquery-connector を Apache Spark とともに使用すると、BigQuery との間でデータの読み書きを行えます。 このチュートリアルでは、Spark アプリケーション内で spark-bigquery-connector を使用するコード例を示します。クラスタの作成手順については、Dataproc のクイックスタートをご覧ください。
アプリケーションでコネクタを使用できるようにする
次のいずれかの方法で、spark-bigquery-connector をアプリケーションで使用できるようにします。
クラスタの作成時に Dataproc コネクタ初期化アクションを使用して、すべてのノードの Spark の jars ディレクトリに spark-bigquery-connector をインストールします。
ジョブを送信するときに、次のコネクタ URI を指定します。
- Google Cloud コンソール: Dataproc の [ジョブの送信] ページにある Spark ジョブの
Jars files
項目を使用します。 - gcloud CLI:
gcloud dataproc jobs submit spark --jars
フラグを使用します。 - Dataproc API:
SparkJob.jarFileUris
フィールドを使用します。
- Google Cloud コンソール: Dataproc の [ジョブの送信] ページにある Spark ジョブの
Scala または Java Spark アプリケーションに依存関係として jar を含めます(コネクタのコンパイルをご覧ください)。
コネクタの jar URI を指定する方法
Spark-BigQuery コネクタのバージョンは、GitHub の GoogleCloudDataproc/spark-bigquery-connector リポジトリに記載されています。
コネクタ jar は、次の URI 文字列の Scala とコネクタのバージョン情報を置き換えて指定します。
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Dataproc イメージ バージョン
1.5+
で Scala2.12
を使用するgs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud CLI の例:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Dataproc イメージ バージョン
1.4
以前には Scala2.11
を使用します。gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud CLI の例:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
費用の計算
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
- Dataproc
- BigQuery
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
BigQuery のデータの読み取りと書き込み
このサンプルでは、BigQuery から Spark DataFrame にデータを読み込み、標準データソース API を使用してワード数をカウントします。
コネクタは、最初にすべてのデータを Cloud Storage の一時テーブルにバッファリングしてから、BigQuery にデータを書き込みます。その後、1 回のオペレーションですべてのデータを BigQuery にコピーします。BigQuery の読み込みオペレーションが成功した直後と、Spark アプリケーションが終了する際に、コネクタは一時ファイルの削除を試みます。ジョブが失敗した場合は、残っている Cloud Storage の一時ファイルをすべて削除します。通常、一時 BigQuery ファイルは gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
に配置されます。
課金の設定
デフォルトでは、認証情報またはサービス アカウントに関連付けられているプロジェクトには、API 使用量に対して課金されます。別のプロジェクトに請求するには、次のように構成します: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
また、読み取り/書き込みオペレーションに次のように追加することもできます: .option("parentProject", "<BILLED-GCP-PROJECT>")
。
コードの実行
このサンプルを実行する場合は、事前に「wordcount_dataset」という名前のデータセットを作成するか、コード内の出力データセットを Google Cloud プロジェクトの既存の BigQuery データセットに変更します。
次のように bq コマンドを使用して wordcount_dataset
を作成します。
bq mk wordcount_dataset
Google Cloud CLI コマンドを使用して、Cloud Storage バケットを作成します。これは、BigQuery にエクスポートするために使われます。
gcloud storage buckets create gs://[bucket]
Scala
- コードを調査し、[bucket] プレースホルダを以前に作成した Cloud Storage バケットに置き換えます。
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- クラスタ上でコードを実行します
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある
SSH
をクリックします。
マスターノードのホーム ディレクトリでブラウザ ウィンドウが開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 事前にインストールされた
vi
、vim
、nano
テキスト エディタでwordcount.scala
を作成し、Scala コードの一覧から Scala コードを貼り付けます。nano wordcount.scala
spark-shell
REPL を起動します。$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
:load wordcount.scala
コマンドで wordcount.scala を実行して、BigQuery のwordcount_output
テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
出力テーブルをプレビューするには、[BigQuery
] ページを開いて、[wordcount_output
] テーブルを選択してから、[プレビュー]をクリックします。
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
PySpark
- コードを調査し、[bucket] プレースホルダを以前に作成した Cloud Storage バケットに置き換えます。
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- クラスタ上でコードを実行する
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある
SSH
をクリックします。
マスターノードのホーム ディレクトリでブラウザ ウィンドウが開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 事前にインストールされた
vi
、vim
、またはnano
テキスト エディタでwordcount.py
を作成し、PySpark のコード一覧から PySpark コードを貼り付けます。nano wordcount.py
spark-submit
で wordcount を実行し、BigQuery のwordcount_output
テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
出力テーブルをプレビューするには、[BigQuery
] ページを開いて、[wordcount_output
] テーブルを選択してから、[プレビュー]をクリックします。
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
詳細情報
- BigQuery ストレージと Spark SQL - Python
- 外部データソースに対するテーブル定義ファイルの作成
- 外部でパーティションに分割されたデータのクエリ
- Spark ジョブ調整のヒント