Utiliser le connecteur Cloud Storage avec Apache Spark


Ce tutoriel vous explique comment exécuter un exemple de code utilisant le connecteur Cloud Storage avec Apache Spark.

Objectifs

Écrire une tâche de décompte de mots simple Spark en Java, Scala ou Python, puis exécuter la tâche sur un cluster Dataproc.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Compute Engine
  • Dataproc
  • Cloud Storage

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

Exécutez les étapes ci-dessous pour préparer l'exécution du code dans ce tutoriel.

  1. Configurer votre projet Si nécessaire, configurez un projet avec les API Dataproc, Compute Engine et Cloud Storage activées, et la Google Cloud CLI, installée sur votre ordinateur local.

    1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
    2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

      Accéder au sélecteur de projet

    3. Vérifiez que la facturation est activée pour votre projet Google Cloud.

    4. Activer les API Dataproc, Compute Engine, and Cloud Storage.

      Activer les API

    5. Créez un compte de service :

      1. Dans la console Google Cloud, accédez à la page Créer un compte de service.

        Accéder à la page "Créer un compte de service"
      2. Sélectionnez votre projet.
      3. Dans le champ Nom du compte de service, saisissez un nom. La console Google Cloud remplit le champ ID du compte de service en fonction de ce nom.

        Dans le champ Description du compte de service, saisissez une description. Exemple : Service account for quickstart.

      4. Cliquez sur Créer et continuer.
      5. Attribuez le rôle Project > Owner au compte de service.

        Pour accorder le rôle, trouvez la liste Sélectionner un rôle, puis sélectionnez Project > Owner.

      6. Cliquez sur Continuer.
      7. Cliquez sur OK pour terminer la création du compte de service.

        Ne fermez pas la fenêtre de votre navigateur. Vous en aurez besoin lors de la tâche suivante.

    6. Créez une clé de compte de service :

      1. Dans la console Google Cloud, cliquez sur l'adresse e-mail du compte de service que vous avez créé.
      2. Cliquez sur Keys (Clés).
      3. Cliquez sur Ajouter une clé, puis sur Créer une clé.
      4. Cliquez sur Create (Créer). Un fichier de clé JSON est téléchargé sur votre ordinateur.
      5. Cliquez sur Close (Fermer).
    7. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS sur le chemin d'accès du fichier JSON contenant vos identifiants. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

    8. Installez Google Cloud CLI.
    9. Pour initialiser gcloudCLI, exécutez la commande suivante :

      gcloud init
    10. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

      Accéder au sélecteur de projet

    11. Vérifiez que la facturation est activée pour votre projet Google Cloud.

    12. Activer les API Dataproc, Compute Engine, and Cloud Storage.

      Activer les API

    13. Créez un compte de service :

      1. Dans la console Google Cloud, accédez à la page Créer un compte de service.

        Accéder à la page "Créer un compte de service"
      2. Sélectionnez votre projet.
      3. Dans le champ Nom du compte de service, saisissez un nom. La console Google Cloud remplit le champ ID du compte de service en fonction de ce nom.

        Dans le champ Description du compte de service, saisissez une description. Exemple : Service account for quickstart.

      4. Cliquez sur Créer et continuer.
      5. Attribuez le rôle Project > Owner au compte de service.

        Pour accorder le rôle, trouvez la liste Sélectionner un rôle, puis sélectionnez Project > Owner.

      6. Cliquez sur Continuer.
      7. Cliquez sur OK pour terminer la création du compte de service.

        Ne fermez pas la fenêtre de votre navigateur. Vous en aurez besoin lors de la tâche suivante.

    14. Créez une clé de compte de service :

      1. Dans la console Google Cloud, cliquez sur l'adresse e-mail du compte de service que vous avez créé.
      2. Cliquez sur Keys (Clés).
      3. Cliquez sur Ajouter une clé, puis sur Créer une clé.
      4. Cliquez sur Create (Créer). Un fichier de clé JSON est téléchargé sur votre ordinateur.
      5. Cliquez sur Close (Fermer).
    15. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS sur le chemin d'accès du fichier JSON contenant vos identifiants. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

    16. Installez Google Cloud CLI.
    17. Pour initialiser gcloudCLI, exécutez la commande suivante :

      gcloud init

  2. Créer un bucket Cloud Storage Vous aurez besoin d'un stockage Cloud Storage pour stocker les données du tutoriel. Si vous n'en avez pas, créez un bucket dans votre projet.

    1. Dans la console Google Cloud, accédez à la page Buckets Cloud Storage.

      Accéder à la page "Buckets"

    2. Cliquez sur Créer un bucket.
    3. Sur la page Créer un bucket, saisissez les informations concernant votre bucket. Pour passer à l'étape suivante, cliquez sur Continuer.
      • Pour nommer votre bucket, saisissez un nom qui répond aux exigences de dénomination des buckets.
      • Pour Choisir l'emplacement de stockage des données, procédez comme suit :
        • Sélectionnez une option de type d'emplacement.
        • Sélectionnez une option Location (Emplacement).
      • Pour Choisir une classe de stockage par défaut pour vos données, sélectionnez une classe de stockage.
      • Pour le champ Choisir comment contrôler l'accès aux objets, sélectionnez une option de Contrôle des accès.
      • Sous Paramètres avancés (facultatif), choisissez une méthode de chiffrement, une règle de conservation ou des libellés de bucket.
    4. Cliquez sur Create (Créer).

  3. Définissez des variables d'environnement locales. Définissez des variables d'environnement sur votre ordinateur local. Définissez l'ID de votre projet Google Cloud et le nom du bucket Cloud Storage que vous utiliserez pour ce tutoriel. Indiquez également le nom et la région d'un cluster Dataproc existant ou nouveau. Vous pouvez créer un cluster à utiliser dans ce tutoriel à l'étape suivante.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Créer un cluster Dataproc Exécutez la commande ci-dessous pour créer un cluster Dataproc à nœud unique dans la zone Compute Engine spécifiée.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Copier des données publiques dans votre bucket Cloud Storage. Copiez un extrait de texte Shakespeare public dans le dossier input de votre bucket Cloud Storage :

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Configurez un environnement de développement Java (Apache Maven), Scala (SBT) ou Python.

