Cloud Dataproc での Spark Scala ジョブの作成および実行

このチュートリアルでは、Spark Scala ジョブを作成し、Cloud Dataproc クラスタに送信するさまざまな方法について説明します。次の方法が含まれます。

  • ローカルマシン上の Spark Scala「Hello World」アプリを、Scala REPL(Read-Evaluate-Print-Loop または対話型インタープリタ)、SBT ビルドツール、Eclipse 用 Scala IDE プラグインの Eclipse IDE のいずれかを使用してコマンドラインから作成し、コンパイルする。
  • コンパイルした Scala クラスをマニフェストとともに jar ファイルにコンパイルする
  • Cloud Dataproc クラスタで実行される Spark ジョブに Scala jar を送信する
  • Google Cloud Console からの Scala ジョブ出力を確認する

このチュートリアルでは、次の方法についても説明します。

  • Cloud Dataproc クラスタ上で、spark-shell REPL を使用して Spark Scala の「WordCount」MapReduce ジョブを直接作成し、実行する

  • プレインストールされた Apache Spark と Hadoop のサンプルをクラスタで実行する

Google Cloud Platform プロジェクトの設定

次の手順をまだ行っていない場合は、行います。

  1. プロジェクトを設定する
  2. Cloud Storage バケットを作成する
  3. Cloud Dataproc クラスタを作成する

ローカルでの Scala コードの作成とコンパイル

このチュートリアルの簡単な演習として、Scala REPLSBT、コマンドライン インターフェース、または Eclipse の Scala IDE プラグインを使用して、開発マシンでローカルに「Hello World」という Scala アプリを作成します。

Scala の使用

  1. Scala のインストール ページから Scala バイナリをダウンロードします。
  2. Scala のインストールの手順に沿って、ファイルを解凍し、SCALA_HOME 環境変数を設定して、パスに追加します。例:

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. Scala REPL を起動します。

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. HelloWorld コードをコピーして Scala REPL に貼り付けます。

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }
    
    

  5. HelloWorld.scala を保存して REPL を終了します。

    scala> :save HelloWorld.scala
    scala> :q
    

  6. scalac をコンパイルします。

    $ scalac HelloWorld.scala
    

  7. コンパイル済みの .class ファイルを一覧表示します。

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

SBT の使用

  1. SBT をダウンロードします。

  2. 次のように、「HelloWorld」プロジェクトを作成します

    $ mkdir hello
    $ cd hello
    $ echo \
    'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
    HelloWorld.scala
    

  3. sbt.build 構成ファイルを作成して「HelloWorld.jar」に artifactName(生成した jar ファイル名は下記に表示)を設定します(デフォルトのアーティファクトの変更をご覧ください)。

    echo \
    'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    "HelloWorld.jar" }' > \
    build.sbt
    

  4. SBT を起動し、コードを実行します。

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. コードをメインクラスのエントリ ポイントを指定するマニフェストHelloWorld)とともに jar ファイルにパッケージ化し、終了します。

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

Eclipse の SBT プラグインの使用

  1. Eclipse の SBT プラグインをインストールします。

  2. ワークスペースを選択するか作成し、新しい Scala プロジェクトを開始します。

  3. [Create a Scala project] ページで「HelloWorld」プロジェクトに名前を付け、[Finish] をクリックしてデフォルト設定を受け入れます。

  4. 左側ペイン(Package Explorer)で src フォルダを選択し、新規→Scala オブジェクトを選択します。

  5. [Create New File] ダイアログで Scala オブジェクト名として「HelloWorld」を入力し、[Finish] をクリックします。

  6. HelloWorld オブジェクト テンプレートで HelloWorld.scala ファイルが作成されます。次のようにテンプレートに入力して HelloWorld オブジェクト定義を完了します。

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }
    
    

  7. HelloWorld.scala ドキュメントをクリックするか、左側ペイン(Package Explorer)のドキュメント名を選択します。次に右クリックして [Run As]→[Scala Application] を選択し、アプリをビルドして実行します。

  8. アプリが構築され、正常に実行されたことがコンソールに表示されます。

  9. コンパイル済みのクラスファイルは、Eclipse ワークスペースのプロジェクトの bin フォルダにあります。これらのファイルを使用して jar を作成します。jar の作成をご覧ください。

jar の作成

SBT または jar コマンドを使用して jar ファイルを作成します。

SBT の使用

SBT package コマンドは jar ファイルを作成します(SBT の使用をご覧ください)。

手動での jar の作成

  1. コンパイル済みの HelloWorld*.class ファイルを含むディレクトリにディレクトリを変更(cd)し、次のコマンドを実行して、メインクラスのエントリ ポイント(HelloWorld)を指定するマニフェストとともにクラスファイルを jar ファイルにパッケージ化します。
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

Cloud Storage への jar のコピー

  1. jar をプロジェクトの Cloud Storage バケットへコピーするには、gsutil コマンドを使用します。
