Invia un job

Puoi inviare un job a un cluster Dataproc esistente tramite un'API Dataproc jobs.submit HTTP o programmatica, utilizzando lo strumento a riga di comando gcloud di Google Cloud in una finestra del terminale locale o in Cloud Shell o dalla Google Cloud Console aperta in un browser locale. Puoi anche SSH nell'istanza master nel cluster, quindi eseguire un job direttamente dall'istanza senza utilizzare il servizio Dataproc.

Come inviare un lavoro

Console

Apri la pagina Invia un job di Dataproc nella console di Google Cloud nel browser.

Esempio di job di Spark

Per inviare un job Spark di esempio, compila i campi nella pagina Invia un job come segue:

  1. Seleziona il nome del tuo Cluster dall'elenco dei cluster.
  2. Imposta Tipo di job su Spark.
  3. Imposta Classe o jar principale su org.apache.spark.examples.SparkPi.
  4. Imposta Argomenti sul singolo argomento 1000.
  5. Aggiungi file:///usr/lib/spark/examples/jars/spark-examples.jar a file jar:
    1. file:/// indica uno schema Hadoop LocalFileSystem. Dataproc ha installato /usr/lib/spark/examples/jars/spark-examples.jar sul nodo master del cluster al momento della creazione del cluster.
    2. In alternativa, puoi specificare un percorso Cloud Storage (gs://your-bucket/your-jarfile.jar) o un percorso del file system Hadoop distribuito (hdfs://path-to-jar.jar) a uno dei tuoi jar.

Fai clic su Invia per avviare il job. Una volta avviato, il job viene aggiunto all'elenco dei job.

Fai clic sull'ID job per aprire la pagina Job, in cui puoi visualizzare l'output del driver del job. Poiché questo job produce lunghe righe di output che superano la larghezza della finestra del browser, puoi selezionare la casella Line wrapping (a capo della riga) per mostrare tutto il testo di output e mostrare il risultato calcolato per pi.

Puoi visualizzare l'output del driver del tuo job dalla riga di comando utilizzando il comando gcloud dataprocjob aspettare mostrato di seguito (per saperne di più, consulta Visualizza output del job - gcloud COMMAND). Copia e incolla il tuo ID progetto come valore per il flag --project e il tuo ID job (visualizzato nell'elenco dei job) come argomento finale.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Ecco alcuni snippet dell'output del driver per il job di SparkPi esempio inviato sopra:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

gcloud

Per inviare un job a un cluster Dataproc, esegui il comando dell'interfaccia a riga di comando gcloud gcloud dataprocJob send localmente in una finestra del terminale o in Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Esempio di invio del job PySpark
  1. Elenca i hello-world.py accessibili pubblicamente in Cloud Storage.
    gsutil cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Elenco file:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
    
  2. Inviare il job Pyspark a Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Output terminale:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Esempio di invio del job di Spark
  1. Esegui l'esempio SparkPi preinstallato sul nodo master del cluster Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Output terminale:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST &AM; LINEA CMD

Questa sezione mostra come inviare un job Spark per calcolare il valore approssimativo di pi utilizzando l'API Dataproc jobs.submit.

Prima di utilizzare i dati della richiesta, effettua le seguenti sostituzioni:

Metodo HTTP e URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Corpo JSON richiesta:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Per inviare la richiesta, espandi una delle seguenti opzioni:

Dovresti ricevere una risposta JSON simile alla seguente:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Java

  1. Installare la libreria client
  2. Configurare le credenziali predefinite dell'applicazione
  3. Esegui il codice
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Installare la libreria client
  2. Configurare le credenziali predefinite dell'applicazione
  3. Esegui il codice
    import re
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_string()
        )
    
        print(f"Job finished successfully: {output}")
    
    

Go

  1. Installare la libreria client
  2. Configurare le credenziali predefinite dell'applicazione
  3. Esegui il codice
    import (
    	"context"
    	"fmt"
    	"io"
    	"io/ioutil"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    	dataprocpb "google.golang.org/genproto/googleapis/cloud/dataproc/v1"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %v", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %v", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %v", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %v", err)
    	}
    
    	defer reader.Close()
    
    	body, err := ioutil.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %v", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Installare la libreria client
  2. Configurare le credenziali predefinite dell'applicazione
  3. Esegui il codice
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);

Invia un job direttamente nel cluster

Se vuoi eseguire un job direttamente sul tuo cluster senza utilizzare il servizio Dataproc, SSH nel nodo master del cluster, quindi esegui il job sul nodo master.

Dopo aver stabilito una connessione SSH all'istanza master della VM, esegui i comandi in una finestra del terminale sul nodo master del cluster per:

  1. Apri una shell Spark.
  2. Esegui un semplice job Spark per contare il numero di righe in un file Python (sette righe) all'interno di un file Cloud Storage accessibile pubblicamente.
  3. Esci dalla shell.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Eseguire job bash su Dataproc

Ti consigliamo di eseguire uno script bash come job Dataproc perché i motori in uso non sono supportati come tipo di job Dataproc di primo livello oppure perché devi eseguire un'ulteriore configurazione o calcolare gli argomenti prima di avviare un job utilizzando hadoop o spark-submit dallo script.

Esempio di Maiale

Supponiamo che tu abbia copiato uno script bash hello.sh in Cloud Storage:

gsutil cp hello.sh gs://${BUCKET}/hello.sh

Poiché il comando pig fs utilizza i percorsi Hadoop, copia lo script da Cloud Storage in una destinazione specificata come file:/// per assicurarti che sia sul file system locale anziché HDFS. I comandi sh successivi fanno riferimento automaticamente al file system locale e non richiedono il prefisso file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

In alternativa, poiché i job Dataproc inviano l'argomento --jars per le fasi di un file in una directory temporanea creata per la durata del job, puoi specificare lo script della shell Cloud Storage come argomento --jars:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Tieni presente che l'argomento --jars può anche fare riferimento a uno script locale:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'