Escribir y ejecutar tareas de Scala en Spark en Dataproc

En este tutorial se muestran diferentes formas de crear y enviar una tarea de Spark Scala a un clúster de Dataproc, entre las que se incluyen las siguientes:

  • Escribir y compilar una aplicación "Hello World" de Scala en Spark en un ordenador local desde la línea de comandos mediante el REPL de Scala (Read-Evaluate-Print-Loop o intérprete interactivo) o la herramienta de compilación SBT
  • Empaqueta las clases de Scala compiladas en un archivo JAR con un manifiesto.
  • Enviar el archivo JAR de Scala a una tarea de Spark que se ejecute en tu clúster de Dataproc
  • Examinar la salida de la tarea de Scala desde la Google Cloud consola

En este tutorial también se explica cómo hacer lo siguiente:

  • Escribir y ejecutar una tarea de MapReduce "WordCount" de Spark Scala directamente en un clúster de Dataproc mediante el REPL de spark-shell

  • Ejecutar ejemplos de Apache Spark y Hadoop preinstalados en un clúster

Configura un proyecto de Google Cloud Platform.

Ejecuta estos comandos si aún no lo has hecho:

  1. Configurar un proyecto
  2. Crea un segmento de Cloud Storage.
  3. Crear un clúster de Dataproc

Escribir y compilar código Scala de forma local

Como ejercicio sencillo para este tutorial, escribe una aplicación "Hello World" de Scala con el REPL de Scala o la interfaz de línea de comandos SBT de forma local en tu máquina de desarrollo.

Usar Scala

  1. Descarga los archivos binarios de Scala desde la página Instalación de Scala.
  2. Descomprime el archivo, define la variable de entorno SCALA_HOME y añádela a tu ruta, tal como se muestra en las instrucciones de instalación de Scala. Por ejemplo:

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. Iniciar el REPL de Scala

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. Copia y pega el código HelloWorld en Scala REPL

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }

  5. Guarda HelloWorld.scala y sal del REPL

    scala> :save HelloWorld.scala
    scala> :q
    

  6. Compilar con scalac

    $ scalac HelloWorld.scala
    

  7. Lista de archivos .class compilados

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

Usar SBT

  1. Descargar SBT

  2. Crea un proyecto "HelloWorld", como se muestra a continuación.

    $ mkdir hello
    $ cd hello
    $ echo \
    'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
    HelloWorld.scala
    

  3. Crea un archivo de configuración sbt.build para definir artifactName (el nombre del archivo JAR que generarás más abajo) como "HelloWorld.jar" (consulta Modificar artefactos predeterminados).

    echo \
    'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    "HelloWorld.jar" }' > \
    build.sbt
    

  4. Iniciar SBT y ejecutar código

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. Empaqueta el código en un archivo JAR con un manifiesto que especifique el punto de entrada de la clase principal (HelloWorld) y, a continuación, sal.

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

Crear un archivo JAR

Crea un archivo JAR con SBT o con el comando jar.

Crear un archivo JAR con SBT

El comando package de SBT crea un archivo JAR (consulta Usar SBT).

Crear un archivo JAR manualmente

  1. Cambia el directorio (cd) al directorio que contiene los archivos HelloWorld*.class compilados y, a continuación, ejecuta el siguiente comando para empaquetar los archivos de clase en un archivo JAR con un manifiesto que especifique el punto de entrada de la clase principal (HelloWorld).
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

Copiar el archivo JAR en Cloud Storage

  1. Usa la CLI de Google Cloud para copiar el archivo JAR en un segmento de Cloud Storage de tu proyecto.
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

