Envoyer une tâche

Vous pouvez envoyer une tâche à un cluster Dataproc existant via une requête HTTP ou automatisée jobs.submit de l'API Dataproc, à l'aide de l'outil de ligne de commande gcloud du SDK Cloud (en local dans une fenêtre de terminal ou dans Cloud Shell), ou encore à partir d'une instance Google Cloud Console ouverte dans un navigateur local. Vous pouvez également vous connecter en SSH à l'instance maître de votre cluster, puis exécuter une tâche directement depuis l'instance sans utiliser le service Dataproc.

Envoyer une tâche Dataproc

gcloud

Pour envoyer une tâche à un cluster Dataproc, exécutez la commande gcloud dataproc jobs submit du SDK Cloud en local dans une fenêtre de terminal ou dans Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Exemple d'envoi de tâche PySpark
  1. Répertoriez l'élément accessible au public hello-world.py, situé dans Cloud Storage.
    gsutil cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Fichiers répertoriés :

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
    
  2. Envoyez la tâche Pyspark à Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Résultat du terminal :
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Exemple d'envoi de tâche Spark
  1. Exécutez l'exemple SparkPi pré-installé sur le nœud maître du cluster Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Résultat du terminal :
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

API REST et ligne de commande

Cette section explique comment envoyer une tâche Spark pour calculer la valeur approximative de pi.

Avant d'utiliser les données de requête ci-dessous, effectuez les remplacements suivants :

  • project-id : ID du projet GCP
  • region : région du cluster
  • clusterName : nom du cluster

Méthode HTTP et URL :

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Corps JSON de la requête :

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Pour envoyer votre requête, développez l'une des options suivantes :

Vous devriez recevoir une réponse JSON de ce type :

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Console

Ouvrez la page Dataproc Submit a job (Envoyer une tâche) de Cloud Console dans votre navigateur.

Exemple de tâche Spark

