Envoyer une tâche

Vous pouvez envoyer une tâche à un cluster Dataproc existant via une requête HTTP ou automatisée jobs.submit de l'API Dataproc, à l'aide de l'outil de ligne de commande gcloud du Google Cloud CLI dans une fenêtre de terminal locale ou dans Cloud Shell, ou encore à partir de la console ouverte dans un navigateur local. Vous pouvez également vous connecter en SSH à l'instance maître de votre cluster, puis exécuter une tâche directement depuis l'instance sans utiliser le service Dataproc.

Envoyer un job

Console

Ouvrez la page Dataproc Submit a job (Envoyer une tâche) dans la console Google Cloud de votre navigateur.

Exemple de tâche Spark

Pour envoyer un exemple de tâche Spark, remplissez les champs de la page Submit a job (Envoyer une tâche) comme suit:

  1. Sélectionnez le nom du cluster dans la liste des clusters.
  2. Définissez le champ Job Type (Type de tâche) sur Spark.
  3. Définissez le champ Main class or jar (Classe principale ou fichier JAR) sur org.apache.spark.examples.SparkPi.
  4. Définissez le champ Arguments sur l'argument unique 1000.
  5. Ajoutez des fichiers JAR file:///usr/lib/spark/examples/jars/spark-examples.jar dans le champ Jar files (Fichiers JAR) :
    1. file:/// désigne un schéma LocalFileSystem Hadoop. Dataproc a installé /usr/lib/spark/examples/jars/spark-examples.jar sur le nœud maître du cluster lors de la création de celui-ci.
    2. Vous pouvez également spécifier un chemin d'accès à Cloud Storage (gs://your-bucket/your-jarfile.jar) ou un chemin d'accès au système de fichiers distribué Hadoop (hdfs://path-to-jar.jar) vers l'un de vos fichiers JAR.

Cliquez sur Envoyer pour démarrer la tâche. Une fois la tâche démarrée, elle est ajoutée à la liste des Tâches.

Cliquez sur l'ID de la tâche pour ouvrir la page Jobs (Tâches) qui affiche les résultats du pilote de la tâche. Étant donné que cette tâche génère de longues lignes de résultats dépassant la largeur de la fenêtre du navigateur, vous pouvez cocher la case Line wrapping (Renvoi à la ligne automatique) pour afficher tout le texte de résultats dans la vue et consulter le résultat de calcul pour pi.

Vous pouvez afficher les résultats du pilote de la tâche depuis la ligne de commande à l'aide de la commande gcloud dataproc jobs wait présentée ci-dessous (pour en savoir plus, consultez la section Afficher la sortie de la tâche – Commande gcloud). Copiez et collez votre ID de projet en tant que valeur de l'option --project et votre ID de tâche (affiché dans la liste des tâches) en tant qu'argument final.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Voici des extraits de résultats de pilotes pour l'exemple de tâche SparkPi, envoyée ci-dessus :

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

gcloud

Pour envoyer une tâche à un cluster Dataproc, exécutez la commande de gcloud CLI gcloud dataproc jobs submit en local dans une fenêtre de terminal ou dans Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Exemple d'envoi de tâche PySpark
  1. Répertoriez l'élément accessible au public hello-world.py, situé dans Cloud Storage.
    gcloud storage cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Fichiers répertoriés:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
  2. Envoyez la tâche Pyspark à Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Résultat du terminal:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Exemple d'envoi de tâche Spark
  1. Exécutez l'exemple SparkPi pré-installé sur le nœud maître du cluster Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Résultat du terminal:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST

Cette section explique comment envoyer une tâche Spark pour calculer la valeur approximative de pi à l'aide de l'API jobs.submit de Dataproc.

Avant d'utiliser les données de requête ci-dessous, effectuez les remplacements suivants :

  • project-id: ID de projet Google Cloud
  • region : région du cluster
  • clusterName : nom du cluster

Méthode HTTP et URL :

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Corps JSON de la requête :

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Pour envoyer votre requête, développez l'une des options suivantes :

Vous devriez recevoir une réponse JSON de ce type :

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Java

  1. Installer la bibliothèque cliente
  2. Configurer les identifiants par défaut de l'application
  3. Exécuter le code
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Installer la bibliothèque cliente
  2. Configurer les identifiants par défaut de l'application
  3. Exécuter le code
    import re
    
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )
    
        print(f"Job finished successfully: {output}")
    
    

Go

  1. Installer la bibliothèque cliente
  2. Configurer les identifiants par défaut de l'application
  3. Exécuter le code
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %w", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %w", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %w", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %w", err)
    	}
    
    	defer reader.Close()
    
    	body, err := io.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %w", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Installer la bibliothèque cliente
  2. Configurer les identifiants par défaut de l'application
  3. Exécuter le code
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);

Envoyer une tâche directement sur votre cluster

Si vous souhaitez exécuter une tâche directement sur votre cluster sans utiliser le service Dataproc, connectez-vous en SSH au nœud maître de votre cluster, puis exécutez la tâche sur le nœud maître.

Une fois la connexion SSH établie avec l'instance maître de la VM, exécutez des commandes dans une fenêtre de terminal sur le nœud maître du cluster pour effectuer les opérations suivantes :

  1. Ouvrir une interface système Spark.
  2. Exécuter une tâche Spark simple permettant de compter le nombre de lignes dans un fichier "hello-world" Python (à sept lignes), situé dans un fichier Cloud Storage accessible au public.
  3. Fermer l'interface système.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Exécuter des tâches bash sur Dataproc

Vous souhaiterez peut-être exécuter un script bash en tant que tâche Dataproc, soit parce que les moteurs que vous utilisez ne sont pas acceptés en tant que type de tâche Dataproc de premier niveau, soit parce qu'une configuration ou un calcul d'arguments supplémentaires doit être effectué avant de lancer une tâche à l'aide de hadoop ou spark-submit à partir de votre script.

Exemple d'instruction Pig

Supposons que vous ayez copié un script bash "hello.sh" dans Cloud Storage :

gcloud storage cp hello.sh gs://${BUCKET}/hello.sh

Comme la commande pig fs utilise des chemins Hadoop, copiez le script de Cloud Storage vers une destination spécifiée en tant que file:/// pour vous assurer qu'il se trouve sur le système de fichiers local plutôt que sur HDFS. Les commandes sh suivantes font automatiquement référence au système de fichiers local et ne nécessitent pas le préfixe file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

Étant donné que les tâches Dataproc envoient l'argument --jars, un fichier est créé dans un répertoire temporaire créé pendant la durée de vie de la tâche, vous pouvez spécifier votre script shell Cloud Storage en tant qu'argument --jars :

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Notez que l'argument --jars peut également faire référence à un script local :

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'