Enviar un archivo JAR a una tarea de Spark de Dataproc

  1. Usa la Google Cloud consola para enviar el archivo JAR a tu tarea de Spark de Dataproc. Rellena los campos de la página Enviar un trabajo de la siguiente manera:

    • Clúster: selecciona el nombre de tu clúster en la lista de clústeres.
    • Tipo de tarea: Spark
    • Clase principal o JAR: especifica la ruta del URI de Cloud Storage a tu archivo JAR HelloWorld (gs://your-bucket-name/HelloWorld.jar).

      Si tu archivo JAR no incluye un manifiesto que especifique el punto de entrada de tu código ("Main-Class: HelloWorld"), el campo "Clase principal o archivo JAR" debe indicar el nombre de tu clase principal ("HelloWorld"), y debes rellenar el campo "Archivos JAR" con la ruta URI de tu archivo JAR (gs://your-bucket-name/HelloWorld.jar).

  2. Haz clic en Enviar para iniciar el trabajo. Una vez que se inicia el trabajo, se añade a la lista Trabajos.

  3. Haz clic en el ID de la tarea para abrir la página Tareas, donde puedes ver el resultado del controlador de la tarea.

Escribir y ejecutar código de Scala en Spark mediante el REPL spark-shell del clúster

Puede que quieras desarrollar aplicaciones Scala directamente en tu clúster de Dataproc. Hadoop y Spark están preinstalados en los clústeres de Dataproc y configurados con el conector de Cloud Storage, que permite que tu código lea y escriba datos directamente desde y hacia Cloud Storage.

En este ejemplo se muestra cómo conectarse por SSH al nodo maestro del clúster de Dataproc de tu proyecto y, a continuación, usar el REPL spark-shell para crear y ejecutar una aplicación de recuento de palabras de Scala.

  1. Conectarse mediante SSH al nodo maestro del clúster de Dataproc

    1. Ve a la página Clústeres de Dataproc de tu proyecto en la Google Cloud consola y, a continuación, haz clic en el nombre de tu clúster.

    2. En la página de detalles del clúster, selecciona la pestaña Instancias de VM y, a continuación, haz clic en la selección de SSH que aparece a la derecha de la fila del nombre del clúster.

      Se abre una ventana del navegador en el directorio principal del nodo maestro.

  2. Abre la spark-shell

    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    

  3. Crea un RDD (conjunto de datos distribuido resistente) a partir de un fragmento de texto de Shakespeare ubicado en Cloud Storage público

    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    

  4. Ejecuta un mapreduce de recuento de palabras en el texto y, a continuación, muestra el resultado de wordcounts.

    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
    (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    

  5. Guarda los recuentos en <bucket-name>/wordcounts-out en Cloud Storage y, a continuación, cierra scala-shell.

    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    

  6. Usar gcloud CLI para enumerar los archivos de salida y mostrar el contenido de los archivos

    $ gcloud storage ls gs://bucket-name/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    

  7. Consultar el contenido de gs://<bucket-name>/wordcounts-out/part-00000

    $ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

Ejecutar el código de ejemplo preinstalado

El nodo maestro de Dataproc contiene archivos JAR ejecutables con ejemplos estándar de Apache Hadoop y Spark.

Tipo de tarro Master node /usr/lib/ location Fuente de GitHub Documentación de Apache
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar enlace de origen Tutorial de MapReduce
Spark spark/lib/spark-examples.jar enlace de origen Ejemplos de Spark

Enviar ejemplos a tu clúster desde la línea de comandos

Los ejemplos se pueden enviar desde tu máquina de desarrollo local mediante la herramienta de línea de comandos gcloud de la CLI de Google Cloud (consulta Usar la consola Google Cloud para enviar trabajos desde la consola Google Cloud ).

Ejemplo de WordCount de Hadoop

gcloud dataproc jobs submit hadoop --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --class=org.apache.hadoop.examples.WordCount \
    -- URI of input file URI of output file

Ejemplo de WordCount de Spark

gcloud dataproc jobs submit spark --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- URI of input file

Apagar un clúster

Para evitar que se te apliquen cargos, apaga el clúster y elimina los recursos de Cloud Storage (segmento y archivos de Cloud Storage) que has usado en este tutorial.

Para apagar un clúster, sigue estos pasos:

gcloud dataproc clusters delete cluster-name \
    --region=region

Para eliminar el archivo JAR de Cloud Storage, haz lo siguiente:

gcloud storage rm gs://bucket-name/HelloWorld.jar

Puedes eliminar un contenedor y todas sus carpetas y archivos con el siguiente comando:

gcloud storage rm gs://bucket-name/ --recursive

Siguientes pasos