BigQuery コネクタを Apache Spark とともに使用すると、BigQuery との間でデータの読み取りと書き込みを行うことができます。このページでは、BigQuery コネクタを Spark と一緒に使用するコードの例について説明します。クラスタの作成手順については、Cloud Dataproc クイックスタートをご覧ください。
費用
このチュートリアルでは、Google Cloud Platform の課金対象となる以下のコンポーネントを使用します。
- Cloud Dataproc
- BigQuery
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。 Cloud Platform を初めて使用される方は、無料トライアルをご利用いただけます。
BigQuery のデータの読み取りと書き込み
このサンプルでは、BigQuery から Spark にデータを読み込み、SparkContext.newAPIHadoopRDD
を使用してワード数をカウントします(詳細については、Spark のドキュメントをご覧ください)。次に、PairRDDFunctions.saveAsNewAPIHadoopDataset
を使用して BigQuery にデータを書き戻します。
この例を試す前に、「wordcount_dataset」という名前のデータセットを作成するか、コード内の出力データセットを Google Cloud Platform プロジェクトの既存の BigQuery データセットに変更してください。データセットを作成する bq コマンドは以下のとおりです。
bq mk wordcount_dataset
Scala
コードを調べる
IndirectBigQueryOutputFormat
は、BigQuery テーブルに JsonObject
値を直接書き込む機能を Hadoop に提供します。このクラスを使用すると、Hadoop の OutputFormat クラスの拡張機能により BigQuery レコードにアクセスできるようになります。これを正しく使用するには、Hadoop 構成でいくつかのパラメータを設定し、OutputFormat クラスを IndirectBigQueryOutputFormat
に設定する必要があります。以下は、IndirectBigQueryOutputFormat
を正しく使用するために設定するパラメータと必要なコード行の例です。
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat import com.google.gson.JsonObject import org.apache.hadoop.io.LongWritable import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat // Assumes you have a spark context (sc) -- running from spark-shell REPL. // Marked as transient since configuration is not Serializable. This should // only be necessary in spark-shell REPL. @transient val conf = sc.hadoopConfiguration // Input parameters. val fullyQualifiedInputTableId = "publicdata:samples.shakespeare" val projectId = conf.get("fs.gs.project.id") val bucket = conf.get("fs.gs.system.bucket") // Input configuration. conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId) conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket) BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId) // Output parameters. val outputTableId = projectId + ":wordcount_dataset.wordcount_output" // Temp output bucket that is deleted upon completion of job. val outputGcsPath = ("gs://" + bucket + "/hadoop/tmp/bigquery/wordcountoutput") // Output configuration. // Let BigQuery auto-detect output schema (set to null below). BigQueryOutputConfiguration.configureWithAutoSchema( conf, outputTableId, outputGcsPath, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, classOf[TextOutputFormat[_,_]]) conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[_,_]].getName) // Truncate the table before writing output to allow multiple runs. conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY, "WRITE_TRUNCATE") // Helper to convert JsonObjects to (word, count) tuples. def convertToTuple(record: JsonObject) : (String, Long) = { val word = record.get("word").getAsString.toLowerCase val count = record.get("word_count").getAsLong return (word, count) } // Helper to convert (word, count) tuples to JsonObjects. def convertToJson(pair: (String, Long)) : JsonObject = { val word = pair._1 val count = pair._2 val jsonObject = new JsonObject() jsonObject.addProperty("word", word) jsonObject.addProperty("word_count", count) return jsonObject } // Load data from BigQuery. val tableData = sc.newAPIHadoopRDD( conf, classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject]) // Perform word count. val wordCounts = (tableData .map(entry => convertToTuple(entry._2)) .reduceByKey(_ + _)) // Display 10 results. wordCounts.take(10).foreach(l => println(l)) // Write data back into a new BigQuery table. // IndirectBigQueryOutputFormat discards keys, so set key to null. (wordCounts .map(pair => (null, convertToJson(pair))) .saveAsNewAPIHadoopDataset(conf))
KMS 暗号化: 出力テーブルを KMS で暗号化するには、KMS キーを出力構成に追加します。次に例を示します。
// (Optional) Configure the KMS key used to encrypt the output table. BigQueryOutputConfiguration.setKmsKeyName( conf, "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
クラスタ上でコードを実行する
- Cloud Dataproc クラスタのマスターノードに SSH 接続します。
- GCP Console でプロジェクトの Cloud Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
- クラスタの詳細ページで [VM インスタンス] タブを選択し、クラスタのマスターノード名の右側に表示される [SSH] 選択ボタンをクリックします。
マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- GCP Console でプロジェクトの Cloud Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
- プリインストールされた
vi
、vim
、nano
テキストエディタでwordcount.scala
を作成し、Scala コードリストから Scala コードを貼り付けます。nano wordcount.scala
spark-shell
REPL を起動します(Cloud Dataproc 1.0~1.2 を使用する場合、jars
パラメータを削除してください)。$ spark-shell --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- wordcount.scala を
:load wordcount.scala
コマンドで実行し、BigQuery のwordcount_output
テーブルを作成します。出力リストには、ワードカウント出力のうち 10 行が表示されます。:load wordcount.scala ... (pinnace,3) (bone,21) (lug,2) (vailing,2) (bombast,3) (gaping,11) (hem,5) ('non,1) (stinks,1) (forsooth,48)
出力テーブルをプレビューするには、プロジェクトの BigQuery ページでwordcount_output
テーブルを選択し、[プレビュー] をクリックします。
PySpark
コードを調べる
#!/usr/bin/python """BigQuery I/O PySpark example.""" from __future__ import absolute_import import json import pprint import subprocess import pyspark from pyspark.sql import SQLContext sc = pyspark.SparkContext() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the InputFormat. This assumes the Cloud Storage connector for # Hadoop is configured. bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket') project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id') input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket) conf = { # Input Parameters. 'mapred.bq.project.id': project, 'mapred.bq.gcs.bucket': bucket, 'mapred.bq.temp.gcs.path': input_directory, 'mapred.bq.input.project.id': 'publicdata', 'mapred.bq.input.dataset.id': 'samples', 'mapred.bq.input.table.id': 'shakespeare', } # Output Parameters. output_dataset = 'wordcount_dataset' output_table = 'wordcount_output' # Load data in from BigQuery. table_data = sc.newAPIHadoopRDD( 'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat', 'org.apache.hadoop.io.LongWritable', 'com.google.gson.JsonObject', conf=conf) # Perform word count. word_counts = ( table_data .map(lambda record: json.loads(record[1])) .map(lambda x: (x['word'].lower(), int(x['word_count']))) .reduceByKey(lambda x, y: x + y)) # Display 10 results. pprint.pprint(word_counts.take(10)) # Stage data formatted as newline-delimited JSON in Cloud Storage. output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket) output_files = output_directory + '/part-*' sql_context = SQLContext(sc) (word_counts .toDF(['word', 'word_count']) .write.format('json').save(output_directory)) # Shell out to bq CLI to perform BigQuery import. subprocess.check_call( 'bq load --source_format NEWLINE_DELIMITED_JSON ' '--replace ' '--autodetect ' '{dataset}.{table} {files}'.format( dataset=output_dataset, table=output_table, files=output_files ).split()) # Manually clean up the staging_directories, otherwise BigQuery # files will remain indefinitely. input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory) input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True) output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory) output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete( output_path, True)
クラスタ上でコードを実行する
- Cloud Dataproc クラスタのマスターノードに SSH 接続します。
- GCP Console でプロジェクトの Cloud Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
- クラスタの詳細ページで [VM インスタンス] タブを選択し、クラスタのマスターノード名の右側に表示される [SSH] 選択ボタンをクリックします。
マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- GCP Console でプロジェクトの Cloud Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
- 事前にインストールされた
vi
、vim
、nano
テキスト エディタでwordcount.py
を作成し、PySpark コードリストから PySpark コードを貼り付けます。nano wordcount.py
spark-submit
で wordcount を実行し、BigQuery のwordcount_output
テーブルを作成します。出力リストには、ワードカウント出力のうち 10 行が表示されます。spark-submit --jars gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar wordcount.py ... (pinnace,3) (bone,21) (lug,2) (vailing,2) (bombast,3) (gaping,11) (hem,5) ('non,1) (stinks,1) (forsooth,48)
出力テーブルをプレビューするには、プロジェクトの BigQuery ページでwordcount_output
テーブルを選択し、[プレビュー] をクリックします。