Usar o conector do Cloud Storage com o Apache Spark


Neste tutorial, mostramos como executar um código de exemplo que usa o conector do Cloud Storage com o Apache Spark.

Objetivos

Escreva um job simples de contagem de palavras do Spark em Java, Scala ou Python e execute-o em um cluster do Dataproc.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Execute as etapas abaixo para se preparar para executar o código neste tutorial.

  1. Criar o projeto. Se necessário, configure um projeto com as APIs Dataproc, Compute Engine e Cloud Storage ativadas e a Google Cloud CLI instalada na máquina local.

    1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
    2. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

      Acessar o seletor de projetos

    3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

    4. Ative as APIs Dataproc, Compute Engine, and Cloud Storage.

      Ative as APIs

    5. Create a service account:

      1. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      2. Select your project.
      3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      4. Click Create and continue.
      5. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      6. Click Continue.
      7. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    7. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém suas credenciais. Essa variável só se aplica à sessão de shell atual. Assim, se você abrir uma nova sessão, precisará definir a variável novamente.

    8. Instale a CLI do Google Cloud.
    9. Para inicializar a CLI gcloud, execute o seguinte comando:

      gcloud init
    10. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

      Acessar o seletor de projetos

    11. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

    12. Ative as APIs Dataproc, Compute Engine, and Cloud Storage.

      Ative as APIs

    13. Create a service account:

      1. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      2. Select your project.
      3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      4. Click Create and continue.
      5. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      6. Click Continue.
      7. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    14. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    15. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém suas credenciais. Essa variável só se aplica à sessão de shell atual. Assim, se você abrir uma nova sessão, precisará definir a variável novamente.

    16. Instale a CLI do Google Cloud.
    17. Para inicializar a CLI gcloud, execute o seguinte comando:

      gcloud init

  2. Criar um bucket do Cloud Storage Você precisa do Cloud Storage para armazenar os dados do tutorial. Se você não tiver um pronto para usá-lo, crie um novo bucket no projeto.

    1. No console do Cloud, acesse a página Buckets do Cloud Storage.

      Acessar a página "Buckets"

    2. Clique em Criar bucket.
    3. Na página Criar um bucket, insira as informações do seu bucket. Para ir à próxima etapa, clique em Continuar.
    4. Clique em Criar.

  3. Defina variáveis de ambiente locais. Defina variáveis de ambiente na máquina local. Defina o ID do projeto do Google Cloud e o nome do bucket do Cloud Storage que você usará neste tutorial. Forneça também o nome e a região de um cluster novo ou existente do Dataproc. Você pode criar um cluster para usar neste tutorial na próxima etapa.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Criar um cluster de Dataproc. Execute o comando abaixo para criar um cluster do Dataproc de nó único na zona do Compute Engine especificada.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Copie dados públicos para o bucket do Cloud Storage. Copie um snippet de um texto de Shakespeare de domínio público para a pasta input do bucket do Cloud Storage:

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Configure um ambiente de desenvolvimento do Java (Apache Maven), Scala (SBT) ou Python.

Preparar o job de contagem de palavras do Spark

Selecione uma guia abaixo para seguir as etapas e preparar um pacote ou arquivo de job para enviar ao cluster. Você pode preparar um dos seguintes tipos de job:

Java

  1. Copie o arquivo pom.xml para sua máquina local. O arquivo pom.xml a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopo provided para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivo pom.xml não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam com gs:// ), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud Storage
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. Copie o código WordCount.java listado abaixo na sua máquina local.
    1. Crie um conjunto de diretórios com o caminho src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copie WordCount.java para sua máquina local em src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    O WordCount.java é um job simples do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. Crie o pacote.
    mvn clean package
    
    Se o build for bem-sucedido, um target/word-count-1.0.jar será criado.
  4. Prepare o pacote para o Cloud Storage.
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copie o arquivo build.sbt para sua máquina local. O arquivo build.sbt a seguir especifica as dependências da biblioteca Scala e Spark, que recebem um escopo provided para indicar que o cluster do Dataproc fornecerá essas bibliotecas no ambiente de execução. O arquivo build.sbt não especifica uma dependência do Cloud Storage porque o conector implementa a interface HDFS padrão. Quando um job do Spark acessa arquivos de cluster do Cloud Storage (arquivos com URIs que começam com gs:// ), o sistema usa automaticamente o conector do Cloud Storage para acessar os arquivos no Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. Copie word-count.scala para sua máquina local. Ele é um job simples do Spark em Java que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. Crie o pacote.
    sbt clean package
    
    Se o build for bem-sucedido, um target/scala-2.11/word-count_2.11-1.0.jar será criado.
  4. Prepare o pacote para o Cloud Storage.
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copie word-count.py para sua máquina local. Ele é um job simples do Spark em Python usando PySpark que lê arquivos de texto do Cloud Storage, faz a contagem de palavras e grava os resultados em um arquivo de texto no Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

Enviar o job

Execute o comando gcloud a seguir para enviar o job de contagem de palavras ao cluster do Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Veja o resultado

Depois que o job for concluído, execute o seguinte comando gsutil da CLI gcloud para ver a saída da contagem de palavras.

gsutil cat gs://${BUCKET_NAME}/output/*

A saída da contagem de palavras será semelhante a esta:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Limpar

Depois de concluir o tutorial, você pode limpar os recursos que criou para que eles parem de usar a cota e gerar cobranças. Nas seções a seguir, você aprenderá a excluir e desativar esses recursos.

Excluir o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.

Para excluir o projeto:

  1. No Console do Google Cloud, acesse a página Gerenciar recursos.

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Como excluir o cluster do Dataproc

Em vez de excluir o projeto, convém excluir o cluster dentro do projeto.

Como excluir o bucket do Cloud Storage

Console do Google Cloud

  1. No Console do Google Cloud, acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Clique na caixa de seleção do bucket que você quer excluir.
  3. Para excluir o bucket, clique em Excluir e siga as instruções.

Linha de comando

    Excluir o bucket:
    gcloud storage buckets delete BUCKET_NAME

A seguir