Objetivos
Escreva um job simples de contagem de palavras do Spark em Java, Scala ou Python e execute-o em um cluster do Dataproc.
Custos
Neste tutorial, há componentes faturáveis do Google Cloud, entre eles:- Compute Engine
- Dataproc
- Cloud Storage
Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. Usuários novos do Cloud Platform podem ter direito a uma avaliação gratuita.
Antes de começar
Execute as etapas abaixo para se preparar para executar o código neste tutorial.
Criar o projeto. Se necessário, configure um projeto com as APIs Dataproc, Compute Engine e Cloud Storage ativadas e o SDK do Cloud instalado na máquina local.
-
Faça login na sua conta do Google.
Se você ainda não tiver uma, inscreva-se.
-
No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.
-
Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.
- Ative as APIs Dataproc, Compute Engine, and Cloud Storage.
- Instale e inicialize o SDK do Cloud..
-
Faça login na sua conta do Google.
Criar um bucket do Cloud Storage Você precisa do Cloud Storage para armazenar os dados do tutorial. Se você não tiver um pronto para usá-lo, crie um novo bucket no projeto.
- No Console do Cloud, acesse a página Navegador do Cloud Storage.
- Clique em Criar bucket.
- Na caixa de diálogo Criar bucket, especifique os seguintes atributos.
- Nome exclusivo do bucket, sujeito aos requisitos de nome de bucket.
- Uma classe de armazenamento.
- Um local onde os dados do bucket serão armazenados.
- Clique em Criar.
Defina variáveis de ambiente locais. Defina variáveis de ambiente na máquina local. Defina o ID do projeto do Google Cloud e o nome do bucket do Cloud Storage que você usará neste tutorial. Forneça também o nome e a região de um cluster novo ou existente do Dataproc. Você pode criar um cluster para usar neste tutorial na próxima etapa.
PROJECT=project-id
BUCKET_NAME=bucket-name
CLUSTER=cluster-name
REGION=cluster-region Example: "us-central1"
Criar um cluster de Dataproc. Execute o comando abaixo para criar um cluster do Dataproc de nó único na zona do Compute Engine especificada.
gcloud dataproc clusters create ${CLUSTER} \ --project=${PROJECT} \ --region=${REGION} \ --single-node
Copie dados públicos para o bucket do Cloud Storage. Copie um snippet de um texto de Shakespeare de domínio público para a pasta
input
do bucket do Cloud Storage:gsutil cp gs://pub/shakespeare/rose.txt \ gs://${BUCKET_NAME}/input/rose.txt
Configure um ambiente de desenvolvimento Java (Apache Maven), Scala (SBT) ou Python.
Preparar o job de contagem de palavras do Spark
Selecione uma guia abaixo para seguir as etapas e preparar um pacote ou arquivo de job para enviar ao cluster. Você pode preparar um dos seguintes tipos de job:
- Job Spark em Java usando o Apache Maven para criar um pacote JAR
- Job do Spark no Scala usando SBT para criar um pacote JAR
- Job do Spark em Python (PySpark)
Java
- Copie o arquivo
pom.xml
para sua máquina local. O arquivopom.xml
a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopoprovided
para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivopom.xml
não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam comgs://
), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud Storage<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project> - Copie o código
WordCount.java
listado abaixo para sua máquina local.- Crie um conjunto de diretórios com o caminho
src/main/java/dataproc/codelab
:mkdir -p src/main/java/dataproc/codelab
- Copie
WordCount.java
para sua máquina local emsrc/main/java/dataproc/codelab
:cp WordCount.java src/main/java/dataproc/codelab
O WordCount.java é um job simples do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.
package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- Crie um conjunto de diretórios com o caminho
- Criar o pacote.
mvn clean package
Se a compilação for bem-sucedida, umtarget/spark-with-gcs-1.0-SNAPSHOT.jar
será criado. - Prepare o pacote para o Cloud Storage.
gsutil cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- Copie o arquivo
build.sbt
para sua máquina local. O arquivobuild.sbt
a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopoprovided
para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivobuild.sbt
não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam comgs://
), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud StoragescalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" ) - Copie
word-count.scala
para sua máquina local. Ele é um job simples do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- Criar o pacote.
sbt clean package
Se a compilação for bem-sucedida, umtarget/scala-2.11/word-count_2.11-1.0.jar
será criado. - Prepare o pacote para o Cloud Storage.
gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- Copie
word-count.py
para sua máquina local. Ele é um job simples do Spark em Python usando PySpark que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
Enviar o job
Execute o comando gcloud
a seguir para enviar o job de contagem de palavras ao cluster do Dataproc.
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Veja o resultado
Após a conclusão do job, execute o comando gsutil
do SDK do Cloud a seguir para visualizar a saída de contagem de palavras.
gsutil cat gs://${BUCKET_NAME}/output/*
O resultado da contagem de palavras deve ser semelhante a este:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)
Como fazer a limpeza
Depois de concluir o tutorial "Usar o Dataproc", limpe os recursos criados no Google Cloud para que eles não ocupem cota, e eles não serão cobrados no futuro. Veja como excluir e desativar esses recursos nas seções a seguir.
Como excluir o projeto
O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.
Para excluir o projeto:
- No Console do Cloud, acesse a página Gerenciar recursos:
- Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
- Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.
Como excluir o cluster do Dataproc
Em vez de excluir o projeto, convém excluir o cluster dentro do projeto.
Como excluir o bucket do Cloud Storage
Cloud Console
- No Console do Cloud, acesse a página Navegador do Cloud Storage.
- Clique na caixa de seleção do bucket que você quer excluir.
- Para excluir o bucket, clique em Excluir delete.
Linha de comando
-
Excluir o bucket:
gsutil rb [BUCKET_NAME]