Préparer la tâche de décompte Spark

Sélectionnez un onglet ci-dessous pour suivre la procédure de préparation d'un package ou d'un fichier de tâche à envoyer au cluster. Vous pouvez préparer l'un des types de tâches suivants :

Java

  1. Copiez le fichier pom.xml sur votre ordinateur local. Le fichier pom.xml suivant spécifie les dépendances des bibliothèques Scala et Spark, auxquelles un champ d'application provided est attribué pour indiquer que le cluster Dataproc fournira ces bibliothèques lors de l'exécution. Le fichier pom.xml ne spécifie pas de dépendance Cloud Storage, car le connecteur met en œuvre l'interface HDFS standard. Lorsqu'une tâche Spark accède à des fichiers de cluster Cloud Storage (fichiers dont les URI commencent par gs:// ), le système utilise automatiquement le connecteur Cloud Storage pour accéder aux fichiers dans Cloud Storage.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. Copiez le code WordCount.java indiqué ci-dessous sur votre ordinateur local.
    1. Créez un ensemble de répertoires avec le chemin d'accès src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copiez WordCount.java sur votre ordinateur local dans src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java est une tâche Spark simple en Java qui lit les fichiers texte de Cloud Storage, effectue un décompte des mots, puis écrit les résultats des fichiers texte dans Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. Compilez le package.
    mvn clean package
    
    Si la compilation réussit, une target/word-count-1.0.jar est créée.
  4. Entreposez le package dans Cloud Storage.
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copiez le fichier build.sbt sur votre ordinateur local. Le fichier build.sbt suivant spécifie les dépendances des bibliothèques Scala et Spark, auxquelles un champ d'application provided est attribué pour indiquer que le cluster Dataproc fournira ces bibliothèques lors de l'exécution. Le fichier build.sbt ne spécifie pas de dépendance Cloud Storage, car le connecteur met en œuvre l'interface HDFS standard. Lorsqu'une tâche Spark accède à des fichiers de cluster Cloud Storage (fichiers dont les URI commencent par gs:// ), le système utilise automatiquement le connecteur Cloud Storage pour accéder aux fichiers dans Cloud Storage.
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. Copiez word-count.scala sur votre ordinateur local. Il s'agit d'une tâche Spark simple en Java qui lit les fichiers texte de Cloud Storage, effectue un décompte des mots, puis écrit les résultats des fichiers texte dans Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. Compilez le package.
    sbt clean package
    
    Si la compilation réussit, une target/scala-2.11/word-count_2.11-1.0.jar est créée.
  4. Entreposez le package dans Cloud Storage.
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copiez word-count.py sur votre ordinateur local. Il s'agit d'une tâche Spark simple en Python qui lit les fichiers texte de Cloud Storage, effectue un décompte des mots, puis écrit les résultats des fichiers texte dans Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

Envoyer la tâche

Exécutez la commande gcloud suivante pour envoyer la tâche de décompte à votre cluster Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Consulter le résultat

Une fois la tâche terminée, exécutez la commande gcloud CLI gsutil suivante pour afficher le résultat du nombre de mots.

gsutil cat gs://${BUCKET_NAME}/output/*

Le résultat du nombre de mots doit se présenter comme suit:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Effectuer un nettoyage

Une fois le tutoriel terminé, vous pouvez procéder au nettoyage des ressources que vous avez créées afin qu'elles ne soient plus comptabilisées dans votre quota et qu'elles ne vous soient plus facturées. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.

Supprimer le projet

Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel.

Pour supprimer le projet :

  1. Dans la console Google Cloud, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Supprimer le cluster Dataproc

Au lieu de supprimer votre projet, vous souhaiterez peut-être simplement supprimer votre cluster au sein du projet.

Supprimer le bucket Cloud Storage

console Google Cloud

  1. Dans la console Google Cloud, accédez à la page Buckets de Cloud Storage.

    Accéder à la page "Buckets"

  2. Cochez la case correspondant au bucket que vous souhaitez supprimer.
  3. Pour supprimer le bucket, cliquez sur Supprimer , puis suivez les instructions.

Command line

    Supprimez le bucket :
    gcloud storage buckets delete BUCKET_NAME

Étapes suivantes