Enviar um job

É possível enviar um job por meio de uma API HTTP jobs.submit do Dataproc ou de uma solicitação programática, usando a ferramenta de linha de comando gcloud do SDK do Cloud em uma janela de terminal local ou no Cloud Shell ou então pelo Console do Google Cloud aberto em um navegador local. Também é possível executar SSH na instância mestre do cluster e executar um job diretamente na instância sem usar o serviço Dataproc.

Como enviar um job do Dataproc

gcloud

Para enviar um job a um cluster do Dataproc, execute o comando gcloud dataproc submit do SDK do Cloud localmente em uma janela de terminal ou no Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Exemplo de envio de job do PySpark
  1. Liste os hello-world.py acessíveis publicamente localizados no Cloud Storage.
    gsutil cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Listagem de arquivos:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
    
  2. Envie o job do Pyspark para o Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Saída do terminal:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Exemplo de envio de job do Spark
  1. Execute o exemplo SparkPi pré-instalado no nó mestre do cluster do Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Saída do terminal:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST e LINHA DE CMD

Nesta seção, mostramos como enviar um job do Spark para calcular o valor aproximado de pi.

Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

  • project-id: ID de projeto do GCP.
  • region: região do cluster
  • clusterName: nome do cluster

Método HTTP e URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Corpo JSON da solicitação:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Para enviar a solicitação, expanda uma destas opções:

Você receberá uma resposta JSON semelhante a esta:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Console

Abra a página Enviar um job do Dataproc no Console do Cloud em seu navegador.

Exemplo de job do Spark

Para enviar um job do Spark de exemplo, preencha os campos na página Enviar um job a seguir (conforme mostrado na captura de tela anterior):

  1. Selecione o nome do Cluster na lista de clusters.
  2. Defina o Tipo de job para Spark.
  3. Defina Classe principal ou jar como org.apache.spark.examples.SparkPi.
  4. Defina Argumentos como o argumento único 1000.
  5. Adicione file:///usr/lib/spark/examples/jars/spark-examples.jar para Arquivos jar:
    1. file:/// indica um esquema de LocalFileSystem do Hadoop. O Dataproc instalou /usr/lib/spark/examples/jars/spark-examples.jar no nó mestre do cluster quando criou o cluster.
    2. Como alternativa, você pode especificar um caminho do Cloud Storage (gs://your-bucket/your-jarfile.jar) ou um caminho do sistema de arquivos distribuídos do Hadoop (hdfs://path-to-jar.jar) para um dos seus jars.

Clique em Enviar para iniciar o job. Depois de iniciado, o job será adicionado à lista.

Clique no código do job para abrir a página Jobs, em que você visualiza a saída do driver do job (consulte Como acessar a saída do driver de job–CONSOLE). Como este job produz linhas de saída longas que excedem a largura da janela do navegador, você pode marcar a caixa de Quebra de linha para exibir todo o texto de saída e mostrar o resultado calculado para pi.

Visualize a saída do driver de job na linha de comando usando o comando gcloud dataproc jobs wait mostrado abaixo. Para ver mais informações, consulte Como acessar a saída do driver de job - GCLOUD COMMAND. Copie e cole o código do projeto como o valor para a sinalização --project e o ID do job (mostrado na lista de jobs) como o argumento final.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Veja os snippets da saída do driver do job SparkPi de exemplo enviado acima:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

Java

  1. Instalar a biblioteca cliente
  2. Configurar as credenciais padrão do aplicativo
  3. Execute o código
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(
          String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Instalar a biblioteca cliente
  2. Configurar as credenciais padrão do aplicativo
  3. Execute o código
    import re
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(client_options={
            'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
        })
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            'placement': {
                'cluster_name': cluster_name
            },
            'spark_job': {
                'main_class': 'org.apache.spark.examples.SparkPi',
                'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
                'args': ['1000']
            }
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_string()
        )
    
        print(f"Job finished successfully: {output}")

Enviar um trabalho diretamente no cluster

Se você quiser executar um job diretamente no cluster sem usar o serviço do Dataproc, use o SSH no nó mestre do cluster e execute o job no nó mestre.

SSH na instância principal

Você pode se conectar a uma instância de VM do Compute Engine no cluster usando SSH na linha de comando ou no Console do Cloud.

Comando gcloud

Execute o comando gcloud compute ssh em uma janela de terminal local ou do Cloud Shell para executar o SSH no nó mestre do cluster. O nome padrão do nó mestre é o nome do cluster seguido por um sufixo -m.

gcloud compute ssh cluster-name-m \
    --region=region \
    --project=project-id

O snippet a seguir usa gcloud compute ssh para executar o SSH no nó mestre de cluster-1.

gcloud compute ssh cluster-1-m \
    --region=us-central-1 \
    --project=my-project-id
...
Linux cluster-1-m 4.9.0-8-amd64 #1 SMP Debian 4.9.110-3+deb9u6...
...
user@cluster-1-m:~$

Console

Use o Console do Cloud para executar o SSH no nó mestre do cluster. O nome padrão do nó mestre é o nome do cluster seguido por um sufixo -m.
  1. No Console do Cloud, acesse a página Instâncias de VM.
  2. Na lista de instâncias de máquina virtual, clique em SSH na linha da instância mestre (sufixo -m) a que você quer se conectar.

Uma janela de navegador é aberta no diretório inicial do nó mestre.

Connected, host fingerprint: ssh-rsa ...
Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
...
user@cluster-1-m:~$

Executar um job do Spark no nó mestre

Depois de estabelecer uma conexão SSH com a instância mestre de VM, execute comandos em uma janela de terminal no nó mestre do cluster para:

  1. abrir um shell do Spark;
  2. executar um job do Spark simples para contar o número de linhas em um arquivo "hello-world" do Python (sete linhas) localizado em um arquivo do Cloud Storage acessível publicamente;
  3. sair do shell.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Executar jobs de bash no Dataproc

Execute um script bash como job do Dataproc porque os mecanismos usados não são compatíveis com um tipo de job de nível superior ou porque você precisa configurar ou calcular os argumentos antes de iniciar um job usando hadoop ou spark-submit do seu script.

Exemplo de Python

Suponha que você tenha copiado um script hello.sh bash no Cloud Storage:

gsutil cp hello.sh gs://${BUCKET}/hello.sh

Como o comando pig fs usa caminhos do Hadoop, copie o script do Cloud Storage para um destino especificado como file:/// para garantir que ele esteja no sistema de arquivos local em vez do HDFS. Os comandos sh subsequentes fazem referência ao sistema de arquivos local automaticamente e não exigem o prefixo file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

Como alternativa, como os jobs do Dataproc enviam o argumento --jars e em um diretório temporário criado durante a vida útil do job, especifique o script de shell do Cloud Storage como um argumento --jars:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Observe que o argumento --jars também pode fazer referência a um script local:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'