Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Usa el conector de Cloud Storage con Apache Spark

En este instructivo, se muestra cómo ejecutar código de ejemplo que usa el conector de Cloud Storage con Apache Spark.

Objetivos

Escribe un trabajo simple de Spark de recuento de palabras en Java, Scala o Python y, luego, ejecuta el trabajo en un clúster de Dataproc.

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Cloud Platform pueden cumplir los requisitos para una prueba gratuita.

Antes de comenzar

Ejecuta los siguientes pasos para prepararte para ejecutar el código en este instructivo.

  1. Configura tu proyecto. Si es necesario, configura un proyecto con las API de Dataproc, Compute Engine y Cloud Storage habilitadas y el SDK de Cloud instalado en tu máquina local.

    1. Accede a tu Cuenta de Google.

      Si todavía no tienes una cuenta, regístrate para obtener una nueva.

    2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

      Ir a la página del selector de proyectos

    3. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Descubre cómo confirmar que tienes habilitada la facturación en un proyecto.

    4. Habilita las API de Dataproc, Compute Engine, and Cloud Storage.

      Habilita las API

    5. Instala e inicializa el SDK de Cloud.

  2. Cree un bucket de Cloud Storage Necesitas Cloud Storage para guardar los datos del instructivo. Si no tienes uno listo para usar, crea un bucket nuevo en tu proyecto.

    1. En Cloud Console, ve a la página Navegador de Cloud Storage.

      Ir a la página Navegador de Cloud Storage

    2. Haz clic en Crear depósito.
    3. En el diálogo Crear bucket, especifica los siguientes atributos:
    4. Haz clic en Crear.

  3. Configura las variables de entorno locales. Configura variables de entorno en tu máquina local. Configura tu ID del proyecto de Google Cloud y el nombre del depósito de Cloud Storage que usarás para este instructivo. Además, proporciona el nombre y la región de un clúster de Dataproc nuevo o existente. Puedes crear un clúster para usar en este instructivo en el siguiente paso.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Crea un clúster de Dataproc. Ejecuta el comando siguiente para crear un clúster de Dataproc de nodo único en la zona de Compute Engine especificada.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Copia los datos públicos en tu bucket de Cloud Storage Copia un fragmento de texto público de un fragmento de texto de Shakespeare en la carpeta input de tu bucket de Cloud Storage:

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Configura un entorno de desarrollo de Java (Apache Maven), Scala (SBT) o Python.

Prepara el trabajo de conteo de palabras de Spark

Selecciona una pestaña a continuación a fin de seguir los pasos para preparar un paquete de trabajo o un archivo para enviar a tu clúster. Puedes preparar uno de los siguientes tipos de trabajo;

Java

  1. Copia el archivo pom.xml a tu máquina local. El siguiente archivo pom.xml especifica las dependencias de la biblioteca de Scala y Spark, que tienen un alcance provided para indicar que el clúster de Dataproc proporcionará estas bibliotecas en el entorno de ejecución. El archivo pom.xml no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz estándar de HDFS. Cuando un trabajo de Spark accede a los archivos del clúster de Cloud Storage (archivos con URI que comienzan con gs://), el sistema utiliza de forma automática el conector de Cloud Storage para acceder a los archivos en Cloud Storage
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. Copia el código WordCount.java que aparece a continuación en tu máquina local.
    1. Crea un conjunto de directorios con la ruta de acceso src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copia WordCount.java en tu máquina local en src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java es un trabajo simple de Spark en Java que lee archivos de texto de Cloud Storage, realiza un conteo de palabras y, luego, escribe los resultados del archivo de texto en Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. Compila el paquete.
    mvn clean package
    
    Si la compilación se realiza de forma correcta, se crea un target/spark-with-gcs-1.0-SNAPSHOT.jar.
  4. Almacena el paquete en etapa intermedia en Cloud Storage.
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copia el archivo build.sbt a tu máquina local. El siguiente archivo build.sbt especifica las dependencias de la biblioteca de Scala y Spark, que tienen un alcance provided para indicar que el clúster de Dataproc proporcionará estas bibliotecas en el entorno de ejecución. El archivo build.sbt no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz estándar de HDFS. Cuando un trabajo de Spark accede a los archivos del clúster de Cloud Storage (archivos con URI que comienzan con gs://), el sistema utiliza de forma automática el conector de Cloud Storage para acceder a los archivos en Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. Copia word-count.scala a tu máquina local. Este es un trabajo simple de Spark en Java que lee archivos de texto de Cloud Storage, realiza un conteo de palabras y, luego, escribe los resultados del archivo de texto en Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. Compila el paquete.
    sbt clean package
    
    Si la compilación se realiza de forma correcta, se crea un target/scala-2.11/word-count_2.11-1.0.jar.
  4. Almacena el paquete en etapa intermedia en Cloud Storage.
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copia word-count.py a tu máquina local. Este es un trabajo de Spark simple en Python con PySpark que lee archivos de texto de Cloud Storage, realiza un conteo de palabras y, luego, escribe los resultados del archivo de texto en Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

Envía el trabajo

Ejecuta el siguiente comando de gcloud para enviar el trabajo de conteo de palabras a tu clúster de Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Revise el resultado.

Una vez que finalice el trabajo, ejecuta el siguiente comando de gsutil del SDK de Cloud para ver el resultado del recuento de palabras.

gsutil cat gs://${BUCKET_NAME}/output/*

El resultado del conteo de palabras debe ser similar al siguiente:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Limpieza

Una vez que termines el instructivo Usa Dataproc, puedes realizar una limpieza de los recursos que creaste en Google Cloud a fin de que no consuman tu cuota y no se te cobre por ellos en el futuro. En las siguientes secciones, se describe cómo borrar o desactivar estos recursos.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, sigue estos pasos:

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Cómo borrar el clúster de Dataproc

En lugar de borrar tu proyecto, es posible que solo quieras borrar tu clúster dentro del proyecto.

Borra el bucket de Cloud Storage

Cloud Console

  1. En Cloud Console, ve a la página Navegador de Cloud Storage.

    Ir a la página Navegador de Cloud Storage

  2. Haz clic en la casilla de verificación del bucket que quieras borrar.
  3. Para borrar el depósito, haz clic en Borrar .

Línea de comandos

    Borra el depósito:
    gsutil rb [BUCKET_NAME]

¿Qué sigue?