Spark Scala-Jobs in Dataproc schreiben und ausführen

In dieser Anleitung werden die verschiedenen Methoden erläutert, mit denen Sie Spark Scala-Jobs erstellen und an einen Dataproc-Cluster senden können:

  • Eine Spark Scala-Anwendung namens „Hello World“ auf einem lokalen Computer über die Befehlszeile schreiben und kompilieren: mit der Scala REPL (Read-Evaluate-Print-Loop bzw. den interaktiven Interpreter) oder dem SBT
  • Kompilierte Scala-Klassen in eine JAR-Datei mit einem Manifest kompilieren
  • Die Scala-JAR-Datei an einen Spark-Job senden, der auf dem Dataproc-Cluster ausgeführt wird
  • Die Scala-Jobausgabe von der Google Cloud Console aus überprüfen

In dieser Anleitung lernen Sie außerdem Folgendes:

  • Den Spark Scala-MapReduce-Job „WordCount“ mit der REPL spark-shell direkt in einem Dataproc-Cluster schreiben und ausführen

  • Vorinstallierte Apache Spark- und Hadoop-Beispiele auf einem Cluster ausführen

Google Cloud Platform-Projekt einrichten

Führen Sie ggf. die folgenden Aufgaben aus:

  1. Projekt einrichten
  2. Cloud Storage-Bucket erstellen
  3. Dataproc-Cluster erstellen

Scala-Code lokal schreiben und kompilieren

Als einfache Übung für diese Anleitung schreiben Sie eine "Hello World"-Scala-Anwendung mit der Scala-REPL oder der SBT lokal auf Ihrem Entwicklungscomputer.

Scala verwenden

  1. Laden Sie die Scala-Binärdateien von der Seite Scala-Installation herunter.
  2. Entpacken Sie die Datei, legen Sie die Umgebungsvariable SCALA_HOME fest und fügen Sie sie dem Pfad hinzu. Folgen Sie dazu der Anleitung für die Scala-Installation. Beispiel:

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. Starten Sie die Scala-REPL

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. Kopieren Sie den Code HelloWorld und fügen Sie ihn in die Scala-REPL ein.

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }

  5. Speichern Sie HelloWorld.scala und beenden Sie die REPL.

    scala> :save HelloWorld.scala
    scala> :q
    

  6. Kompilieren Sie den Code mit scalac.

    $ scalac HelloWorld.scala
    

  7. Listen Sie die kompilierten .class-Dateien auf.

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

SBT verwenden

  1. Laden Sie SBT herunter

  2. Erstellen Sie ein "HelloWorld"-Projekt (siehe unten)

    $ mkdir hello
    $ cd hello
    $ echo \
    'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
    HelloWorld.scala
    

  3. Erstellen Sie eine sbt.build-Konfigurationsdatei, um artifactName (Name der unten generierten JAR-Datei) auf „HelloWorld.jar“ festzulegen. Weitere Informationen finden Sie unter Standardartefakte ändern.

    echo \
    'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    "HelloWorld.jar" }' > \
    build.sbt
    

  4. Starten Sie SBT und führen Sie den Code aus

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. Verpacken Sie den Code in eine JAR-Datei mit einem Manifest, das den Einstiegspunkt der Hauptklasse angibt (HelloWorld), und beenden Sie dann SBT.

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

JAR-Datei erstellen

Erstellen Sie eine JAR-Datei mit SBT oder dem jar-Befehl.

JAR-Datei mit SBT erstellen

Durch den SBT-Befehl package wird eine JAR-Datei erstellt. Weitere Informationen finden Sie unter Mit SBT.

JAR-Datei manuell erstellen

  1. Ändern Sie das Verzeichnis (cd) in das Verzeichnis, das die kompilierten HelloWorld*.class-Dateien enthält. Führen Sie dann den folgenden Befehl aus, um die Klassendateien in einer JAR-Datei zu verpacken, die ein Manifest enthält, das den Einstiegspunkt der Hauptklasse angibt (HelloWorld).
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

JAR-Datei in Cloud Storage kopieren

  1. Verwenden Sie die Google Cloud CLI, um die JAR-Datei in einen Cloud Storage-Bucket in Ihrem Projekt zu kopieren.
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

