작업 제출

Dataproc API jobs.submit HTTP 또는 프로그래매틱 요청을 통해, 로컬 터미널 창 또는 Cloud Shell에서 Google Cloud CLI gcloud 명령줄 도구를 사용하여, 또는 로컬 브라우저에 열린 Google Cloud 콘솔에서 작업을 기존 Dataproc 클러스터에 제출할 수 있습니다. 클러스터에서 마스터 인스턴스에 SSH를 통해 연결한 후 Dataproc 서비스를 사용하지 않고 인스턴스에서 바로 작업을 실행할 수도 있습니다.

작업 제출 방법

콘솔

브라우저의 Google Cloud 콘솔에서 Dataproc 작업 제출 페이지를 엽니다.

Spark 작업 예시

샘플 Spark 작업을 제출하려면 작업 제출 페이지의 필드를 다음과 같이 채웁니다.

  1. 클러스터 목록에서 클러스터 이름을 선택합니다.
  2. 작업 유형Spark로 설정합니다.
  3. 기본 클래스 또는 jarorg.apache.spark.examples.SparkPi로 설정합니다.
  4. 인수를 단일 인수 1000으로 설정합니다.
  5. file:///usr/lib/spark/examples/jars/spark-examples.jarJar 파일로 설정합니다.
    1. file:///는 Hadoop LocalFileSystem 스키마를 나타냅니다. 클러스터 생성 시 Dataproc는 클러스터의 마스터 노드에 /usr/lib/spark/examples/jars/spark-examples.jar을 설치했습니다.
    2. 또는 Cloud Storage 경로(gs://your-bucket/your-jarfile.jar)나 Hadoop 분산 파일 시스템 경로(hdfs://path-to-jar.jar)를 jar 하나에 지정할 수 있습니다.

제출을 클릭하여 작업을 시작합니다. 작업이 시작되면 작업 목록에 추가됩니다.

작업 ID를 클릭하여 작업의 드라이버 출력을 볼 수 있는 작업 페이지를 엽니다. 이 작업은 브라우저 창 너비를 초과하는 긴 출력 줄을 생성하므로, pi에 대해 계산된 결과가 표시되도록 줄바꿈 상자를 선택하여 모든 출력 텍스트를 뷰 안에서 볼 수 있습니다.

아래에 표시된 gcloud dataproc jobs wait 명령어를 사용하여 명령줄에서 작업의 드라이버 출력을 확인할 수 있습니다(자세한 내용은 작업 출력 보기–GCLOUD 명령어 참조). 프로젝트 ID를 복사하여 --project 플래그의 값으로 붙여넣고, 작업 ID(작업 목록에 표시됨)를 최종 인수로 붙여넣습니다.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

다음은 위에 제출된 샘플 SparkPi 작업의 드라이버 출력에서 가져온 스니펫입니다.

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

gcloud

Dataproc 클러스터에 작업을 제출하려면 터미널 창 또는 Cloud Shell에서 로컬로 gcloud CLI gcloud dataproc jobs submit 명령어를 실행합니다.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args

PySpark 작업 제출 예시
  1. Cloud Storage에 있으며 공개적으로 액세스 가능한 hello-world.py를 나열합니다.
    gcloud storage cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    파일 목록:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
    
  2. PySpark 작업을 Dataproc에 제출합니다.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    터미널 출력:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Spark 작업 제출 예시
  1. Dataproc 클러스터의 마스터 노드에 사전 설치된 SparkPi 예시를 실행합니다.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    터미널 출력:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST

이 섹션에서는 Spark 작업을 제출하여 Dataproc jobs.submit API로 pi의 대략적인 값을 계산하는 방법을 보여줍니다.

요청 데이터를 사용하기 전에 다음을 바꿉니다.

  • project-id: Google Cloud 프로젝트 ID
  • region: 클러스터 리전
  • clusterName: 클러스터 이름

HTTP 메서드 및 URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

JSON 요청 본문:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

요청을 보내려면 다음 옵션 중 하나를 펼칩니다.

다음과 비슷한 JSON 응답이 표시됩니다.

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Java

  1. 클라이언트 라이브러리 설치
  2. 애플리케이션 기본 사용자 인증 정보 설정
  3. 코드 실행
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. 클라이언트 라이브러리 설치
  2. 애플리케이션 기본 사용자 인증 정보 설정
  3. 코드 실행
    import re
    
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )
    
        print(f"Job finished successfully: {output}")
    
    

Go

  1. 클라이언트 라이브러리 설치
  2. 애플리케이션 기본 사용자 인증 정보 설정
  3. 코드 실행
    import (
    	"context"
    	"fmt"
    	"io"
    	"io/ioutil"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %w", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %w", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %w", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %w", err)
    	}
    
    	defer reader.Close()
    
    	body, err := ioutil.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %w", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. 클라이언트 라이브러리 설치
  2. 애플리케이션 기본 사용자 인증 정보 설정
  3. 코드 실행
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);

클러스터에서 바로 작업 제출

Dataproc 서비스를 사용하지 않고 클러스터에서 바로 작업을 실행하려면 클러스터의 마스터 노드에 SSH를 적용한 후에 마스터 노드에서 작업을 실행합니다.

VM 마스터 인스턴스에 SSH 연결을 설정한 후에 클러스터의 마스터 노드에 있는 터미널 창에서 다음과 같은 명령을 실행합니다.

  1. Spark 셸을 엽니다.
  2. 간단한 Spark 작업을 실행하여 공개적으로 액세스할 수 있는 Cloud Storage 파일에 있는 (7줄짜리) Python 'hello-world' 파일에서 줄의 수를 셉니다.
  3. 셸을 종료합니다.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Dataproc에서 bash 작업 실행

사용하는 엔진이 최상위 Dataproc 작업 유형으로 지원되지 않거나 스크립트에서 hadoop 또는 spark-submit를 사용하여 작업을 시작하기 전에 인수 추가 설정 또는 계산을 해야 하므로 bash 스크립트를 Dataproc 작업으로 실행할 수 있습니다.

Pig 예시

다음과 같이 hello.sh bash 스크립트를 Cloud Storage에 복사했다고 가정합니다.

gcloud storage cp hello.sh gs://${BUCKET}/hello.sh

pig fs 명령어는 Hadoop 경로를 사용하므로 Cloud Storage에서 file:///로 지정된 대상으로 스크립트를 복사하여 HDFS가 아닌 로컬 파일 시스템에 있는지 확인합니다. 후속 sh 명령어는 로컬 파일 시스템을 자동으로 참조하며 file:/// 프리픽스가 필요하지 않습니다.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

또는 Dataproc 작업은 --jars 인수를 제출하여 작업 전체 기간 동안 생성된 임시 디렉터리로 파일을 스테이징하므로 Cloud Storage 셸 스크립트를 --jars 인수로 지정할 수 있습니다.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

--jars 인수는 로컬 스크립트도 참조할 수 있습니다.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'