Utilizzo del connettore Cloud Storage con Apache Spark

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Questo tutorial mostra come eseguire codice di esempio che utilizza il connettore Cloud Storage con Apache Spark.

Obiettivi

Scrivi un semplice job di Spark di conteggio parole in Java, Scala o Python, quindi esegui il job su un cluster Dataproc.

Costi

Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:

  • Compute Engine
  • Dataproc
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud possono beneficiare di una prova gratuita.

Prima di iniziare

Esegui i passaggi seguenti per prepararti a eseguire il codice in questo tutorial.

  1. Configura il tuo progetto. Se necessario, configura un progetto con le API Dataproc, Compute Engine e Cloud Storage abilitate e Google Cloud CLI installato sulla macchina locale.

    1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
    2. Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.

      Vai al selettore progetti

    3. Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.

    4. Abilita le API Dataproc, Compute Engine, and Cloud Storage.

      Abilita le API

    5. Crea un account di servizio:

      1. Nella console Google Cloud, vai alla pagina Crea account di servizio.

        Vai a Crea account di servizio
      2. Seleziona il progetto.
      3. Inserisci un nome nel campo Service account name (Nome account di servizio). La console Google Cloud compila il campo ID account di servizio in base a questo nome.

        Nel campo Descrizione account di servizio, inserisci una descrizione. Ad esempio: Service account for quickstart.

      4. Fai clic su Crea e continua.
      5. Per fornire l'accesso al progetto, concedi i seguenti ruoli al tuo account di servizio: Progetto > Proprietario.

        Nell'elenco Seleziona un ruolo, seleziona un ruolo.

        Per altri ruoli, fai clic su Aggiungi un altro ruolo e aggiungi ogni ruolo aggiuntivo.

      6. Fai clic su Continua.
      7. Fai clic su Fine per completare la creazione dell'account di servizio.

        Non chiudere la finestra del browser. La utilizzerai nel passaggio successivo.

    6. Crea una chiave dell'account di servizio:

      1. Nella console Google Cloud, fai clic sull'indirizzo email dell'account di servizio che hai creato.
      2. Fai clic su Chiavi.
      3. Fai clic su Aggiungi chiave, quindi su Crea nuova chiave.
      4. Fai clic su Crea. Sul computer viene scaricato un file della chiave JSON.
      5. Fai clic su Chiudi.
    7. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene la chiave dell'account di servizio. Questa variabile si applica solo alla sessione shell corrente, quindi se apri una nuova sessione, impostala di nuovo.

    8. Installa Google Cloud CLI.
    9. Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:

      gcloud init
    10. Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.

      Vai al selettore progetti

    11. Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.

    12. Abilita le API Dataproc, Compute Engine, and Cloud Storage.

      Abilita le API

    13. Crea un account di servizio:

      1. Nella console Google Cloud, vai alla pagina Crea account di servizio.

        Vai a Crea account di servizio
      2. Seleziona il progetto.
      3. Inserisci un nome nel campo Service account name (Nome account di servizio). La console Google Cloud compila il campo ID account di servizio in base a questo nome.

        Nel campo Descrizione account di servizio, inserisci una descrizione. Ad esempio: Service account for quickstart.

      4. Fai clic su Crea e continua.
      5. Per fornire l'accesso al progetto, concedi i seguenti ruoli al tuo account di servizio: Progetto > Proprietario.

        Nell'elenco Seleziona un ruolo, seleziona un ruolo.

        Per altri ruoli, fai clic su Aggiungi un altro ruolo e aggiungi ogni ruolo aggiuntivo.

      6. Fai clic su Continua.
      7. Fai clic su Fine per completare la creazione dell'account di servizio.

        Non chiudere la finestra del browser. La utilizzerai nel passaggio successivo.

    14. Crea una chiave dell'account di servizio:

      1. Nella console Google Cloud, fai clic sull'indirizzo email dell'account di servizio che hai creato.
      2. Fai clic su Chiavi.
      3. Fai clic su Aggiungi chiave, quindi su Crea nuova chiave.
      4. Fai clic su Crea. Sul computer viene scaricato un file della chiave JSON.
      5. Fai clic su Chiudi.
    15. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene la chiave dell'account di servizio. Questa variabile si applica solo alla sessione shell corrente, quindi se apri una nuova sessione, impostala di nuovo.

    16. Installa Google Cloud CLI.
    17. Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:

      gcloud init

  2. Crea un bucket Cloud Storage. Hai bisogno di un Cloud Storage per conservare i dati del tutorial. Se non ne hai uno da utilizzare, crea un nuovo bucket nel progetto.

    1. Nella console, vai alla pagina Browser Cloud Storage.

      Vai al browser

    2. Fai clic su Crea bucket.
    3. Nella pagina Crea un bucket, inserisci le informazioni sul bucket. Per andare al passaggio successivo, fai clic su Continua.
    4. Fai clic su Crea.

  3. Imposta le variabili di ambiente locali. Imposta le variabili di ambiente sulla tua macchina locale. Imposta il tuo ID progetto Google Cloud e il nome del bucket Cloud Storage che utilizzerai per questo tutorial. Fornisci anche il nome e l'area geografica di un cluster Dataproc esistente o nuovo. Puoi creare un cluster da utilizzare in questo tutorial nel passaggio successivo.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Crea un cluster Dataproc. Esegui il comando seguente per creare un cluster Single-Node Dataproc nella zona di Compute Engine specificata.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Copia i dati pubblici nel bucket Cloud Storage. Copia uno snippet di testo pubblico Shakespeare nella cartella input del bucket Cloud Storage:

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Configura un ambiente di sviluppo Java (Apache Maven), Scala (SBT) o Python.

