Objectives
Write a simple wordcount Spark job in Java, Scala, or Python, then run the job on a Dataproc cluster.
Costs
This tutorial uses billable components of Google Cloud, including:
- Compute Engine
- Dataproc
- Cloud Storage
Use the Pricing Calculator to generate a cost estimate based on your projected usage. New Cloud Platform users might be eligible for a free trial.
Before you begin
Complete the steps below to prepare to run the code in this tutorial.
Set up your project. If necessary, set up a project with the Dataproc, Compute Engine, and Cloud Storage APIs enabled and the Cloud SDK installed on your local machine.
- Sign in to your Google Account. If you don't already have one, sign up for a new account.
- In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.
- Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- Install and initialize the Cloud SDK.
Create a Cloud Storage bucket. You need a Cloud Storage bucket to hold tutorial data. If you do not have one ready to use, create a new bucket in your project. If you prefer the command line, you can also create the bucket with gsutil; see the sketch after the following steps.
- In the Cloud Console, go to the Cloud Storage Browser page.
- Click Create bucket.
- In the Create bucket dialog, specify the following attributes:
- A unique bucket name, subject to the bucket name requirements.
- A storage class.
- A location where bucket data will be stored.
- Click Create.
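As a command-line alternative to the Cloud Console steps above, here is a minimal sketch using gsutil, assuming us-central1 as the location and my-wordcount-bucket as a hypothetical bucket name (substitute your own unique name):
gsutil mb -l us-central1 gs://my-wordcount-bucket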
Set local environment variables. On your local machine, set environment variables for your Google Cloud project ID and the name of the Cloud Storage bucket you will use for this tutorial. Also provide the name and region of an existing or new Dataproc cluster; you can create a cluster to use in this tutorial in the next step.
PROJECT=project-id
BUCKET_NAME=bucket-name
CLUSTER=cluster-name
REGION=cluster-region    # example: "us-central1"
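For example, with hypothetical values (substitute your own project ID, bucket name, cluster name, and region):
PROJECT=my-project-id
BUCKET_NAME=my-wordcount-bucket
CLUSTER=wordcount-cluster
REGION=us-central1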
Create a Dataproc cluster. Run the following command to create a single-node Dataproc cluster in the specified region.
gcloud dataproc clusters create ${CLUSTER} \
    --project=${PROJECT} \
    --region=${REGION} \
    --single-node
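To confirm that the cluster was created, you can optionally list the clusters in the region (a quick check, not required for the tutorial):
gcloud dataproc clusters list --region=${REGION}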
Copy public data to your Cloud Storage bucket. Copy a public data Shakespeare text snippet into the input folder of your Cloud Storage bucket:
gsutil cp gs://pub/shakespeare/rose.txt \
    gs://${BUCKET_NAME}/input/rose.txt
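If you want to verify that the file was copied, an optional check is to list the contents of the input folder:
gsutil ls gs://${BUCKET_NAME}/input/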
Set up a Java (Apache Maven), Scala (SBT), or Python development environment.
Prepare the Spark wordcount job
Select a tab below to follow the steps to prepare a job package or file to submit to your cluster. You can prepare one of the following job types:
- Spark job in Java using Apache Maven to build a JAR package
- Spark job in Scala using SBT to build a JAR package
- Spark job in Python (PySpark)
Java
- Copy the pom.xml file to your local machine. The following pom.xml file specifies Scala and Spark library dependencies, which are given a provided scope to indicate that the Dataproc cluster will provide these libraries at runtime. The pom.xml file does not specify a Cloud Storage dependency because the connector implements the standard HDFS interface. When a Spark job accesses Cloud Storage cluster files (files with URIs that start with gs://), the system automatically uses the Cloud Storage connector to access the files in Cloud Storage.
  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                          http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>dataproc.codelab</groupId>
    <artifactId>word-count</artifactId>
    <version>1.0</version>

    <properties>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>Scala version, for example, 2.11.8</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
        <version>Spark version, for example, 2.3.1</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
  </project>
- Copy the WordCount.java code listed below to your local machine.
  - Create a set of directories with the path src/main/java/dataproc/codelab:
    mkdir -p src/main/java/dataproc/codelab
  - Copy WordCount.java to your local machine into src/main/java/dataproc/codelab:
    cp WordCount.java src/main/java/dataproc/codelab
WordCount.java is a simple Spark job in Java that reads text files from Cloud Storage, performs a word count, then writes the text file results to Cloud Storage.
package dataproc.codelab;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCount {
  public static void main(String[] args) {
    if (args.length != 2) {
      throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
    }

    String inputPath = args[0];
    String outputPath = args[1];

    JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
    JavaRDD<String> lines = sparkContext.textFile(inputPath);
    JavaRDD<String> words = lines.flatMap(
        (String line) -> Arrays.asList(line.split(" ")).iterator()
    );
    JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
        (String word) -> new Tuple2<>(word, 1)
    ).reduceByKey(
        (Integer count1, Integer count2) -> count1 + count2
    );
    wordCounts.saveAsTextFile(outputPath);
  }
}
- Build the package.
mvn clean package
  If the build is successful, a target/word-count-1.0.jar file is created.
- Stage the package to Cloud Storage.
  gsutil cp target/word-count-1.0.jar \
      gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- Copy the build.sbt file to your local machine. The following build.sbt file specifies Scala and Spark library dependencies, which are given a provided scope to indicate that the Dataproc cluster will provide these libraries at runtime. The build.sbt file does not specify a Cloud Storage dependency because the connector implements the standard HDFS interface. When a Spark job accesses Cloud Storage cluster files (files with URIs that start with gs://), the system automatically uses the Cloud Storage connector to access the files in Cloud Storage.
  scalaVersion := "Scala version, for example, 2.11.8"

  name := "word-count"
  organization := "dataproc.codelab"
  version := "1.0"

  libraryDependencies ++= Seq(
    "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
    "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
  )
- Copy word-count.scala to your local machine. This is a simple Spark job in Scala that reads text files from Cloud Storage, performs a word count, then writes the text file results to Cloud Storage.
  package dataproc.codelab

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf

  object WordCount {
    def main(args: Array[String]) {
      if (args.length != 2) {
        throw new IllegalArgumentException(
            "Exactly 2 arguments are required: <inputPath> <outputPath>")
      }

      val inputPath = args(0)
      val outputPath = args(1)

      val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
      val lines = sc.textFile(inputPath)
      val words = lines.flatMap(line => line.split(" "))
      val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
      wordCounts.saveAsTextFile(outputPath)
    }
  }
- Build the package.
sbt clean package
  If the build is successful, a target/scala-2.11/word-count_2.11-1.0.jar file is created.
- Stage the package to Cloud Storage.
  gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
      gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- Copy word-count.py to your local machine. This is a simple Spark job in Python using PySpark that reads text files from Cloud Storage, performs a word count, then writes the text file results to Cloud Storage.
  #!/usr/bin/env python

  import pyspark
  import sys

  if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")

  inputUri = sys.argv[1]
  outputUri = sys.argv[2]

  sc = pyspark.SparkContext()
  lines = sc.textFile(inputUri)
  words = lines.flatMap(lambda line: line.split())
  wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
  wordCounts.saveAsTextFile(outputUri)
Submit the job
Run the following gcloud command to submit the wordcount job to your Dataproc cluster.
Java
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
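Whichever job type you submitted, you can optionally check its status from the command line. A minimal sketch, assuming the CLUSTER and REGION variables set earlier in this tutorial:
gcloud dataproc jobs list \
    --cluster=${CLUSTER} \
    --region=${REGION}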
View the output
After the job finishes, run the following Cloud SDK gsutil command to view the wordcount output.
gsutil cat gs://${BUCKET_NAME}/output/*
The wordcount output should be similar to the following:
(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)
Cleaning up
After you've finished the Use Dataproc tutorial, you can clean up the resources that you created on Google Cloud so they won't take up quota and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.
Deleting the project
The easiest way to eliminate billing is to delete the project that you created for the tutorial.
To delete the project:
- In the Cloud Console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Deleting the Dataproc cluster
Instead of deleting your project, you may want to delete only the cluster within the project.
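A minimal sketch of the delete command, assuming the CLUSTER and REGION environment variables set earlier in this tutorial:
gcloud dataproc clusters delete ${CLUSTER} \
    --region=${REGION}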
Deleting the Cloud Storage bucket
Cloud Console
- In the Cloud Console, go to the Cloud Storage Browser page.
- Click the checkbox for the bucket you want to delete.
- To delete the bucket, click Delete.
Command line
- Delete the bucket:
  gsutil rb gs://[BUCKET_NAME]
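Note that gsutil rb removes only an empty bucket. If the bucket still contains the tutorial's input and output objects and you no longer need them, one option (this permanently deletes all data in the bucket) is to remove the objects and the bucket together:
  gsutil rm -r gs://[BUCKET_NAME]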