Spark Scala-Jobs in Dataproc schreiben und ausführen

In dieser Anleitung werden die verschiedenen Methoden erläutert, mit denen Sie Spark Scala-Jobs erstellen und an einen Dataproc-Cluster senden können:

  • Eine Spark Scala-Anwendung namens „Hello World“ auf einem lokalen Computer über die Befehlszeile schreiben und kompilieren: mit der Scala REPL (Read-Evaluate-Print-Loop bzw. den interaktiven Interpreter) oder dem SBT
  • Kompilierte Scala-Klassen in eine JAR-Datei mit einem Manifest kompilieren
  • Die Scala-JAR-Datei an einen Spark-Job senden, der auf dem Dataproc-Cluster ausgeführt wird
  • Scala-Jobausgabe von der Google Cloud Console aus überprüfen

In dieser Anleitung lernen Sie außerdem Folgendes:

  • Den Spark Scala-MapReduce-Job „WordCount“ mit der REPL spark-shell direkt in einem Dataproc-Cluster schreiben und ausführen

  • Vorinstallierte Apache Spark- und Hadoop-Beispiele auf einem Cluster ausführen

Google Cloud Platform-Projekt einrichten

Führen Sie ggf. die folgenden Aufgaben aus:

  1. Projekt einrichten
  2. Cloud Storage-Bucket erstellen
  3. Dataproc-Cluster erstellen

Scala-Code lokal schreiben und kompilieren

Als einfache Übung für diese Anleitung schreiben Sie eine "Hello World"-Scala-Anwendung mit der Scala-REPL oder der SBT lokal auf Ihrem Entwicklungscomputer.

Scala verwenden

  1. Laden Sie die Scala-Binärdateien von der Seite Scala-Installation herunter.
  2. Entpacken Sie die Datei, legen Sie die Umgebungsvariable SCALA_HOME fest und fügen Sie sie dem Pfad hinzu. Folgen Sie dazu der Anleitung für die Scala-Installation. Beispiel:

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. Starten Sie die Scala-REPL

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. Kopieren Sie den Code HelloWorld und fügen Sie ihn in die Scala-REPL ein.

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }

  5. Speichern Sie HelloWorld.scala und beenden Sie die REPL.

    scala> :save HelloWorld.scala
    scala> :q
    

  6. Kompilieren Sie den Code mit scalac.

    $ scalac HelloWorld.scala
    

  7. Listen Sie die kompilierten .class-Dateien auf.

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

SBT verwenden

  1. Laden Sie SBT herunter

  2. Erstellen Sie ein "HelloWorld"-Projekt (siehe unten)

    $ mkdir hello
    $ cd hello
    $ echo \
    'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
    HelloWorld.scala
    

  3. Erstellen Sie eine sbt.build-Konfigurationsdatei, um artifactName (Name der unten generierten JAR-Datei) auf „HelloWorld.jar“ festzulegen. Weitere Informationen finden Sie unter Standardartefakte ändern.

    echo \
    'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    "HelloWorld.jar" }' > \
    build.sbt
    

  4. Starten Sie SBT und führen Sie den Code aus

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. Verpacken Sie den Code in eine JAR-Datei mit einem Manifest, das den Einstiegspunkt der Hauptklasse angibt (HelloWorld), und beenden Sie dann SBT.

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

JAR-Datei erstellen

Erstellen Sie eine JAR-Datei mit SBT oder dem jar-Befehl.

JAR-Datei mit SBT erstellen

Durch den SBT-Befehl package wird eine JAR-Datei erstellt. Weitere Informationen finden Sie unter Mit SBT.

JAR-Datei manuell erstellen

  1. Ändern Sie das Verzeichnis (cd) in das Verzeichnis, das die kompilierten HelloWorld*.class-Dateien enthält. Führen Sie dann den folgenden Befehl aus, um die Klassendateien in einer JAR-Datei zu verpacken, die ein Manifest enthält, das den Einstiegspunkt der Hauptklasse angibt (HelloWorld).
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

JAR-Datei in Cloud Storage kopieren

  1. Verwenden Sie die Google Cloud CLI, um die JAR-Datei in einen Cloud Storage-Bucket in Ihrem Projekt zu kopieren.
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

