Use the Cloud Storage connector with Apache Spark

This tutorial shows you how to run example code that uses the Cloud Storage connector with Apache Spark.

Objectives

Write a simple wordcount Spark job in Java, Scala, or Python, then run the job on a Dataproc cluster.

Costs

This tutorial uses billable components of Google Cloud Platform, including:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Use the Pricing Calculator to generate a cost estimate based on your projected usage. New Cloud Platform users might be eligible for a free trial.

Before you begin

Complete the steps below to prepare to run the code in this tutorial.

  1. Set up your project. If necessary, set up a project with the Dataproc, Compute Engine, and Cloud Storage APIs enabled and the Cloud SDK installed on your local machine.

    1. Sign in to your Google Account.

      If you don't already have an account, sign up for a new one.

    2. In the GCP Console, on the project selector page, select or create a GCP project.

      Go to the project selector page

    3. Make sure that billing is enabled for your Google Cloud Platform project. Learn how to confirm that billing is enabled for your project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Enable the APIs

    5. Install and initialize the Cloud SDK.

  2. Create a Cloud Storage bucket. You need a Cloud Storage bucket to hold tutorial data. If you do not have one ready to use, create a new bucket in your project by following the console steps below (or with the gsutil command shown after them).

    1. In the GCP Console, go to the Cloud Storage Browser page.

      Go to the Cloud Storage Browser page

    2. Click Create bucket.
    3. In the Create bucket dialog, specify the bucket attributes.
    4. Click Create.
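
    Alternatively, you can create the bucket from the command line with gsutil. The following is a minimal sketch; the location shown ("us-west1") is an assumption, and you should choose a location that matches where you will run your cluster:

      gsutil mb -l us-west1 gs://bucket-name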

  3. Set local environment variables. On your local machine, set environment variables for your Google Cloud project ID, the name of the Cloud Storage bucket you will use for this tutorial, and the name and zone of an existing or new Dataproc cluster (you can create the cluster in the next step).

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    ZONE=cluster-zone    # example: "us-west1-a"
    

  4. Create a Dataproc cluster. Run the following command to create a single-node Dataproc cluster in the specified Compute Engine zone.

    gcloud dataproc clusters create $CLUSTER \
        --project=${PROJECT} \
        --zone=${ZONE} \
        --single-node
    

  5. Copy public data to your Cloud Storage bucket. Copy a public data Shakespeare text snippet into the input folder of your Cloud Storage bucket:

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Set up a Java (Apache Maven), Scala (SBT), or Python development environment.
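
    If these tools are already installed, you can confirm which versions are available on your machine (a minimal check; installing the tools themselves varies by operating system and is not covered in this tutorial):

    mvn -version        # Java (Apache Maven)
    sbt sbtVersion      # Scala (SBT)
    python --version    # Python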

Prepare the Spark wordcount job

Select a tab below and follow the steps to prepare a job package or file to submit to your cluster. You can prepare one of the following job types:

Java

  1. Copy the pom.xml file below to your local machine. The pom.xml file specifies Scala and Spark library dependencies, which are given a provided scope to indicate that the Dataproc cluster will provide these libraries at runtime. The pom.xml file does not specify a Cloud Storage dependency because the connector implements the standard HDFS interface. When a Spark job accesses Cloud Storage files (files with URIs that start with gs://), the system automatically uses the Cloud Storage connector to access the files in Cloud Storage.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. Copy the WordCount.java code listed below to your local machine.
    1. Create a set of directories with the path src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copy WordCount.java to your local machine into src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java is a simple Spark job in Java that reads text files from Cloud Storage, performs a word count, then writes the text file results to Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. Build the package.
    mvn clean package
    
    If the build is successful, a target/word-count-1.0.jar file is created.
  4. Stage the package to Cloud Storage.
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copy the build.sbt file below to your local machine. The build.sbt file specifies Scala and Spark library dependencies, which are given a provided scope to indicate that the Dataproc cluster will provide these libraries at runtime. The build.sbt file does not specify a Cloud Storage dependency because the connector implements the standard HDFS interface. When a Spark job accesses Cloud Storage files (files with URIs that start with gs://), the system automatically uses the Cloud Storage connector to access the files in Cloud Storage.
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. Copy word-count.scala to your local machine. This is a simple Spark job in Scala that reads text files from Cloud Storage, performs a word count, then writes the text file results to Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. Build the package.
    sbt clean package
    
    If the build is successful, a target/scala-2.11/word-count_2.11-1.0.jar is created.
  4. Stage the package to Cloud Storage.
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copy word-count.py to your local machine. This is a simple Spark job in Python using PySpark that reads text files from Cloud Storage, performs a word count, then writes the text file results to Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri = sys.argv[1]
    outputUri = sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(inputUri)
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(outputUri)
    

Submit the job

Run the following gcloud command to submit the wordcount job to your Dataproc cluster.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class dataproc.codelab.WordCount \
    --jars gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class dataproc.codelab.WordCount \
    --jars gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

View the output

After the job finishes, run the following Cloud SDK gsutil command to view the wordcount output.

gsutil cat gs://${BUCKET_NAME}/output/*

The wordcount output should be similar to the following:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Cleaning up

After you've finished this tutorial, you can clean up the resources that you created on GCP so they won't take up quota and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Manage resources page.

    Go to the Manage resources page

  2. In the project list, select the project that you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.
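
Alternatively, you can delete the project from the command line. This is a minimal sketch that uses the PROJECT variable set earlier in this tutorial:

gcloud projects delete ${PROJECT}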

Deleting the Dataproc cluster

Instead of deleting your project, you may wish to only delete your cluster within the project.
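
For example, you can run the following command (a minimal sketch; depending on your gcloud configuration, you may also need to pass a --region flag):

gcloud dataproc clusters delete ${CLUSTER} \
    --project=${PROJECT}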

Deleting the Cloud Storage bucket

GCP Console

  1. In the GCP Console, go to the Cloud Storage Browser page.

    Go to the Cloud Storage Browser page

  2. Click the checkbox for the bucket that you want to delete.
  3. Click Delete to delete the bucket.

Command line

    Delete the bucket:
    gsutil rb gs://${BUCKET_NAME}
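
    Note that gsutil rb only deletes an empty bucket. If your bucket still contains the tutorial data, you can delete the bucket and all of its contents in one step (a minimal sketch):
    gsutil rm -r gs://${BUCKET_NAME}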
      