Envía un trabajo

Puedes enviar un trabajo a un clúster de Dataproc existente a través de una API de Dataprocjobs.submit Solicitud HTTP o programática, mediante la CLI de Google Cloudgcloud herramienta de línea de comandos en una ventana de la terminal local o en Cloud Shell oGoogle Cloud Console Se abrió en un navegador local. También puedes establecer una conexión SSH con la instancia principal en tu clúster y luego ejecutar un trabajo de forma directa desde la instancia sin usar el servicio de Dataproc.

Envía un trabajo de Dataproc

gcloud

Para enviar un trabajo a un clúster de Dataproc, ejecuta elgcloud CLIgcloud dataproc jobs Submit de forma local en una ventana de la terminal o enCloud Shell ,

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Ejemplo de envío de un trabajo de PySpark
  1. Muestra el archivo hello-world.py de acceso público ubicado en Cloud Storage.
    gsutil cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    A continuación, se muestra la lista de archivos:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
    
  2. Envía el trabajo de Pyspark a Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    A continuación, se muestra el resultado de la terminal:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Ejemplo de envío de un trabajo de Spark
  1. Ejecuta el ejemplo de SparkPi preinstalado en el nodo principal del clúster de Cloud Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    A continuación, se muestra el resultado de la terminal:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

LÍNEA DE REST Y CMD

En esta sección, se muestra cómo enviar un trabajo de Spark para calcular el valor aproximado de pi mediante la API de jobs.submit de Dataproc.

Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:

Método HTTP y URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Cuerpo JSON de la solicitud:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Para enviar tu solicitud, expande una de estas opciones:

Deberías recibir una respuesta JSON similar a la que se muestra a continuación:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Console

Abre la página de Dataproc Enviar un trabajo (Submit a job) en Cloud Console, en tu navegador.

Ejemplo de un trabajo de Spark

Para enviar un ejemplo de trabajo de Spark, llena los campos en la página Submit a job de la siguiente manera (como se muestra en la captura de pantalla anterior):

  1. En Clúster (Cluster), selecciona el nombre del clúster que quieres elegir de la lista.
  2. Establece Tipo de trabajo (Job type) en Spark.
  3. Establece Clase principal o jar (Main class or jar) en org.apache.spark.examples.SparkPi.
  4. Establece Argumentos (Arguments) en un argumento único 1000.
  5. Agrega file:///usr/lib/spark/examples/jars/spark-examples.jar a Archivos jar (Jar files):
    1. file:/// denota un esquema LocalFileSystem de Hadoop. Dataproc instaló /usr/lib/spark/examples/jars/spark-examples.jar en el nodo principal del clúster cuando creó el clúster.
    2. Como alternativa, puedes especificar una ruta de Cloud Storage (gs://your-bucket/your-jarfile.jar) o una ruta del sistema de archivos distribuido de Hadoop (hdfs://path-to-jar.jar) a uno de tus archivos jar.

Haz clic en Submit (Enviar) para iniciar el trabajo. Una vez que se inicia el trabajo, se agrega a la lista Jobs (Trabajos).

Haz clic en el ID de trabajo para abrir la página Jobs, en la que podrás ver el resultado del controlador del trabajo (consulta Accede al resultado del controlador de trabajos–CONSOLE). Este trabajo produce líneas de salida extensas que superan el ancho de la ventana del navegador, por lo que puedes marcar el cuadro Ajuste de línea (Line wrapping) para mostrar todo el texto de salida del resultado calculado de pi.

Puedes ver el resultado del controlador de trabajos desde la línea de comandos con gcloud dataproc jobs wait que se muestra a continuación. Para obtener más información, consulta Accede al resultado del controlador de trabajos–GCLOUD COMMAND. Copia y pega tu ID del proyecto como el valor de la marca --project y tu ID de trabajo (que aparece en la lista de trabajos) como argumento final.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Aquí hay fragmentos del resultado del controlador para el trabajo de muestra de SparkPi que se envió con anterioridad:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

Java

  1. Instala la biblioteca cliente
  2. Configura credenciales predeterminadas de la aplicación
  3. Ejecuta el código.
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Instala la biblioteca cliente
  2. Configura credenciales predeterminadas de la aplicación
  3. Ejecuta el código.
    import re
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_string()
        )
    
        print(f"Job finished successfully: {output}")
    
    

