Submit a job

You can submit a job to an existing Dataproc cluster via a Dataproc API jobs.submit HTTP or programmatic request, using the Cloud SDK gcloud command-line tool in a local terminal window or in Cloud Shell, or from the Google Cloud Console opened in a local browser. You can also SSH into the master instance in your cluster, and then run a job directly from the instance without using the Dataproc service.

Submit a Dataproc job

gcloud

To submit a job to a Dataproc cluster, run the Cloud SDK gcloud dataproc jobs submit command locally in a terminal window or in Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
PySpark job submit example
  1. List the publicly accessible hello-world.py located in Cloud Storage.
    gsutil cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    File listing:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
    
  2. Submit the PySpark job to Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Terminal output:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Spark job submit example
  1. Run the SparkPi example pre-installed on the Dataproc cluster's master node.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Terminal output:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST & CMD LINE

This section shows how to submit a Spark job to compute the approximate value of pi using the Dataproc jobs.submit API.

Before using any of the request data, make the following replacements:

  • project-id: GCP project ID
  • region: cluster region
  • clusterName: cluster name

HTTP method and URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Request JSON body:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

To send your request, you can use a command-line HTTP client such as curl.
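
For example, here is a minimal sketch with curl, assuming the request JSON body above is saved locally as a file named request.json (the file name is an assumption) and that you are authenticated with the gcloud CLI:

# project-id and region in the URL are the same placeholders described above.
curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json; charset=utf-8" \
    -d @request.json \
    "https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit"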

You should receive a JSON response similar to the following:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Console

Open the Submit a job page in the Cloud Console in your browser.

Spark job example

To submit a sample Spark job, fill in the fields on the Submit a job page as follows:

  1. Select your cluster name from the cluster list.
  2. Set Job type to Spark.
  3. Set Main class or jar to org.apache.spark.examples.SparkPi.
  4. Set Arguments to the single argument 1000.
  5. Add file:///usr/lib/spark/examples/jars/spark-examples.jar to Jar files:
    1. file:/// denotes a Hadoop LocalFileSystem scheme; Dataproc installed /usr/lib/spark/examples/jars/spark-examples.jar on the cluster's master node when it created the cluster.
    2. Alternatively, you can specify a Cloud Storage path (gs://your-bucket/your-jarfile.jar) or a Hadoop Distributed File System path (hdfs://path-to-jar.jar) to one of your jars (see the gcloud sketch after this list).
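
For illustration, here is a hedged gcloud equivalent that uses a Cloud Storage jar path instead of the local file:/// path; the bucket, jar file, main class, and arguments below are placeholders, and the jar is assumed to contain the class you name:

gcloud dataproc jobs submit spark \
    --cluster=cluster-name \
    --region=region \
    --class=your.main.ClassName \
    --jars=gs://your-bucket/your-jarfile.jar \
    -- arg1 arg2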

Click Submit to start the job. Once the job starts, it is added to the Jobs list.

Click the Job ID to open the Jobs page, where you can view the job's driver output (see Accessing job driver output–CONSOLE). Since this job produces long output lines that exceed the width of the browser window, you can check the Line wrapping box to bring all of the output text within view and display the calculated result for pi.

You can view your job's driver output from the command line using the gcloud dataproc jobs wait command shown below (for more information, see Accessing job driver output–GCLOUD COMMAND). Copy and paste your project ID as the value of the --project flag and your job ID (shown in the Jobs list) as the final argument.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Here are snippets of the driver output for the sample SparkPi job submitted above:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

Java

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(
          String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
    import re
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(client_options={
            'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
        })
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            'placement': {
                'cluster_name': cluster_name
            },
            'spark_job': {
                'main_class': 'org.apache.spark.examples.SparkPi',
                'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
                'args': ['1000']
            }
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_string()
        )
    
        print(f"Job finished successfully: {output}")

Go

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
    import (
    	"context"
    	"fmt"
    	"io"
    	"io/ioutil"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    	dataprocpb "google.golang.org/genproto/googleapis/cloud/dataproc/v1"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %v", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %v", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %v", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %v", err)
    	}
    
    	defer reader.Close()
    
    	body, err := ioutil.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %v", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches = jobResponse.driverOutputResourceUri.match(
        'gs://(.*?)/(.*)'
      );
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);
    }

    submitJob();

Submit a job directly on your cluster

If you want to run a job directly on your cluster without using the Dataproc service, SSH into the master node of your cluster, then run the job on the master node.

SSH into the master instance

You can connect to a Compute Engine VM instance in your cluster by using SSH from the command line or from the Cloud Console.

gcloud command

Run the gcloud compute ssh command in a local terminal window or from Cloud Shell to SSH into your cluster's master node (the default name of the master node is the cluster name followed by an -m suffix).

gcloud compute ssh cluster-name-m \
    --zone=zone \
    --project=project-id

The following snippet uses gcloud compute ssh to SSH into the master node of cluster-1.

gcloud compute ssh cluster-1-m \
    --zone=us-central1-a \
    --project=my-project-id
...
Linux cluster-1-m 4.9.0-8-amd64 #1 SMP Debian 4.9.110-3+deb9u6...
...
user@cluster-1-m:~$

Console

Use the Cloud Console to SSH into your cluster's master node (the default name of the master node is the cluster name followed by an -m suffix).
  1. In the Cloud Console, go to the VM Instances page.
  2. In the list of VM instances, click SSH in the row of the master instance (-m suffix) that you want to connect to.

A browser window opens at your home directory on the master node.

Connected, host fingerprint: ssh-rsa ...
Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
...
user@cluster-1-m:~$

Run a Spark job on the master node

After establishing an SSH connection to the VM master instance, run commands in a terminal window on the cluster's master node to:

  1. Open a Spark shell.
  2. Run a simple Spark job to count the number of lines in the (seven-line) Python "hello-world" file located in a publicly accessible Cloud Storage file.
  3. Quit the shell.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
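
As an additional hedged sketch (not one of the numbered steps above), the pre-installed SparkPi example can also be run directly on the master node with spark-submit, without going through the Dataproc jobs API; the jar path is the same one used in the earlier examples:

user@cluster-name-m:~$ spark-submit \
    --class org.apache.spark.examples.SparkPi \
    /usr/lib/spark/examples/jars/spark-examples.jar 1000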
    

Run bash jobs on Dataproc

You may want to run a bash script as your Dataproc job, either because the engines you use aren't supported as a top-level Dataproc job type or because you need to do additional setup or calculation of arguments before launching a job using hadoop or spark-submit from your script.

Pig example

Assume you have copied a hello.sh bash script into Cloud Storage:

gsutil cp hello.sh gs://${BUCKET}/hello.sh
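
For reference, here is a minimal sketch of what hello.sh might contain; its contents are assumed here, and any bash script would work:

#!/bin/bash
# Hypothetical script body: print a greeting so the job produces visible output.
echo "hello from a Dataproc bash job"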

Since the pig fs command uses Hadoop paths, copy the script from Cloud Storage to a destination specified as file:/// to make sure it is on the local filesystem instead of HDFS. The subsequent sh commands reference the local filesystem automatically and do not require the file:/// prefix.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

Alternatively, since the Dataproc jobs submit --jars argument stages files to a temporary directory created for the lifetime of the job, you can specify your Cloud Storage shell script as a --jars argument:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Note that the --jars argument can also reference a local script:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'