Usar o conector do Cloud Storage com o Apache Spark

Este tutorial mostra como executar um código de exemplo que usa o conector do Cloud Storage com o Apache Spark.

Objetivos

Escreva um job simples de contagem de palavras do Spark em Java, Scala ou Python e execute-o em um cluster do Dataproc.

Custos

Neste tutorial, há componentes faturáveis do Google Cloud, entre eles:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. Usuários novos do Cloud Platform podem ter direito a uma avaliação gratuita.

Antes de começar

Execute as etapas abaixo para se preparar para executar o código neste tutorial.

  1. Criar o projeto. Se necessário, configure um projeto com as APIs Dataproc, Compute Engine e Cloud Storage ativadas e o SDK do Cloud instalado na máquina local.

    1. Faça login na sua conta do Google.

      Se você ainda não tiver uma, inscreva-se.

    2. No Console do Cloud, na página do seletor de projetos, selecione ou crie um projeto do Cloud.

      Acessar a página do seletor de projetos

    3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud. Saiba como confirmar se a cobrança está ativada para o seu projeto.

    4. Ative as APIs Dataproc, Compute Engine, and Cloud Storage.

      Ative as APIs

    5. Instale e inicialize o SDK do Cloud..

  2. Criar um bucket do Cloud Storage Você precisa do Cloud Storage para armazenar os dados do tutorial. Se você não tiver um pronto para usá-lo, crie um novo bucket no projeto.

    1. No Console do Cloud, acesse a página Navegador do Cloud Storage.

      Acessar a página "Navegador do Cloud Storage"

    2. Clique em Criar bucket.
    3. Na caixa de diálogo Criar bucket, especifique os seguintes atributos.
    4. Clique em Criar

  3. Defina variáveis de ambiente locais. Defina variáveis de ambiente na máquina local. Defina o ID do projeto do Google Cloud e o nome do bucket do Cloud Storage que você usará neste tutorial. Forneça também o nome e a região de um cluster novo ou existente do Dataproc. Você pode criar um cluster para usar neste tutorial na próxima etapa.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Criar um cluster de Dataproc. Execute o comando abaixo para criar um cluster do Dataproc de nó único na zona do Compute Engine especificada.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Copie dados públicos para o bucket do Cloud Storage. Copie um snippet de um texto de Shakespeare de domínio público para a pasta input do bucket do Cloud Storage:

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Configure um ambiente de desenvolvimento Java (Apache Maven), Scala (SBT) ou Python.

Preparar o job de contagem de palavras do Spark

Selecione uma guia abaixo para seguir as etapas e preparar um pacote ou arquivo de job para enviar ao cluster. Você pode preparar um dos seguintes tipos de job:

Java

  1. Copie o arquivo pom.xml para sua máquina local. O arquivo pom.xml a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopo provided para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivo pom.xml não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam com gs:// ), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud Storage
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. Copie o código WordCount.java listado abaixo para sua máquina local.
    1. Crie um conjunto de diretórios com o caminho src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copie WordCount.java para sua máquina local em src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    O WordCount.java é um job simples do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. Criar o pacote.
    mvn clean package
    
    Se a compilação for bem-sucedida, um target/spark-with-gcs-1.0-SNAPSHOT.jar será criado.
  4. Prepare o pacote para o Cloud Storage.
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copie o arquivo build.sbt para sua máquina local. O arquivo build.sbt a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopo provided para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivo build.sbt não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam com gs:// ), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. Copie word-count.scala para sua máquina local. Ele é um job simples do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. Criar o pacote.
    sbt clean package
    
    Se a compilação for bem-sucedida, um target/scala-2.11/word-count_2.11-1.0.jar será criado.
  4. Prepare o pacote para o Cloud Storage.
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copie word-count.py para sua máquina local. Ele é um job simples do Spark em Python usando PySpark que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

Enviar o job

Execute o comando gcloud a seguir para enviar o job de contagem de palavras ao cluster do Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Veja o resultado

Após a conclusão do job, execute o comando gsutil do SDK do Cloud a seguir para visualizar a saída de contagem de palavras.

gsutil cat gs://${BUCKET_NAME}/output/*

O resultado da contagem de palavras deve ser semelhante a este:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Como fazer a limpeza

Depois de concluir o tutorial "Usar o Dataproc", limpe os recursos criados no Google Cloud para que eles não ocupem cota, e eles não serão cobrados no futuro. Veja como excluir e desativar esses recursos nas seções a seguir.

Como excluir o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.

Para excluir o projeto:

  1. No Console do Cloud, acesse a página Gerenciar recursos:

    Acessar a página "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Como excluir o cluster do Dataproc

Em vez de excluir o projeto, convém excluir o cluster dentro do projeto.

Como excluir o bucket do Cloud Storage

Cloud Console

  1. No Console do Cloud, acesse a página Navegador do Cloud Storage.

    Acesse a página "Navegador do Cloud Storage"

  2. Clique na caixa de seleção do bucket que você quer excluir.
  3. Para excluir o bucket, clique em Excluir .

Linha de comando

    Excluir o bucket:
    gsutil rb [BUCKET_NAME]