Siapkan tugas jumlah kata Spark
Pilih tab di bawah untuk mengikuti langkah-langkah menyiapkan paket atau file tugas untuk dikirimkan ke cluster Anda. Anda dapat menyiapkan salah satu jenis tugas berikut;
- Tugas Spark di Java menggunakan Apache Maven untuk membuat paket JAR
 - Tugas Spark di Scala menggunakan SBT untuk membuat paket JAR
 - Tugas Spark di Python (PySpark)
 
Java
- Salin file 
pom.xmlke komputer lokal Anda. Filepom.xmlberikut menentukan dependensi library Scala dan Spark, yang diberi cakupanprovideduntuk menunjukkan bahwa cluster Dataproc akan menyediakan library ini saat runtime. Filepom.xmltidak menentukan dependensi Cloud Storage karena konektor menerapkan antarmuka HDFS standar. Saat tugas Spark mengakses file cluster Cloud Storage (file dengan URI yang dimulai dengangs://), sistem secara otomatis menggunakan konektor Cloud Storage untuk mengakses file di Cloud Storage<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11</artifactId> <version>Spark version, for example,2.3.1</version> <scope>provided</scope> </dependency> </dependencies> </project> - Salin kode 
WordCount.javayang tercantum di bawah, ke komputer lokal Anda.- Buat serangkaian direktori dengan jalur
        
src/main/java/dataproc/codelab:mkdir -p src/main/java/dataproc/codelab
 - Salin 
WordCount.javake komputer lokal Anda ke dalamsrc/main/java/dataproc/codelab:cp WordCount.java src/main/java/dataproc/codelab
 
WordCount.javaadalah tugas Spark di Java yang membaca file teks dari Cloud Storage, melakukan penghitungan kata, lalu menulis hasil file teks ke Cloud Storage.package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
 - Buat serangkaian direktori dengan jalur
        
 - Bangun paket.
 Jika build berhasil,mvn clean package
target/word-count-1.0.jarakan dibuat. - Lakukan staging paket ke Cloud Storage.
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar 
Scala
- Salin file 
build.sbtke komputer lokal Anda. Filebuild.sbtberikut menentukan dependensi library Scala dan Spark, yang diberi cakupanprovideduntuk menunjukkan bahwa cluster Dataproc akan menyediakan library ini saat runtime. Filebuild.sbttidak menentukan dependensi Cloud Storage karena konektor menerapkan antarmuka HDFS standar. Saat tugas Spark mengakses file cluster Cloud Storage (file dengan URI yang diawali dengangs://), sistem akan otomatis menggunakan konektor Cloud Storage untuk mengakses file di Cloud StoragescalaVersion := "Scala version, for example,
2.11.8" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1" % "provided" ) - Salin 
word-count.scalake komputer lokal Anda. Ini adalah tugas Spark di Java yang membaca file teks dari Cloud Storage, melakukan penghitungan kata, lalu menulis hasil file teks ke Cloud Storage.package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
 - Bangun paket.
 Jika build berhasil,sbt clean package
target/scala-2.11/word-count_2.11-1.0.jarakan dibuat. - Lakukan staging paket ke Cloud Storage.
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar 
Python
- Salin 
word-count.pyke komputer lokal Anda. Ini adalah tugas Spark di Python menggunakan PySpark yang membaca file teks dari Cloud Storage, melakukan penghitungan kata, lalu menulis hasil file teks ke Cloud Storage.#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
 
Kirim tugas
Jalankan perintah gcloud berikut untuk mengirimkan tugas wordcount ke cluster Dataproc Anda.
Java
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Melihat output
Setelah tugas selesai, jalankan perintah gcloud CLI berikut untuk melihat output jumlah kata.
gcloud storage cat gs://${BUCKET_NAME}/output/*
Output jumlah kata akan mirip dengan berikut:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)