Prepara il job di conteggio parole Spark

Seleziona una scheda di seguito per seguire i passaggi per preparare un pacchetto o file da inviare al cluster. Puoi preparare uno dei seguenti tipi di prestazioni:

Java

  1. Copia pom.xml file nella macchina locale. Il seguente file pom.xml specifica le dipendenze libreria Scala e Spark, a cui viene assegnato un ambito provided per indicare che il cluster Dataproc fornirà queste librerie in fase di esecuzione. Il file pom.xml non specifica una dipendenza Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster di Cloud Storage (file con URI che iniziano con gs://), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud Storage.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. Copia il codice WordCount.java elencato di seguito nella tua macchina locale.
    1. Crea un insieme di directory con il percorso src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copia WordCount.java nella macchina locale in src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java è un semplice job Spark in Java che legge i file di testo da Cloud Storage, esegue un conteggio delle parole, quindi scrive i risultati dei file di testo in Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. Crea il pacchetto.
    mvn clean package
    
    Se la build ha esito positivo, viene creato un target/spark-with-gcs-1.0-SNAPSHOT.jar.
  4. Imposta il pacchetto in un'area temporanea su Cloud Storage.
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copia build.sbt file nella macchina locale. Il seguente file build.sbt specifica le dipendenze libreria Scala e Spark, a cui viene assegnato un ambito provided per indicare che il cluster Dataproc fornirà queste librerie in fase di esecuzione. Il file build.sbt non specifica una dipendenza Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster Cloud Storage (file con URI che iniziano con gs://), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. Copia word-count.scala nella macchina locale. Si tratta di un semplice job Spark in Java che legge i file di testo da Cloud Storage, esegue un conteggio delle parole, quindi scrive i risultati dei file di testo in Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. Crea il pacchetto.
    sbt clean package
    
    Se la build ha esito positivo, viene creato un target/scala-2.11/word-count_2.11-1.0.jar.
  4. Imposta il pacchetto in un'area temporanea su Cloud Storage.
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copia word-count.py nella macchina locale. Si tratta di un semplice job Spark in Python che utilizza PySpark che legge i file di testo da Cloud Storage, esegue un conteggio di parole, quindi scrive i risultati dei file di testo in Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

Invia il job

Esegui questo comando gcloud per inviare il job di conteggio parole al cluster Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Visualizza l'output

Al termine del job, esegui questo comando dell'interfaccia a riga di comando gcloud gsutil per visualizzare l'output del conteggio parole.

gsutil cat gs://${BUCKET_NAME}/output/*

L'output del conteggio parole dovrebbe essere simile al seguente:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Esegui la pulizia

Dopo aver completato il tutorial, puoi eseguire la pulizia delle risorse che hai creato, in modo che smettano di utilizzare la quota e che vengano addebitati dei costi. Le seguenti sezioni descrivono come eliminare o disattivare queste risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto che hai creato per il tutorial.

Per eliminare il progetto:

  1. In Google Cloud Console, vai alla pagina Gestisci risorse.

    Vai a Gestisci risorse

  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare, quindi fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.

Eliminazione del cluster Dataproc

Anziché eliminare il progetto, puoi solo eliminare il cluster all'interno del progetto.

Eliminazione del bucket Cloud Storage

console Google Cloud

  1. Nella console di Google Cloud, vai alla pagina Browser di Cloud Storage.

    Vai al browser

  2. Fai clic sulla casella di controllo del bucket che vuoi eliminare.
  3. Per eliminare il bucket, fai clic su Elimina, quindi segui le istruzioni.

Riga di comando

    Elimina il bucket:
    gsutil rb BUCKET_NAME

Passaggi successivi