JAR-Datei an einen Dataproc Spark-Job senden

  1. Verwenden Sie die Google Cloud Console, um die JAR-Datei an den Dataproc Spark-Job zu senden. Füllen Sie die Felder auf der Seite Job senden wie folgt aus:

    • Cluster: Wählen Sie den Namen des Clusters aus der Clusterliste aus
    • Jobtyp: Spark
    • Hauptklasse oder JAR: Geben Sie den Pfad des Cloud Storage-URI zur JAR-Datei (gs://your-bucket-name/HelloWorld.jar) an.

      Wenn Ihre JAR-Datei kein Manifest enthält, das den Einstiegspunkt Ihres Codes angibt („Hauptklasse: HelloWorld“), muss im Feld „Hauptklasse oder JAR“ der Name Ihrer Hauptklasse („HelloWorld“) angegeben werden. Geben Sie außerdem im Feld „JAR-Dateien“ den URI-Pfad zu Ihrer JAR-Datei (gs://your-bucket-name/HelloWorld.jar) ein.

  2. Klicken Sie auf Senden, um den Job zu starten. Nach dem Start wird der Job zur Jobliste hinzugefügt.

  3. Klicken Sie auf die Job-ID, um die Seite Jobs zu öffnen, auf der Sie die Treiberausgabe des Jobs anzeigen können.

Spark Scala-Code mit der REPL spark-shell des Clusters schreiben und ausführen

Sie können Scala-Anwendungen direkt auf Ihrem Dataproc-Cluster entwickeln. Hadoop und Spark sind in Dataproc-Clustern vorinstalliert. Beide Dienste werden mit dem Cloud Storage-Connector konfiguriert, sodass der Code Daten direkt aus Cloud Storage lesen und in Cloud Storage schreiben kann.

In diesem Beispiel wird gezeigt, wie Sie eine SSH-Verbindung zum Masterknoten des Dataproc-Clusters Ihres Projekts herstellen und dann die REPL spark-shell verwenden, um eine Scala-Mapreduce-Wordcount-Anwendung zu erstellen und auszuführen.

  1. SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen

    1. Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster Ihres Projekts auf und klicken Sie auf den Namen des Clusters.

    2. Wählen Sie auf der Seite mit den Clusterdetails den Tab VM-Instanzen aus und klicken Sie auf die SSH-Auswahl rechts neben der Namenszeile Ihres Clusters.

      Im Stammverzeichnis des Master-Knotens wird ein Browserfenster angezeigt.

  2. Starten Sie spark-shell.

    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    

  3. Erstellen Sie ein RDD aus einem Shakespeare-Text-Snippet, das sich im öffentlichen Cloud Storage befindet

    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    

  4. Führen Sie eine WordCount-MapReduce-Berechnung für den Text aus und zeigen Sie das wordcounts-Ergebnis an.

    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
    (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    

  5. Speichern Sie die Werte in <bucket-name>/wordcounts-out in Cloud Storage und beenden Sie dann scala-shell.

    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    

  6. Mit der gcloud CLI die Ausgabedateien auflisten und die Dateiinhalte anzeigen

    $ gcloud storage ls gs://bucket-name/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    

  7. Prüfen Sie den Inhalt von gs://<bucket-name>/wordcounts-out/part-00000.

    $ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

Vorinstallierten Beispielcode ausführen

Der Dataproc-Masterknoten enthält ausführbare JAR-Dateien mit Apache Hadoop- und Spark-Standardbeispielen.

JAR-Typ Master node /usr/lib/ location GitHub-Quelle Apache-Dokumente
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar Quelllink MapReduce-Anleitung
Spark spark/lib/spark-examples.jar Quelllink Spark-Beispiele

Beispiele über die Befehlszeile an Cluster senden

Beispiele können mit dem gcloud-Befehlszeilentool der Google Cloud CLI von Ihrem lokalen Entwicklungscomputer gesendet werden. Informationen zum Senden von Jobs über die Google Cloud Console finden Sie unter Google Cloud Console verwenden.

Beispiel für Hadoop WordCount

gcloud dataproc jobs submit hadoop --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --class=org.apache.hadoop.examples.WordCount \
    -- URI of input file URI of output file

Beispiel für Spark WordCount

gcloud dataproc jobs submit spark --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- URI of input file

Cluster herunterfahren

Um laufende Gebühren zu vermeiden, fahren Sie den Cluster herunter und löschen Sie die für diese Anleitung verwendeten Cloud Storage-Ressourcen (Cloud Storage-Bucket und -Dateien).

So fahren Sie den Cluster herunter:

gcloud dataproc clusters delete cluster-name \
    --region=region

So löschen Sie die Cloud Storage-JAR-Datei:

gcloud storage rm gs://bucket-name/HelloWorld.jar

Mit dem folgenden Befehl können Sie einen Bucket sowie alle zugehörigen Ordner und Dateien löschen:

gcloud storage rm gs://bucket-name/ --recursive

Nächste Schritte