Envie um job

É possível enviar um job para um cluster do Dataproc por meio de uma solicitação programática ou HTTP jobs.submit da API Dataproc, usando a ferramenta de linha de comando gcloud da Google Cloud CLI em uma janela de terminal local ou no Cloud Shell ou pelo Console do Google Cloud aberto em um navegador local. Também é possível executar SSH na instância mestre do cluster e executar um job diretamente na instância sem usar o serviço Dataproc.

Como enviar um job

Console

Abra a página Enviar um job do Dataproc no console do Google Cloud no navegador.

Exemplo de job do Spark

Para enviar um job do Spark de exemplo, preencha os campos na página Enviar um job da seguinte maneira:

  1. Selecione o nome do Cluster na lista de clusters.
  2. Defina o Tipo de job para Spark.
  3. Defina Classe principal ou jar como org.apache.spark.examples.SparkPi.
  4. Defina Argumentos como o argumento único 1000.
  5. Adicione file:///usr/lib/spark/examples/jars/spark-examples.jar para Arquivos jar:
    1. file:/// indica um esquema de LocalFileSystem do Hadoop. O Dataproc instalou /usr/lib/spark/examples/jars/spark-examples.jar no nó mestre do cluster quando criou o cluster.
    2. Como alternativa, você pode especificar um caminho do Cloud Storage (gs://your-bucket/your-jarfile.jar) ou um caminho do sistema de arquivos distribuídos do Hadoop (hdfs://path-to-jar.jar) para um dos seus jars.

Clique em Enviar para iniciar o job. Depois de iniciado, o job será adicionado à lista.

Clique no ID do job para abrir a página Jobs e conferir a saída do driver do job. Como este job produz linhas de saída longas que excedem a largura da janela do navegador, você pode marcar a caixa de Quebra de linha para exibir todo o texto de saída e mostrar o resultado calculado para pi.

Visualize a saída do driver de job na linha de comando usando o comando gcloud dataproc jobs wait mostrado abaixo. Para saber mais, consulte Conferir a saída do job – GCLOUD COMMAND. Copie e cole o código do projeto como o valor para a sinalização --project e o ID do job (mostrado na lista de jobs) como o argumento final.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Veja os snippets da saída do driver do job SparkPi de exemplo enviado acima:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

gcloud

Para enviar um job a um cluster do Dataproc, execute o comando da CLI gcloud gcloud dataproc jobs submit localmente em uma janela de terminal ou no Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Exemplo de envio de job do PySpark
  1. Liste os hello-world.py acessíveis publicamente localizados no Cloud Storage.
    gcloud storage cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Listagem de arquivos:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
  2. Envie o job do Pyspark para o Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Saída do terminal:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Exemplo de envio de job do Spark
  1. Execute o exemplo SparkPi pré-instalado no nó mestre do cluster do Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Saída do terminal:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST

Nesta seção, mostramos como enviar um job do Spark para calcular o valor aproximado de pi usando a API jobs.submit do Dataproc.

Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

  • project-id: ID do projeto do Google Cloud
  • region: região do cluster
  • clusterName: nome do cluster

Método HTTP e URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Corpo JSON da solicitação:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Para enviar a solicitação, expanda uma destas opções:

Você receberá uma resposta JSON semelhante a esta:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Java

  1. Instalar a biblioteca cliente
  2. Configurar as credenciais padrão do aplicativo
  3. Execute o código
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Instalar a biblioteca cliente
  2. Configurar as credenciais padrão do aplicativo
  3. Execute o código
    import re
    
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )
    
        print(f"Job finished successfully: {output}")
    
    

Go

  1. Instalar a biblioteca cliente
  2. Configurar as credenciais padrão do aplicativo
  3. Execute o código
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %w", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %w", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %w", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %w", err)
    	}
    
    	defer reader.Close()
    
    	body, err := io.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %w", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Instalar a biblioteca cliente
  2. Configurar as credenciais padrão do aplicativo
  3. Execute o código
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);

Enviar um trabalho diretamente no cluster

Se você quiser executar um job diretamente no cluster sem usar o serviço do Dataproc, use o SSH no nó mestre do cluster e execute o job no nó mestre.

Depois de estabelecer uma conexão SSH com a instância mestre de VM, execute comandos em uma janela de terminal no nó mestre do cluster para:

  1. abrir um shell do Spark;
  2. executar um job do Spark simples para contar o número de linhas em um arquivo "hello-world" do Python (sete linhas) localizado em um arquivo do Cloud Storage acessível publicamente;
  3. sair do shell.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Executar jobs de bash no Dataproc

Execute um script bash como job do Dataproc porque os mecanismos usados não são compatíveis com um tipo de job de nível superior ou porque você precisa configurar ou calcular os argumentos antes de iniciar um job usando hadoop ou spark-submit do seu script.

Exemplo de Python

Suponha que você tenha copiado um script hello.sh bash no Cloud Storage:

gcloud storage cp hello.sh gs://${BUCKET}/hello.sh

Como o comando pig fs usa caminhos do Hadoop, copie o script do Cloud Storage para um destino especificado como file:/// para garantir que ele esteja no sistema de arquivos local em vez do HDFS. Os comandos sh subsequentes fazem referência ao sistema de arquivos local automaticamente e não exigem o prefixo file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

Como alternativa, como os jobs do Dataproc enviam o argumento --jars e em um diretório temporário criado durante a vida útil do job, especifique o script de shell do Cloud Storage como um argumento --jars:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Observe que o argumento --jars também pode fazer referência a um script local:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'