Cloud Dataproc クラスタでの Spark Scala ジョブの作成と実行

このチュートリアルでは、Spark Scala ジョブを作成し、Cloud Dataproc クラスタに送信するさまざまな方法について説明します。次の方法が含まれます。

  • Scala REPL(Read-Evaluate-Print-Loop またはインタラクティブ インタープリタ)、SBT ビルドツール、または Eclipse の Scala IDE プラグインを使用した Eclipse IDE を使用して、ローカルマシンでコマンドラインから Spark Scala の「Hello World」というアプリを作成し、コンパイルする
  • コンパイルした Scala クラスをマニフェストと共に jar ファイルにコンパイルする
  • Cloud Dataproc クラスタで実行される Spark ジョブに Scala jar を送信する
  • Google Cloud Platform Console から Scala ジョブ出力を調べる

このチュートリアルでは、次の方法についても説明します。

  • spark-shell REPL を使用して Cloud Dataproc クラスタで Spark Scala の「WordCount」mapreduce ジョブを直接作成し、実行する

  • プレインストールされた Apache Spark と Hadoop のサンプルをクラスタで実行する

Google Cloud Platform プロジェクトの設定

次のことをまだ実行していない場合は、実行します。

  1. プロジェクトの設定
  2. Cloud Storage バケットの作成
  3. Cloud Dataproc クラスタの作成

ローカルでの Scala コードの作成とコンパイル

このチュートリアルの簡単な演習として、Scala REPLSBT コマンドライン インターフェース、または Eclipse の Scala IDE プラグインを使用して、開発マシンでローカルに「Hello World」という Scala アプリを作成します。

Scala の使用

  1. Scala のインストール ページから Scala バイナリをダウンロードします。
  2. Scala のインストールの手順に沿って、ファイルを解凍し、SCALA_HOME 環境変数を設定して、パスに追加します。次に例を示します。

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. Scala REPL を起動します。

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. HelloWorld コードをコピーして Scala REPL に貼り付けます。

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }
    
    

  5. HelloWorld.scala を保存して REPL を終了します。

    scala> :save HelloWorld.scala
    scala> :q
    

  6. scalac でコンパイルします。

    $ scalac HelloWorld.scala
    

  7. コンパイルした .class ファイルを一覧表示します。

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

SBT の使用

  1. SBT をダウンロードします。

  2. 次のように「HelloWorld」プロジェクトを作成します

    $ mkdir hello
    $ cd hello
    $ echo \
      'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
      HelloWorld.scala
    

  3. sbt.build 設定ファイルを作成し、artifactName(下記のように生成する jar ファイルの名前)を「HelloWorld.jar」に設定します(デフォルト アーティファクトの変更をご覧ください)。

    echo \
      'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
      "HelloWorld.jar" }' > \
      build.sbt
    

  4. SBT を起動し、コードを実行します。

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. コードをメインクラスのエントリ ポイントを指定するマニフェストHelloWorld)とともに jar ファイルにパッケージ化し、終了します。

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

Eclipse の SBT プラグインの使用

  1. Eclipse の SBT プラグインをインストールします。

  2. ワークスペースを選択するか作成し、新しい Scala プロジェクトを開始します。

  3. [Create a Scala project] ページで「HelloWorld」プロジェクトに名前を付け、[Finish] をクリックしてデフォルト設定を受け入れます。
  4. 左ペイン(パッケージ エクスプローラ)で src フォルダを選択し、[New] → [Scala Object] の順に選択します。
  5. [Create New File] ダイアログで Scala オブジェクト名として「HelloWorld」を入力し、[Finish] をクリックします。
  6. HelloWorld オブジェクト テンプレートで HelloWorld.scala ファイルが作成されます。次のようにテンプレートに入力して HelloWorld オブジェクト定義を完了します。
    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }
    
    
  7. HelloWorld.scala ドキュメントをクリック(または左ペイン(パッケージ エクスプローラ)でドキュメント名を選択)し、右クリックして [Run As] → [Scala Application] の順に選択し、アプリを構築して実行します。
  8. アプリが構築され、正常に実行されたことがコンソールに表示されます。
  9. コンパイルされたクラスファイルは Eclipse ワークスペースのプロジェクトの bin フォルダに配置されます。これらのファイルを使用して jar を作成します。jar の作成をご覧ください。

jar の作成

SBT または jar コマンドを使用して jar ファイルを作成します。

SBT の使用

SBT package コマンドは jar ファイルを作成します(SBT の使用をご覧ください)。

手動での jar の作成

  1. コンパイルした HelloWorld*.class ファイルが格納されているディレクトリに移動し(cd)、次のコマンドを実行してクラスファイルをメインクラスのエントリ ポイントを指定するマニフェストHelloWorld)とともに jar にパッケージ化します。
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

Cloud Storage への jar のコピー

  1. gsutil コマンドを使用して、jar をプロジェクトの Cloud Storage バケットにコピーします。