Pour envoyer un exemple de tâche Spark, remplissez les champs de la page Submit a job (Envoyer une tâche) comme suit (comme illustré dans la capture d'écran précédente) :

  1. Sélectionnez le nom du cluster dans la liste des clusters.
  2. Définissez le champ Job Type (Type de tâche) sur Spark.
  3. Définissez le champ Main class or jar (Classe principale ou fichier JAR) sur org.apache.spark.examples.SparkPi.
  4. Définissez le champ Arguments sur l'argument unique 1000.
  5. Ajoutez des fichiers JAR file:///usr/lib/spark/examples/jars/spark-examples.jar dans le champ Jar files (Fichiers JAR) :
    1. file:/// désigne un schéma LocalFileSystem Hadoop. Dataproc a installé /usr/lib/spark/examples/jars/spark-examples.jar sur le nœud maître du cluster lors de la création de celui-ci.
    2. Vous pouvez également spécifier un chemin d'accès à Cloud Storage (gs://your-bucket/your-jarfile.jar) ou un chemin d'accès au système de fichiers distribué Hadoop (hdfs://path-to-jar.jar) vers l'un de vos fichiers JAR.

Cliquez sur Envoyer pour démarrer la tâche. Une fois la tâche démarrée, elle est ajoutée à la liste des Tâches.

Cliquez sur l'ID de la tâche pour ouvrir la page Jobs (Tâches) qui affiche les résultats du pilote de la tâche (consultez la section Accéder aux résultats du pilote de tâches – Console). Étant donné que cette tâche génère de longues lignes de résultats dépassant la largeur de la fenêtre du navigateur, vous pouvez cocher la case Line wrapping (Renvoi à la ligne automatique) pour afficher tout le texte de résultats dans la vue et consulter le résultat de calcul pour pi.

Vous pouvez afficher les résultats du pilote de la tâche depuis la ligne de commande à l'aide de la commande gcloud dataproc jobs wait présentée ci-dessous (pour plus d'informations, consultez la section Accéder aux résultats du pilote de tâches – Commande gcloud). Copiez et collez votre ID de projet en tant que valeur de l'option --project et votre ID de tâche (affiché dans la liste des tâches) en tant qu'argument final.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Voici des extraits de résultats de pilotes pour l'exemple de tâche SparkPi, envoyée ci-dessus :

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

Java

  1. Installer la bibliothèque cliente
  2. Configurer les identifiants par défaut de l'application
  3. Exécuter le code
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(
          String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Installer la bibliothèque cliente
  2. Configurer les identifiants par défaut de l'application
  3. Exécuter le code
    import re
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(client_options={
            'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
        })
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            'placement': {
                'cluster_name': cluster_name
            },
            'spark_job': {
                'main_class': 'org.apache.spark.examples.SparkPi',
                'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
                'args': ['1000']
            }
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_string()
        )
    
        print(f"Job finished successfully: {output}")

Envoyer une tâche directement sur votre cluster

Si vous souhaitez exécuter une tâche directement sur votre cluster sans utiliser le service Dataproc, connectez-vous en SSH au nœud maître du cluster, puis exécutez la tâche sur le nœud maître.

Établir une connexion SSH à l'instance maître

Vous pouvez vous connecter en SSH à une instance de VM Compute Engine dans votre cluster depuis la ligne de commande ou depuis Cloud Console.

Commande gcloud

Exécutez la commande gcloud compute ssh en local dans une fenêtre de terminal ou depuis Cloud Shell pour vous connecter en SSH au nœud maître de votre cluster (le nom par défaut du nœud maître correspond au nom du cluster suivi du suffixe -m).

gcloud compute ssh cluster-name-m \
    --region=region \
    --project=project-id

L'extrait suivant utilise gcloud compute ssh pour établir une connexion en SSH avec le nœud maître de cluster-1.

gcloud compute ssh cluster-1-m \
    --region=us-central-1 \
    --project=my-project-id
...
Linux cluster-1-m 4.9.0-8-amd64 #1 SMP Debian 4.9.110-3+deb9u6...
...
user@cluster-1-m:~$

Console

Utilisez Cloud Console pour vous connecter en SSH au nœud maître de votre cluster (le nom par défaut du nœud maître correspond au nom du cluster suivi du suffixe -m).
  1. Dans Cloud Console, accédez à la page Instances de VM :
  2. Dans la liste des instances de machine virtuelle, cliquez sur SSH sur la ligne de l'instance maître (suffixe -m) à laquelle vous souhaitez vous connecter.

Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.

Connected, host fingerprint: ssh-rsa ...
Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
...
user@cluster-1-m:~$

Exécuter une tâche Spark sur le nœud maître

Une fois la connexion SSH établie avec l'instance maître de la VM, exécutez des commandes dans une fenêtre de terminal sur le nœud maître du cluster pour effectuer les opérations suivantes :

  1. Ouvrir une interface système Spark.
  2. Exécuter une tâche Spark simple permettant de compter le nombre de lignes dans un fichier "hello-world" Python (à sept lignes), situé dans un fichier Cloud Storage accessible au public.
  3. Fermer l'interface système.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Exécuter des tâches bash sur Dataproc

Vous souhaiterez peut-être exécuter un script bash en tant que tâche Dataproc, soit parce que les moteurs que vous utilisez ne sont pas acceptés en tant que type de tâche Dataproc de premier niveau, soit parce qu'une configuration ou un calcul d'arguments supplémentaires doit être effectué avant de lancer une tâche à l'aide de hadoop ou spark-submit à partir de votre script.

Exemple d'instruction Pig

Supposons que vous ayez copié un script bash "hello.sh" dans Cloud Storage :

gsutil cp hello.sh gs://${BUCKET}/hello.sh

Comme la commande pig fs utilise des chemins Hadoop, copiez le script de Cloud Storage vers une destination spécifiée en tant que file:/// pour vous assurer qu'il se trouve sur le système de fichiers local plutôt que sur HDFS. Les commandes sh suivantes font automatiquement référence au système de fichiers local et ne nécessitent pas le préfixe file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

Étant donné que les tâches Dataproc envoient l'argument --jars, un fichier est créé dans un répertoire temporaire créé pendant la durée de vie de la tâche, vous pouvez spécifier votre script shell Cloud Storage en tant qu'argument --jars :

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Notez que l'argument --jars peut également faire référence à un script local :

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'