$ gsutil cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

Cloud Dataproc Spark ジョブへの jar の送信

  1. Cloud Console を使用して、jar ファイルを Cloud Dataproc Spark ジョブに送信します。

    • [ジョブを送信] ページの各フィールドに次のように入力します。
      • [クラスタ]: クラスタリストからクラスタの名前を選択します。
      • [ジョブタイプ]: Spark
      • メインクラスまたは jar: HelloWorld jar(gs://your-bucket-name/HelloWorld.jar)への Cloud Storage URI パスを指定します。コードへのエントリ ポイント(「Main-Class: HelloWorld」)を指定したマニフェストが jar に含まれていない場合は、「メインクラスまたは jar」フィールドのメインクラス名を(「HelloWorld」)とし、「jar ファイル」フィールドに jar ファイルの URI パス(gs://your-bucket-name/HelloWorld.jar)を入力します。
  2. [送信] をクリックしてジョブを開始します。開始されたジョブが [ジョブ] リストに追加されます。

  3. ジョブ ID をクリックして [ジョブ] ページを開くと、ジョブのドライバ出力が表示されます。

クラスタの spark-shell REPL を使用した Spark Scala コードの作成と実行

Scala アプリは Cloud Dataproc クラスタで直接開発することをおすすめします。Hadoop と Spark は Cloud Dataproc クラスタにプレインストールされており、Cloud Storage コネクタを使用して構成を行います。Cloud Storage との間で直接データの読み取りや書き込みができます。

この例では、プロジェクトの Cloud Dataproc クラスタのマスターノードに SSH 接続し、spark-shell REPL を使用して Scala wordcount mapreduce アプリケーションを作成し実行する方法を示します。

  1. Cloud Dataproc クラスタのマスターノードに SSH で接続します。
    1. Cloud Console のプロジェクト Cloud Dataproc クラスタのページに移動し、クラスタ名をクリックします。
    2. クラスタの詳細ページで VM インスタンス タブを選択し、クラスタのマスターノード名の右側に表示される SSH 選択をクリックします。
      ブラウザ ウィンドウがマスターノードのホーム ディレクトリで開きます。
      Connected, host fingerprint: ssh-rsa 2048 ...
      ...
      user@clusterName-m:~$
      
  2. spark-shell を起動します。
    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    
  3. 公開されている Cloud Storage にあるシェイクスピアのテキスト スニペットから RDD(Resilient Distributed Dataset)を作成します。
    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    
  4. テキストで wordcount mapreduce を実行し、wordcounts の結果を表示します。
    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
    (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    
  5. Cloud Storage の <bucket-name>/wordcounts-out のカウントを保存し、scala-shell を終了します。
    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    
  6. gsutil を使用して出力ファイルを一覧表示し、ファイルの内容を表示します。
    $ gsutil ls gs://<bucket-name>/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    
  7. gs://<bucket-name>/wordcounts-out/part-00000 の内容を確認します。
    $ gsutil cat gs://<bucket-name>/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

プレインストールされたサンプルコードの実行

Cloud Dataproc のマスターノードには、実行可能な jar ファイルが標準の Apache Hadoop と Spark のサンプルと共に含まれています。

Jar タイプ Master node /usr/lib/ location GitHub のソース Apache ドキュメント
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar ソースリンク MapReduce のチュートリアル
Spark spark/lib/spark-examples.jar ソースリンク Spark のサンプル

クラスタへのコマンドラインからのサンプルの送信

Cloud SDK gcloud コマンドライン ツールを使用して、ローカル開発マシンからサンプルを送信できます。(Cloud Console からジョブを送信するには、Google Cloud Console の使用をご覧ください)。

Hadoop WordCount のサンプル

gcloud dataproc jobs submit hadoop --cluster <cluster-name> \
  --jars file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
  --class org.apache.hadoop.examples.WordCount \
  -- <URI of input file> <URI of output file>

Spark WordCount サンプル

gcloud dataproc jobs submit spark --cluster <cluster-name> \
  --jars file:///usr/lib/spark/examples/jars/spark-examples.jar \
  --class org.apache.spark.examples.JavaWordCount \
  -- <URI of input file>

クラスタのシャットダウン

課金され続けることがないようにするために、クラスタをシャットダウンし、このチュートリアルで使用した Cloud Storage リソース(Cloud Storage バケットとファイル)を削除します。

クラスタをシャットダウンするには:

gcloud dataproc clusters delete <cluster-name>

Cloud Storage の jar ファイルを削除するには:

gsutil rm gs://<bucket-name>/HelloWorld.jar

次のコマンドを使用すると、バケットおよびバケットのすべてのフォルダとファイルを削除できます。

gsutil rm -r gs://<bucket-name>/

次のステップ

Cloud Dataproc での Apache Spark アプリケーションの Java の依存関係の管理を確認する。