Enviar una tarea

Puedes enviar una tarea a un clúster de Dataproc mediante una solicitud HTTP o programática jobs.submit de la API de Dataproc, con la herramienta de línea de comandos gcloud de la CLI de Google Cloud en una ventana de terminal local o en Cloud Shell, o desde la Google Cloud consola abierta en un navegador local. También puedes conectarte a la instancia principal mediante SSH de tu clúster y, a continuación, ejecutar un trabajo directamente desde la instancia sin usar el servicio Dataproc.

.

Cómo enviar una tarea

Consola

Abre la página de Dataproc Enviar un trabajo en la Google Cloud consola de tu navegador.

Ejemplo de trabajo de Spark

Para enviar una tarea de Spark de ejemplo, rellena los campos de la página Enviar una tarea de la siguiente manera:

  1. Selecciona el nombre de tu clúster en la lista de clústeres.
  2. En Tipo de tarea, selecciona Spark.
  3. Asigna el valor org.apache.spark.examples.SparkPi a Clase principal o .jar.
  4. Asigna a Arguments el argumento 1000.
  5. Añade file:///usr/lib/spark/examples/jars/spark-examples.jar a archivos .jar:
    1. file:/// denota un esquema de Hadoop LocalFileSystem. Dataproc instaló /usr/lib/spark/examples/jars/spark-examples.jar en el nodo maestro del clúster cuando creó el clúster.
    2. También puedes especificar una ruta de Cloud Storage (gs://your-bucket/your-jarfile.jar) o una ruta del sistema de archivos distribuidos de Hadoop (hdfs://path-to-jar.jar) a uno de tus archivos JAR.

Haz clic en Enviar para iniciar el trabajo. Una vez que se inicia el trabajo, se añade a la lista de trabajos.

Haz clic en el ID de la tarea para abrir la página Tareas, donde puedes ver el resultado del controlador de la tarea. Como este trabajo genera líneas de salida largas que superan el ancho de la ventana del navegador, puedes marcar la casilla Ajuste de línea para que todo el texto de salida se muestre en la vista y se pueda ver el resultado calculado de pi.

Puedes ver la salida del controlador de tu tarea desde la línea de comandos con el comando gcloud dataproc jobs wait que se muestra a continuación (para obtener más información, consulta Ver la salida de una tarea: COMANDO GCLOUD). Copia y pega el ID de tu proyecto como valor de la marca --project y el ID de empleo (que se muestra en la lista de empleos) como argumento final.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

A continuación se muestran fragmentos de la salida del controlador de la tarea SparkPi de ejemplo enviada anteriormente:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

gcloud

Para enviar una tarea a un clúster de Dataproc, ejecuta el comando de la CLI de gcloud gcloud dataproc jobs submit de forma local en una ventana de terminal o en Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Ejemplo de envío de una tarea de PySpark
  1. Lista los hello-world.py de acceso público ubicados en Cloud Storage.
    gcloud storage cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Lista de archivos:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
  2. Envía la tarea de PySpark a Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Resultado del terminal:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Ejemplo de envío de una tarea de Spark
  1. Ejecuta el ejemplo SparkPi preinstalado en el nodo maestro del clúster de Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Resultado del terminal:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST

En esta sección se muestra cómo enviar una tarea de Spark para calcular el valor aproximado de pi mediante la API jobs.submit de Dataproc.

Antes de usar los datos de la solicitud, haz las siguientes sustituciones:

  • project-id: Google Cloud ID de proyecto
  • region: región del clúster
  • clusterName: nombre del clúster

Método HTTP y URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Cuerpo JSON de la solicitud:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Para enviar tu solicitud, despliega una de estas opciones:

Deberías recibir una respuesta JSON similar a la siguiente:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Java

  1. Instalar la biblioteca de cliente
  2. Configurar credenciales predeterminadas de la aplicación
  3. Ejecuta el código. .
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Instalar la biblioteca de cliente
  2. Configurar credenciales predeterminadas de la aplicación
  3. Ejecuta el código.
    import re
    
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )
    
        print(f"Job finished successfully: {output}")
    
    

Go

  1. Instalar la biblioteca de cliente
  2. Configurar credenciales predeterminadas de la aplicación
  3. Ejecuta el código.
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %w", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %w", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %w", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %w", err)
    	}
    
    	defer reader.Close()
    
    	body, err := io.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %w", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Instalar la biblioteca de cliente
  2. Configurar credenciales predeterminadas de la aplicación
  3. Ejecuta el código.
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);

Enviar un trabajo directamente en tu clúster

Si quieres ejecutar un trabajo directamente en tu clúster sin usar el servicio Dataproc, conéctate mediante SSH al nodo maestro de tu clúster y, a continuación, ejecuta el trabajo en el nodo maestro.

Después de establecer una conexión SSH con la instancia maestra de la VM, ejecuta comandos en una ventana de terminal del nodo maestro del clúster para hacer lo siguiente:

  1. Abre un shell de Spark.
  2. Ejecuta una tarea de Spark sencilla para contar el número de líneas de un archivo Python "hello-world" (de siete líneas) ubicado en un archivo de Cloud Storage de acceso público.
  3. Salir del shell.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Ejecutar tareas de Bash en Dataproc

Puede que quieras ejecutar una secuencia de comandos bash como trabajo de Dataproc, ya sea porque los motores que usas no se admiten como tipo de trabajo de Dataproc de nivel superior o porque necesitas hacer una configuración adicional o calcular argumentos antes de iniciar un trabajo con hadoop o spark-submit desde tu secuencia de comandos.

Ejemplo de Pig

Supongamos que ha copiado una secuencia de comandos bash hello.sh en Cloud Storage:

gcloud storage cp hello.sh gs://${BUCKET}/hello.sh

Como el comando pig fs usa rutas de Hadoop, copie la secuencia de comandos de Cloud Storage en un destino especificado como file:/// para asegurarse de que se encuentra en el sistema de archivos local en lugar de en HDFS. Los comandos sh posteriores hacen referencia al sistema de archivos local automáticamente y no requieren el prefijo file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

También puedes especificar tu secuencia de comandos de shell de Cloud Storage como argumento --jars, ya que el argumento --jars de las tareas de Dataproc pone en el área de stage un archivo en un directorio temporal creado durante el tiempo de vida de la tarea:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Ten en cuenta que el argumento --jars también puede hacer referencia a una secuencia de comandos local:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'