Obiettivi
Scrivi un semplice job Spark di conteggio parole in Java, Scala o Python, quindi eseguilo su un cluster Dataproc.
Costi
In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:
- Compute Engine
- Dataproc
- Cloud Storage
Per generare una stima dei costi basata sull'utilizzo previsto,
utilizza il Calcolatore prezzi.
Prima di iniziare
Segui i passaggi riportati di seguito per prepararti a eseguire il codice in questo tutorial.
Configura il progetto. Se necessario, configura un progetto con le API Dataproc, Compute Engine e Cloud Storage abilitate e Google Cloud CLI installato sulla tua macchina locale.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
Create a service account:
-
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
Create a service account:
-
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
Crea un bucket Cloud Storage. Ti serve un bucket Cloud Storage per archiviare i dati del tutorial. Se non ne hai uno pronto per l'uso, crea un nuovo bucket nel tuo progetto.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
Imposta le variabili di ambiente locali. Imposta le variabili di ambiente sulla macchina locale. Imposta l'ID progetto Google Cloud e il nome del bucket Cloud Storage che utilizzerai per questo tutorial. Fornisci anche il nome e la regione di un cluster Dataproc esistente o nuovo. Puoi creare un cluster da utilizzare in questo tutorial nel passaggio successivo.
PROJECT=project-id
BUCKET_NAME=bucket-name
CLUSTER=cluster-name
REGION=cluster-region Example: "us-central1"
Crea un cluster Dataproc. Esegui il comando riportato di seguito per creare un cluster Dataproc a un solo nodo nella zona Compute Engine specificata.
gcloud dataproc clusters create ${CLUSTER} \ --project=${PROJECT} \ --region=${REGION} \ --single-node
Copia i dati pubblici nel tuo bucket Cloud Storage. Copia uno snippet di testo di Shakespeare di dati pubblici nella cartella
input
del tuo bucket Cloud Storage:gcloud storage cp gs://pub/shakespeare/rose.txt \ gs://${BUCKET_NAME}/input/rose.txt
Configura un ambiente di sviluppo Java (Apache Maven), Scala (SBT) o Python.
Prepara il job Spark di conteggio parole
Seleziona una scheda di seguito per seguire i passaggi per preparare un pacchetto o un file di job da inviare al cluster. Puoi preparare uno dei seguenti tipi di job:
- Crea job in Java utilizzando Apache Maven per creare un pacchetto JAR
- Job Spark in Scala che utilizza SBT per compilare un JAR
- Job Spark in Python (PySpark)
Java
- Copia il file
pom.xml
sulla tua macchina locale. Il seguente filepom.xml
specifica le dipendenze delle librerie Scala e Spark, a cui viene assegnato un ambitoprovided
per indicare che il cluster Dataproc fornirà queste librerie in fase di esecuzione. Il filepom.xml
non specifica una dipendenza da Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster Cloud Storage (file con URI che iniziano congs://
), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud Storage<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project> - Copia il codice
WordCount.java
elencato di seguito sul tuo computer locale.- Crea un insieme di directory con il percorso
src/main/java/dataproc/codelab
:mkdir -p src/main/java/dataproc/codelab
- Copia
WordCount.java
sulla tua macchina locale insrc/main/java/dataproc/codelab
:cp WordCount.java src/main/java/dataproc/codelab
WordCount.java è un semplice job Spark in Java che legge i file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.
package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- Crea un insieme di directory con il percorso
- Genera il pacchetto.
Se la compilazione è riuscita, viene creato unmvn clean package
target/word-count-1.0.jar
. - Esegui la gestione delle fasi del pacchetto in Cloud Storage.
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- Copia il file
build.sbt
sulla tua macchina locale. Il seguente filebuild.sbt
specifica le dipendenze delle librerie Scala e Spark, a cui viene assegnato un ambitoprovided
per indicare che il cluster Dataproc fornirà queste librerie in fase di esecuzione. Il filebuild.sbt
non specifica una dipendenza da Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster Cloud Storage (file con URI che iniziano congs://
), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud StoragescalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" ) - Copia
word-count.scala
sulla tua macchina locale. Si tratta di un semplice job Spark in Java che legge i file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- Genera il pacchetto.
Se la compilazione è riuscita, viene creato unsbt clean package
target/scala-2.11/word-count_2.11-1.0.jar
. - Esegui la gestione delle fasi del pacchetto in Cloud Storage.
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- Copia
word-count.py
sulla tua macchina locale. Si tratta di un semplice job Spark in Python che utilizza PySpark per leggere i file di testo da Cloud Storage, conteggia le parole e poi scrive i risultati del file di testo in Cloud Storage.#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
Invia il job
Esegui il seguente comando gcloud
per inviare il job di conteggio parole al
cluster Dataproc.
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Visualizza l'output
Al termine del job, esegui il seguente comando gcloud CLI per visualizzare l'output del conteggio delle parole.
gcloud storage cat gs://${BUCKET_NAME}/output/*
L'output di conteggio parole dovrebbe essere simile al seguente:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)
Esegui la pulizia
Al termine del tutorial, puoi eliminare le risorse che hai creato in modo che smettano di utilizzare la quota e di generare addebiti. Le sezioni seguenti descrivono come eliminare o disattivare queste risorse.
Elimina il progetto
Il modo più semplice per eliminare la fatturazione è eliminare il progetto che hai creato per il tutorial.
Per eliminare il progetto:
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Eliminazione del cluster Dataproc
Anziché eliminare il progetto, ti consigliamo di eliminare solo il cluster al suo interno.
Eliminazione del bucket Cloud Storage
Console Google Cloud
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
Riga di comando
-
Elimina il bucket:
gcloud storage buckets delete BUCKET_NAME