Comienza a usarlo

  1. Instala la biblioteca cliente
  2. Configura credenciales predeterminadas de la aplicación
  3. Ejecuta el código. .
    import (
    	"context"
    	"fmt"
    	"io"
    	"io/ioutil"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    	dataprocpb "google.golang.org/genproto/googleapis/cloud/dataproc/v1"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %v", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %v", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %v", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %v", err)
    	}
    
    	defer reader.Close()
    
    	body, err := ioutil.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %v", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Instala la biblioteca cliente
  2. Configura credenciales predeterminadas de la aplicación
  3. Ejecuta el código.
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);

Envía un trabajo directamente en tu clúster

Si deseas ejecutar un trabajo directamente en tu clúster sin usar el servicio de Dataproc, establece una conexión SSH al nodo principal de tu clúster y ejecuta el trabajo en el nodo principal.

Establece una conexión SSH a la instancia principal

Puedes conectarte a una instancia de VM de Compute Engine de tu clúster mediante SSH desde la línea de comandos o desde Cloud Console.

Comando de gcloud

Ejecuta el comando gcloud compute ssh en una ventana de la terminal de manera local o en Cloud Shell para establecer una conexión SSH al nodo principal del clúster (el nombre predeterminado del nodo principal es el nombre del clúster seguido de un sufijo -m).

gcloud compute ssh cluster-name-m \
    --region=region \
    --project=project-id

El siguiente fragmento usa gcloud compute ssh para establecer una conexión SSH con el nodo principal de cluster-1.

gcloud compute ssh cluster-1-m \
    --region=us-central-1 \
    --project=my-project-id
...
Linux cluster-1-m 4.9.0-8-amd64 #1 SMP Debian 4.9.110-3+deb9u6...
...
user@cluster-1-m:~$

Console

Usa Cloud Console para establecer una conexión SSH al nodo principal del clúster (el nombre predeterminado del nodo principal es el nombre del clúster seguido de un sufijo -m).
  1. En Cloud Console, ve a la página Instancias de VM.
  2. En la lista de instancias de máquinas virtuales, haz clic en SSH en la fila de la instancia principal (sufijo -m) a la que deseas conectarte.

Se abrirá una ventana del navegador en tu directorio principal del nodo.

Connected, host fingerprint: ssh-rsa ...
Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
...
user@cluster-1-m:~$

Ejecuta un trabajo de Spark en el nodo principal

Luego de establecer una conexión SSH a la instancia principal de VM, ejecuta comandos en una ventana de terminal en el nodo principal del clúster para realizar las siguientes acciones:

  1. Abrir una shell de Spark
  2. Ejecutar un trabajo de Spark simple para contar la cantidad de líneas en un archivo “hello-world” de Python (de siete líneas) ubicado en un archivo accesible de manera pública en Cloud Storage
  3. Salir de la shell

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Ejecuta trabajos de bash en Dataproc

Es posible que desees ejecutar una secuencia de comandos Bash como tu trabajo de Dataproc, ya sea porque los motores que usas no son compatibles como un tipo de trabajo de Dataproc de nivel superior o porque necesitas realizar una configuración o cálculo adicional de argumentos antes iniciar un trabajo mediante hadoop o spark-submit desde tu secuencia de comandos.

Ejemplo de pig

Supongamos que copiaste una secuencia de comandos bash hello.sh en Cloud Storage:

gsutil cp hello.sh gs://${BUCKET}/hello.sh

Dado que el comando pig fs usa rutas de Hadoop, copia la secuencia de comandos de Cloud Storage en un destino especificado como file:/// para asegurarte de que esté en el sistema de archivos local, en lugar de HDFS. Los comandos sh posteriores hacen referencia al sistema de archivos local de forma automática y no requieren el prefijo file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

De manera alternativa, como los trabajos de Dataproc envían un archivo a etapa de argumento --jars en un directorio temporal creado durante la vida útil del trabajo, puedes especificar tu secuencia de comandos de shell de Cloud Storage como argumento --jars:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Ten en cuenta que el argumento --jars también puede hacer referencia a una secuencia de comandos local:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'