$ gsutil cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

Cloud Dataproc Spark ジョブへの jar の送信

  1. GCP Console を使用して、jar ファイルを Cloud Dataproc Spark ジョブに送信します。

  2. [ジョブを送信] ページの項目に次のように入力します。

    • [クラスタ]: クラスタリストからクラスタの名前を選択します。
    • [ジョブタイプ]: Spark
    • [メインクラスまたは JAR]: HelloWorld jar への Cloud Storage URI パスを指定します(gs://your-bucket-name/HelloWorld.jar)。コードへのエントリ ポイント(「Main-Class: HelloWorld」)を指定するマニフェストが jar に含まれていない場合は、[メインクラスまたは JAR] 項目でメインクラスの名前(「HelloWorld」)を指定し、[JAR ファイル] 項目に jar ファイルへの URI パス(gs://your-bucket-name/HelloWorld.jar)を入力する必要があります。
  3. [送信] をクリックしてジョブを開始します。ジョブが開始されると、ジョブのリストにジョブが追加されます。

  4. ジョブ ID をクリックして [ジョブ] ページを開くと、ジョブのドライバ出力が表示されます。

クラスタの spark-shell REPL を使用した Spark Scala コードの作成と実行

Scala アプリは Cloud Dataproc クラスタで直接開発することをおすすめします。Hadoop と Spark は Cloud Dataproc クラスタにプレインストールされ、Google Cloud Storage コネクタを使用して設定されているため、コードを使用して Google Cloud Storage との間でデータを直接読み書きすることができます。

この例では、プロジェクトの Cloud Dataproc クラスタのマスターノードに SSH 接続し、spark-shell REPL を使用して Scala wordcount mapreduce アプリケーションを作成し実行する方法を示します。

  1. Cloud Dataproc クラスタのマスターノードに SSH を挿入する
    1. GCP Console でプロジェクトの Cloud Dataproc [クラスタ] ページに移動し、クラスタ名をクリックします
    2. クラスタの詳細ページで [VM インスタンス] タブを選択し、クラスタのマスターノード名の右側に表示される [SSH] 選択ボタンをクリックします。
      マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。
      Connected, host fingerprint: ssh-rsa 2048 ...
      ...
      user@clusterName-m:~$
      
  2. spark-shell を起動します。
    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    
  3. パブリック Google Cloud Storage にあるシェイクスピア テキスト スニペットから RDD(Resilient Distributed Dataset)を作成します。
    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    
  4. テキストで wordcount mapreduce を実行し、wordcounts の結果を表示します。
    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
      (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    
  5. Cloud Storage の <bucket-name>/wordcounts-out にカウントを保存し、scala-shell を終了します。
    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    
  6. gsutil を使用して出力ファイルを一覧表示し、ファイルの内容を表示します。
    $ gsutil ls gs://<bucket-name>/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    
  7. gs://<bucket-name>/wordcounts-out/part-00000 の内容をチェックします。
    $ gsutil cat gs://<bucket-name>/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

プレインストールされたサンプルコードの実行

Cloud Dataproc のマスターノードには、実行可能な jar ファイルが標準の Apache Hadoop と Spark のサンプルと共に含まれています。

Jar タイプ Master node /usr/lib/ location GitHub のソース Apache ドキュメント
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar ソースリンク MapReduce のチュートリアル
Spark spark/lib/spark-examples.jar ソースリンク Spark のサンプル

クラスタへのコマンドラインからのサンプルの送信

Google Cloud SDK の gcloud コマンドライン ツールを使用して、ローカル開発マシンからサンプルを送信できます(GCP Console からジョブを送信するには、Google Cloud Platform Console の使用をご覧ください)。

Hadoop WordCount のサンプル

gcloud dataproc jobs submit hadoop --cluster <cluster-name> \
  --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
  --class org.apache.hadoop.examples.WordCount \
  <URI of input file> <URI of output file>

Spark WordCount サンプル

gcloud dataproc jobs submit spark --cluster <cluster-name> \
  --jar file:///usr/lib/spark/lib/spark-examples.jar \
  --class org.apache.spark.examples.JavaWordCount
  <URI of input file>

クラスタのシャットダウン

課金され続けることがないようにするために、クラスタをシャットダウンし、このチュートリアルで使用した Cloud Storage リソース(Cloud Storage バケットとファイル)を削除します。

クラスタをシャットダウンするには:

gcloud dataproc clusters delete <cluster-name>

Cloud Storage の jar ファイルを削除するには:

gsutil rm gs://<bucket-name>/HelloWorld.jar

次のコマンドを使用すると、バケットおよびバケットのすべてのフォルダとファイルを削除できます。

gsutil rb -r gs://<bucket-name>/

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Dataproc ドキュメント
ご不明な点がありましたら、Google のサポートページをご覧ください。