Usar o conector do Cloud Storage com o Apache Spark

Este tutorial mostra como executar um código de exemplo que usa o conector do Cloud Storage com o Apache Spark.

Objetivos

Escreva um job simples de contagem de palavras do Spark em Java, Scala ou Python e execute-o em um cluster do Dataproc.

Custos

Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Execute as etapas abaixo para se preparar para executar o código neste tutorial.

  1. Criar o projeto. Se necessário, configure um projeto com as APIs Dataproc, Compute Engine e Cloud Storage ativadas e o SDK do Cloud instalado na máquina local.

    1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
    2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

      Acessar o seletor de projetos

    3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

    4. Ative as APIs Dataproc, Compute Engine, and Cloud Storage.

      Ative as APIs

    5. Crie uma conta de serviço:

      1. No Console do Cloud, acesse a página Criar conta de serviço.

        Acesse Criar conta de serviço
      2. Selecione um projeto.
      3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

        No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

      4. Clique em Criar e continuar.
      5. Clique no campo Selecionar um papel.

        Em Acesso rápido, clique em Básico e em Proprietário.

      6. Clique em Continuar.
      7. Clique em Concluído para terminar a criação da conta de serviço.

        Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

    6. Crie uma chave de conta de serviço:

      1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
      2. Clique em Chaves.
      3. Clique em Adicionar chave e em Criar nova chave.
      4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
      5. Clique em Fechar.
    7. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

    8. Instale e inicialize o SDK do Cloud..
    9. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

      Acessar o seletor de projetos

    10. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

    11. Ative as APIs Dataproc, Compute Engine, and Cloud Storage.

      Ative as APIs

    12. Crie uma conta de serviço:

      1. No Console do Cloud, acesse a página Criar conta de serviço.

        Acesse Criar conta de serviço
      2. Selecione um projeto.
      3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

        No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

      4. Clique em Criar e continuar.
      5. Clique no campo Selecionar um papel.

        Em Acesso rápido, clique em Básico e em Proprietário.

      6. Clique em Continuar.
      7. Clique em Concluído para terminar a criação da conta de serviço.

        Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

    13. Crie uma chave de conta de serviço:

      1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
      2. Clique em Chaves.
      3. Clique em Adicionar chave e em Criar nova chave.
      4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
      5. Clique em Fechar.
    14. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

    15. Instale e inicialize o SDK do Cloud..

  2. Criar um bucket do Cloud Storage Você precisa do Cloud Storage para armazenar os dados do tutorial. Se você não tiver um pronto para usá-lo, crie um novo bucket no projeto.

    1. No Console do Cloud, acesse a página Navegador do Cloud Storage.

      Acessar o navegador

    2. Clique em Criar bucket.
    3. Na página Criar um bucket, insira as informações do seu bucket. Para ir à próxima etapa, clique em Continuar.
    4. Clique em Criar.

  3. Defina variáveis de ambiente locais. Defina variáveis de ambiente na máquina local. Defina o ID do projeto do Google Cloud e o nome do bucket do Cloud Storage que você usará neste tutorial. Forneça também o nome e a região de um cluster novo ou existente do Dataproc. Você pode criar um cluster para usar neste tutorial na próxima etapa.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Criar um cluster de Dataproc. Execute o comando abaixo para criar um cluster do Dataproc de nó único na zona do Compute Engine especificada.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Copie dados públicos para o bucket do Cloud Storage. Copie um snippet de um texto de Shakespeare de domínio público para a pasta input do bucket do Cloud Storage:

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Configure um ambiente de desenvolvimento Java (Apache Maven), Scala (SBT) ou Python.

Preparar o job de contagem de palavras do Spark

Selecione uma guia abaixo para seguir as etapas e preparar um pacote ou arquivo de job para enviar ao cluster. Você pode preparar um dos seguintes tipos de job:

Java

  1. Copie o arquivo pom.xml para sua máquina local. O arquivo pom.xml a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopo provided para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivo pom.xml não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam com gs:// ), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud Storage
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. Copie o código WordCount.java listado abaixo para sua máquina local.
    1. Crie um conjunto de diretórios com o caminho src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copie WordCount.java para sua máquina local em src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    O WordCount.java é um job simples do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. Criar o pacote.
    mvn clean package
    
    Se a compilação for bem-sucedida, um target/spark-with-gcs-1.0-SNAPSHOT.jar será criado.
  4. Prepare o pacote para o Cloud Storage.
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copie o arquivo build.sbt para sua máquina local. O arquivo build.sbt a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopo provided para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivo build.sbt não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam com gs:// ), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. Copie word-count.scala para sua máquina local. Ele é um job simples do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. Criar o pacote.
    sbt clean package
    
    Se a compilação for bem-sucedida, um target/scala-2.11/word-count_2.11-1.0.jar será criado.
  4. Prepare o pacote para o Cloud Storage.
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copie word-count.py para sua máquina local. Ele é um job simples do Spark em Python usando PySpark que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

Enviar o job

Execute o comando gcloud a seguir para enviar o job de contagem de palavras ao cluster do Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Veja o resultado

Após a conclusão do job, execute o comando gsutil do SDK do Cloud a seguir para visualizar a saída de contagem de palavras.

gsutil cat gs://${BUCKET_NAME}/output/*

O resultado da contagem de palavras deve ser semelhante a este:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Limpar

Depois de concluir o tutorial, você pode limpar os recursos que criou para que eles parem de usar a cota e gerar cobranças. Nas seções a seguir, você aprenderá a excluir e desativar esses recursos.

Excluir o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.

Para excluir o projeto:

  1. No Console do Cloud, acesse a página Gerenciar recursos:

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Como excluir o cluster do Dataproc

Em vez de excluir o projeto, convém excluir o cluster dentro do projeto.

Como excluir o bucket do Cloud Storage

Cloud Console

  1. No Console do Cloud, acesse a página Navegador do Cloud Storage.

    Acessar o navegador

  2. Clique na caixa de seleção do bucket que você quer excluir.
  3. Para excluir o bucket, clique em Excluir e siga as instruções.

Linha de comando

    Excluir o bucket:
    gsutil rb BUCKET_NAME

A seguir