このチュートリアルでは、Spark Scala ジョブを作成し、Dataproc クラスタに送信するさまざまな方法について説明します。次の方法が含まれます。
- Scala REPL(Read-Evaluate-Print-Loop またはインタラクティブ インタープリタ)または SBT ビルドツールを使用して、コマンドラインから、ローカルマシンで Spark Scala の「Hello World」アプリを作成しコンパイルする
- コンパイルした Scala クラスをマニフェストとともに jar ファイルにコンパイルする
- Dataproc クラスタで実行される Spark ジョブに Scala jar を送信する
- Google Cloud コンソールから Scala ジョブ出力を確認する
このチュートリアルでは、次の方法についても説明します。
- Dataproc クラスタ上で、 - spark-shellREPL を使用して Spark Scala の「WordCount」MapReduce ジョブを直接作成し、実行する
- プレインストールされた Apache Spark と Hadoop のサンプルをクラスタで実行する 
Google Cloud Platform プロジェクトの設定
次の手順をまだ行っていない場合は、行います。
ローカルでの Scala コードの作成とコンパイル
このチュートリアルの簡単な演習として、開発マシンで Scala REPL または SBT コマンドライン インターフェースを使用し、ローカルで「Hello World」Scala アプリを作成します。
Scala を使用する
- Scala のインストール ページから Scala バイナリをダウンロードします。
- Scala のインストールの手順に沿って、ファイルを解凍し、 - SCALA_HOME環境変数を設定して、パスに追加します。例:- export SCALA_HOME=/usr/local/share/scala export PATH=$PATH:$SCALA_HOME/ 
- Scala REPL を起動します。 - $ scala Welcome to Scala version ... Type in expressions to have them evaluated. Type :help for more information. scala> 
- HelloWorldコードをコピーして Scala REPL に貼り付けます。- object HelloWorld { def main(args: Array[String]): Unit = { println("Hello, world!") } } 
- HelloWorld.scalaを保存して REPL を終了します。- scala> :save HelloWorld.scala scala> :q 
- scalacをコンパイルします。- $ scalac HelloWorld.scala 
- コンパイル済みの - .classファイルを一覧表示します。- $ ls HelloWorld*.class HelloWorld$.class HelloWorld.class 
SBT を使用する
- 次のように、「HelloWorld」プロジェクトを作成します。 - $ mkdir hello $ cd hello $ echo \ 'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \ HelloWorld.scala
- sbt.build構成ファイルを作成して「HelloWorld.jar」に- artifactName(生成した jar ファイル名は下記に表示)を設定します(デフォルトのアーティファクトの変更をご覧ください)。- echo \ 'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => "HelloWorld.jar" }' > \ build.sbt
- SBT を起動し、コードを実行します。 - $ sbt [info] Set current project to hello ... > run ... Compiling 1 Scala source to .../hello/target/scala-.../classes... ... Running HelloWorld Hello, world! [success] Total time: 3 s ... 
- コードをメインクラスのエントリ ポイントを指定するマニフェスト( - HelloWorld)とともに jar ファイルにパッケージ化し、終了します。- > package ... Packaging .../hello/target/scala-.../HelloWorld.jar ... ... Done packaging. [success] Total time: ... > exit 
jar を作成する
SBT または jar コマンドを使用して jar ファイルを作成します。
SBT で jar を作成する
SBT package コマンドは jar ファイルを作成します(SBT を使用するをご覧ください)。
手動で jar を作成する
- コンパイル済みの HelloWorld*.classファイルを含むディレクトリにディレクトリを変更(cd)し、次のコマンドを実行して、メインクラスのエントリ ポイントを指定するマニフェスト(HelloWorld)とともにクラスファイルを jar ファイルにパッケージ化します。$ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class added manifest adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%) adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%) 
Cloud Storage への jar のコピー
- Google Cloud CLI を使用して、jar をプロジェクトの Cloud Storage バケットにコピーします。
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/ Copying file://HelloWorld.jar [Content-Type=application/java-archive]... Uploading gs://bucket-name/HelloWorld.jar: 1.46 KiB/1.46 KiB
Dataproc Spark ジョブに jar を送信する
- Google Cloud コンソールを使用して、jar ファイルを Dataproc Spark ジョブに送信します。[ジョブを送信] ページの項目に次のように入力します。 - [クラスタ]: クラスタリストからクラスタの名前を選択します。
- [ジョブタイプ]: Spark
- [メインクラスまたは JAR]: HelloWorld jar への Cloud Storage URI パスを指定します( - gs://your-bucket-name/HelloWorld.jar)。- コードへのエントリ ポイント(「Main-Class: HelloWorld」)を指定するマニフェストが jar に含まれていない場合は、[メインクラスまたは JAR] 項目でメインクラスの名前(「HelloWorld」)を指定し、[JAR ファイル] 項目に jar ファイルへの URI パス( - gs://your-bucket-name/HelloWorld.jar)を入力する必要があります。
 
- [送信] をクリックしてジョブを開始します。ジョブが開始されると、ジョブのリストにジョブが追加されます。 
- ジョブ ID をクリックして [ジョブ] ページを開くと、ジョブのドライバ出力が表示されます。 
クラスタの spark-shell REPL を使用した Spark Scala コードの作成と実行
Scala アプリは Dataproc クラスタで直接開発することをおすすめします。Hadoop と Spark は Dataproc クラスタにプレインストールされており、Cloud Storage コネクタを使用して構成を行います。Cloud Storage との間で直接データの読み取りや書き込みができます。
この例では、プロジェクトの Dataproc クラスタ マスターノードに SSH 接続し、spark-shell REPL を使用して Scala wordcount mapreduce アプリケーションを作成し実行する方法を示します。
- Dataproc クラスタのマスターノードに SSH を挿入する - Google Cloud コンソールで、プロジェクトの Dataproc [クラスタ] ページに移動し、クラスタ名をクリックします。 
- クラスタの詳細ページで [VM インスタンス] タブを選択し、クラスタの名前がある行の右側に表示される [SSH] 選択ボタンをクリックします。 - マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます 
 
- spark-shellを起動します。- $ spark-shell ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala> 
- 一般に公開されている Cloud Storage にあるシェイクスピア テキスト スニペットから RDD(耐障害性分散データセット)を作成する - scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
- テキストで wordcount mapreduce を実行し、 - wordcountsの結果を表示します。- scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) scala> wordCounts.collect ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1), (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
- Cloud Storage の - <bucket-name>/wordcounts-outのカウントを保存し、- scala-shellを終了します。- scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/") scala> exit
- gcloud CLI を使用して出力ファイルを一覧表示し、ファイルの内容を表示する - $ gcloud storage ls gs://bucket-name/wordcounts-out/ gs://spark-scala-demo-bucket/wordcounts-out/ gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS gs://spark-scala-demo-bucket/wordcounts-out/part-00000 gs://spark-scala-demo-bucket/wordcounts-out/part-00001 
- gs://<bucket-name>/wordcounts-out/part-00000の内容を確認します。- $ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000 (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) 
プレインストールされたサンプルコードの実行
Dataproc のマスターノードには、実行可能な jar ファイルが標準の Apache Hadoop と Spark のサンプルとともに含まれています。
| Jar タイプ | Master node /usr/lib/ location | GitHub のソース | Apache ドキュメント | 
|---|---|---|---|
| Hadoop | hadoop-mapreduce/hadoop-mapreduce-examples.jar | ソースリンク | MapReduce のチュートリアル | 
| Spark | spark/lib/spark-examples.jar | ソースリンク | Spark のサンプル | 
クラスタへのコマンドラインからのサンプルの送信
Google Cloud CLI gcloud コマンドライン ツールを使用して、ローカル開発マシンからサンプルを送信できます。( Google Cloud コンソールからジョブを送信するには、 Google Cloud コンソールの使用をご覧ください)。
Hadoop WordCount のサンプル
gcloud dataproc jobs submit hadoop --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ --class=org.apache.hadoop.examples.WordCount \ -- URI of input file URI of output file
Spark WordCount サンプル
gcloud dataproc jobs submit spark --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- URI of input file
クラスタのシャットダウン
課金され続けることがないようにするために、クラスタをシャットダウンし、このチュートリアルで使用した Cloud Storage リソース(Cloud Storage バケットとファイル)を削除します。
クラスタをシャットダウンするには:
gcloud dataproc clusters delete cluster-name \ --region=region
Cloud Storage の jar ファイルを削除するには:
gcloud storage rm gs://bucket-name/HelloWorld.jar
次のコマンドを使用すると、バケットおよびバケットのすべてのフォルダとファイルを削除できます。
gcloud storage rm gs://bucket-name/ --recursive