BigQuery コネクタを Spark と使用する

BigQuery コネクタApache Spark と併用して、BigQuery に対するデータの読み取りと書き込みを行うことができます。このページでは、BigQuery コネクタを Spark と一緒に使用するコードの例について説明します。クラスタの作成手順については、Cloud Dataproc クイックスタートをご覧ください。

BigQuery のデータの読み取りと書き込み

この例では、BigQuery から Spark にデータを読み込み、SparkContext.newAPIHadoopRDD を使用してワード数をカウントします(詳しくは、Spark のドキュメントをご覧ください)。次に、PairRDDFunctions.saveAsNewAPIHadoopDataset を使用して BigQuery にデータを書き戻します。

この例を試す前に、「wordcount_dataset」という名前のデータセットを作成するか、コード内の出力データセットを Google Cloud Platform プロジェクトの既存の BigQuery データセットに変更してください。データセットを作成する bq コマンドは以下のとおりです。

bq mk wordcount_dataset

Scala

コードを調べる

IndirectBigQueryOutputFormat は、BigQuery テーブルに JsonObject 値を直接書き込む機能を Hadoop に提供します。このクラスを使用すると、Hadoop の OutputFormat クラスの拡張機能により BigQuery レコードにアクセスできるようになります。これを正しく使用するには、Hadoop 構成でいくつかのパラメータを設定し、OutputFormat クラスを IndirectBigQueryOutputFormat に設定する必要があります。以下は、IndirectBigQueryOutputFormat を正しく使用するために設定するパラメータと必要なコード行の例です。
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Assumes you have a spark context (sc) -- running from spark-shell REPL.
// Marked as transient since configuration is not Serializable. This should
// only be necessary in spark-shell REPL.
@transient
val conf = sc.hadoopConfiguration

// Input parameters.
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val projectId = conf.get("fs.gs.project.id")
val bucket = conf.get("fs.gs.system.bucket")

// Input configuration.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)

// Output parameters.
val outputTableId = projectId + ":wordcount_dataset.wordcount_output"
// Temp output bucket that is deleted upon completion of job.
val outputGcsPath = ("gs://" + bucket + "/hadoop/tmp/bigquery/wordcountoutput")

// Output configuration.
// Let BigQuery auto-detect output schema (set to null below).
BigQueryOutputConfiguration.configureWithAutoSchema(
    conf,
    outputTableId,
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_,_]])

// (Optional) Configure the KMS key used to encrypt the output table.
BigQueryOutputConfiguration.setKmsKeyName(
    conf, "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");

conf.set("mapreduce.job.outputformat.class",
         classOf[IndirectBigQueryOutputFormat[_,_]].getName)

// Truncate the table before writing output to allow multiple runs.
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
         "WRITE_TRUNCATE")

// Helper to convert JsonObjects to (word, count) tuples.
def convertToTuple(record: JsonObject) : (String, Long) = {
  val word = record.get("word").getAsString.toLowerCase
  val count = record.get("word_count").getAsLong
  return (word, count)
}

// Helper to convert (word, count) tuples to JsonObjects.
def convertToJson(pair: (String, Long)) : JsonObject = {
  val word = pair._1
  val count = pair._2
  val jsonObject = new JsonObject()
  jsonObject.addProperty("word", word)
  jsonObject.addProperty("word_count", count)
  return jsonObject
}

// Load data from BigQuery.
val tableData = sc.newAPIHadoopRDD(
    conf,
    classOf[GsonBigQueryInputFormat],
    classOf[LongWritable],
    classOf[JsonObject])

// Perform word count.
val wordCounts = (tableData
    .map(entry => convertToTuple(entry._2))
    .reduceByKey(_ + _))

// Display 10 results.
wordCounts.take(10).foreach(l => println(l))

// Write data back into a new BigQuery table.
// IndirectBigQueryOutputFormat discards keys, so set key to null.
(wordCounts
    .map(pair => (null, convertToJson(pair)))
    .saveAsNewAPIHadoopDataset(conf))

クラスタ上でコードを実行する

  1. Cloud Dataproc クラスタのマスターノードに SSH 接続します。
    1. GCP Console でプロジェクトの Cloud Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
    2. クラスタの詳細ページで [VM インスタンス] タブを選択し、クラスタのマスターノード名の右側に表示される [SSH] 選択ボタンをクリックします。

      マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。
          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. プリインストールされた vivimnano テキストエディタで wordcount.scala を作成し、Scala コードリストから Scala コードを貼り付けます。
    nano wordcount.scala
      
  3. spark-shell REPL を起動します(Cloud Dataproc 1.0~1.2 を使用する場合、jars パラメータを削除してください)。
    $ spark-shell --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    
  4. wordcount.scala を :load wordcount.scala コマンドで実行し、BigQuery の wordcount_output テーブルを作成します。出力リストには、ワードカウント出力のうち 10 行が表示されます。
    :load wordcount.scala
    ...
    (pinnace,3)
    (bone,21)
    (lug,2)
    (vailing,2)
    (bombast,3)
    (gaping,11)
    (hem,5)
    ('non,1)
    (stinks,1)
    (forsooth,48)
    

    出力テーブルをプレビューするには、プロジェクトの BigQuery ページで wordcount_output テーブルを選択し、[Preview] をクリックします。

PySpark

コードを調べる

#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda record: json.loads(record[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline-delimited JSON in Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
output_files = output_directory + '/part-*'

sql_context = SQLContext(sc)
(word_counts
 .toDF(['word', 'word_count'])
 .write.format('json').save(output_directory))

# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=output_files
    ).split())

# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
    output_path, True)

クラスタ上でコードを実行する

  1. Cloud Dataproc クラスタのマスターノードに SSH 接続します。
    1. GCP Console でプロジェクトの Cloud Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
    2. クラスタの詳細ページで [VM インスタンス] タブを選択し、クラスタのマスターノード名の右側に表示される [SSH] 選択ボタンをクリックします。

      マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。
          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 事前にインストールされた vivimnano テキスト エディタで wordcount.py を作成し、PySpark コードリストから PySpark コードを貼り付けます。
    nano wordcount.py
      
  3. spark-submit で wordcount を実行し、BigQuery の wordcount_output テーブルを作成します。出力リストには、ワードカウント出力のうち 10 行が表示されます。
    spark-submit wordcount.py
    ...
    (pinnace,3)
    (bone,21)
    (lug,2)
    (vailing,2)
    (bombast,3)
    (gaping,11)
    (hem,5)
    ('non,1)
    (stinks,1)
    (forsooth,48)
    

    出力テーブルをプレビューするには、プロジェクトの BigQuery ページで wordcount_output テーブルを選択し、[Preview] をクリックします。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Dataproc ドキュメント
ご不明な点がありましたら、Google のサポートページをご覧ください。