JAR-Datei an einen Dataproc Spark-Job senden

  1. Verwenden Sie die Google Cloud Console, um die JAR-Datei an Ihren Dataproc Spark-Job zu senden. Füllen Sie die Felder auf der Seite Job senden wie folgt aus:

    • Cluster: Wählen Sie den Namen des Clusters aus der Clusterliste aus
    • Jobtyp: Spark
    • Hauptklasse oder JAR: Geben Sie den Pfad des Cloud Storage-URI zur JAR-Datei (gs://your-bucket-name/HelloWorld.jar) an.

      Wenn Ihre JAR-Datei kein Manifest enthält, das den Einstiegspunkt Ihres Codes angibt („Hauptklasse: HelloWorld“), muss im Feld „Hauptklasse oder JAR“ der Name Ihrer Hauptklasse („HelloWorld“) angegeben werden. Geben Sie außerdem im Feld „JAR-Dateien“ den URI-Pfad zu Ihrer JAR-Datei (gs://your-bucket-name/HelloWorld.jar) ein.

  2. Klicken Sie auf Senden, um den Job zu starten. Nach dem Start wird der Job zur Jobliste hinzugefügt.

  3. Klicken Sie auf die Job-ID, um die Seite Jobs zu öffnen, auf der Sie die Treiberausgabe des Jobs anzeigen können.

Spark Scala-Code mit der REPL spark-shell des Clusters schreiben und ausführen

Sie können Scala-Anwendungen direkt auf Ihrem Dataproc-Cluster entwickeln. Hadoop und Spark sind in Dataproc-Clustern vorinstalliert. Beide Dienste werden mit dem Cloud Storage-Connector konfiguriert, sodass der Code Daten direkt aus Cloud Storage lesen und in Cloud Storage schreiben kann.

In diesem Beispiel wird gezeigt, wie Sie eine SSH-Verbindung zum Masterknoten des Dataproc-Clusters Ihres Projekts herstellen und dann die REPL spark-shell verwenden können, um eine Scala-Mapreduce-Wordcount-Anwendung zu erstellen und auszuführen.

  1. SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen

    1. Rufen Sie in der Google Cloud -Konsole die Seite Dataproc-Cluster Ihres Projekts auf und klicken Sie auf den Namen des Clusters.

    2. Wählen Sie auf der Seite mit den Clusterdetails den Tab VM-Instanzen aus und klicken Sie auf die SSH-Auswahl rechts neben der Namenszeile des Clusters.

      Im Stammverzeichnis des Master-Knotens wird ein Browserfenster angezeigt.

  2. Starten Sie spark-shell.

    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    

  3. Erstellen Sie ein RDD aus einem Shakespeare-Text-Snippet, das sich im öffentlichen Cloud Storage befindet

    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    

  4. Führen Sie eine WordCount-MapReduce-Berechnung für den Text aus und zeigen Sie das wordcounts-Ergebnis an.

    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
    (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    

  5. Speichern Sie die Werte in <bucket-name>/wordcounts-out in Cloud Storage und beenden Sie dann scala-shell.

    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    

  6. Mit der gcloud CLI die Ausgabedateien auflisten und die Dateiinhalte anzeigen

    $ gcloud storage ls gs://bucket-name/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    

  7. Prüfen Sie den Inhalt von gs://<bucket-name>/wordcounts-out/part-00000.

    $ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

Vorinstallierten Beispielcode ausführen

Der Dataproc-Masterknoten enthält ausführbare JAR-Dateien mit Apache Hadoop- und Spark-Standardbeispielen.

JAR-Typ Master node /usr/lib/ location GitHub-Quelle Apache-Dokumente
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar Quelllink MapReduce-Anleitung
Spark spark/lib/spark-examples.jar Quelllink Spark-Beispiele

Beispiele über die Befehlszeile an Cluster senden

Beispiele können mit dem gcloud-Befehlszeilentool der Google Cloud CLI von Ihrem lokalen Entwicklungscomputer gesendet werden. Informationen zum Senden von Jobs über die Google Cloud -Konsole finden Sie unter Google Cloud -Konsole verwenden, um Jobs über die Google Cloud -Konsole zu senden.

Beispiel für Hadoop WordCount

gcloud dataproc jobs submit hadoop --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --class=org.apache.hadoop.examples.WordCount \
    -- URI of input file URI of output file

Beispiel für Spark WordCount

gcloud dataproc jobs submit spark --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- URI of input file

Cluster herunterfahren

Um laufende Gebühren zu vermeiden, fahren Sie den Cluster herunter und löschen Sie die für diese Anleitung verwendeten Cloud Storage-Ressourcen (Cloud Storage-Bucket und -Dateien).

So fahren Sie den Cluster herunter:

gcloud dataproc clusters delete cluster-name \
    --region=region

So löschen Sie die Cloud Storage-JAR-Datei:

gcloud storage rm gs://bucket-name/HelloWorld.jar

Mit dem folgenden Befehl können Sie einen Bucket sowie alle zugehörigen Ordner und Dateien löschen:

gcloud storage rm gs://bucket-name/ --recursive

Nächste Schritte