Mengirim tugas

Anda dapat mengirimkan tugas ke cluster Dataproc yang ada melalui HTTP atau permintaan terprogram jobs.submit Dataproc API, menggunakan alat command line gcloud Google Cloud CLI di jendela terminal lokal atau di Cloud Shell, atau dari Konsol Google Cloud yang dibuka di browser lokal. Anda juga dapat SSH ke instance master di cluster, lalu menjalankan tugas langsung dari instance tanpa menggunakan layanan Dataproc.

Cara mengirimkan pekerjaan

Konsol

Buka halaman Kirim tugas Dataproc di Konsol Google Cloud pada browser Anda.

Contoh tugas Spark

Untuk mengirimkan contoh tugas Spark, isi kolom di halaman Submit a job, sebagai berikut:

  1. Pilih nama Cluster dari daftar cluster.
  2. Tetapkan Job type ke Spark.
  3. Tetapkan Class utama atau jar ke org.apache.spark.examples.SparkPi.
  4. Tetapkan Arguments ke argumen tunggal 1000.
  5. Tambahkan file:///usr/lib/spark/examples/jars/spark-examples.jar ke file Jar:
    1. file:/// menunjukkan skema LocalFileSystem Hadoop. Dataproc menginstal /usr/lib/spark/examples/jars/spark-examples.jar pada node master cluster saat membuat cluster.
    2. Atau, Anda dapat menentukan jalur Cloud Storage (gs://your-bucket/your-jarfile.jar) atau jalur Hadoop Distributed File System (hdfs://path-to-jar.jar) ke salah satu jar Anda.

Klik Submit untuk memulai tugas. Setelah pekerjaan dimulai, pekerjaan akan ditambahkan ke daftar Pekerjaan.

Klik ID Pekerjaan untuk membuka halaman Pekerjaan, tempat Anda dapat melihat output driver tugas. Karena tugas ini menghasilkan baris output panjang yang melebihi lebar jendela browser, Anda dapat mencentang kotak Penggabungan baris untuk menampilkan semua teks output dalam tampilan agar dapat menampilkan hasil yang dihitung untuk pi.

Anda dapat melihat output driver tugas dari command line menggunakan perintah gcloud dataproc jobswait yang ditunjukkan di bawah (untuk mengetahui informasi selengkapnya, baca Melihat output tugas–GCLOUD COMMAND). Salin dan tempel ID project Anda sebagai nilai untuk tanda --project dan ID Pekerjaan Anda (ditampilkan di daftar Tugas) sebagai argumen akhir.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Berikut ini cuplikan dari output driver untuk contoh tugas SparkPi yang dikirimkan di atas:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

gcloud

Untuk mengirimkan tugas ke cluster Dataproc, jalankan perintah gcloud dataproc jobs submit gcloud CLI secara lokal di jendela terminal atau di Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Contoh pengiriman tugas PySpark
  1. Mencantumkan hello-world.py yang dapat diakses secara publik yang terletak di Cloud Storage.
    gsutil cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Daftar File:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
  2. Kirim tugas Pyspark ke Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Output terminal:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Contoh pengiriman tugas Spark
  1. Jalankan contoh SparkPi yang telah diinstal sebelumnya pada node master cluster Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Output terminal:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST

Bagian ini menunjukkan cara mengirimkan tugas Spark untuk menghitung perkiraan nilai pi menggunakan jobs.submit API Dataproc.

Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

  • project-id: Project ID Google Cloud
  • region: region cluster
  • clusterName: nama cluster

Metode HTTP dan URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Meminta isi JSON:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

Anda akan melihat respons JSON seperti berikut:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Java

  1. Menginstal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Menginstal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode
    import re
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )
    
        print(f"Job finished successfully: {output}")
    
    

Go

  1. Menginstal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode
    import (
    	"context"
    	"fmt"
    	"io"
    	"io/ioutil"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %w", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %w", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %w", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %w", err)
    	}
    
    	defer reader.Close()
    
    	body, err := ioutil.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %w", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Menginstal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);

Mengirim tugas langsung di cluster

Jika Anda ingin menjalankan tugas langsung di cluster tanpa menggunakan layanan Dataproc, SSH ke node master cluster, lalu jalankan tugas di node master.

Setelah membuat koneksi SSH ke instance master VM, jalankan perintah di jendela terminal pada node master cluster untuk:

  1. Buka shell Spark.
  2. Jalankan tugas Spark sederhana untuk menghitung jumlah baris dalam file "hello-world" Python (tujuh baris) yang terletak di file Cloud Storage yang dapat diakses secara publik.
  3. Keluar dari shell.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Menjalankan tugas bash di Dataproc

Sebaiknya jalankan skrip bash sebagai tugas Dataproc, baik karena mesin yang Anda gunakan tidak didukung sebagai jenis tugas Dataproc level teratas atau karena Anda perlu melakukan penyiapan atau penghitungan argumen tambahan sebelum meluncurkan tugas menggunakan hadoop atau spark-submit dari skrip.

Contoh Babi

Misalkan Anda menyalin skrip hello.sh bash ke Cloud Storage:

gsutil cp hello.sh gs://${BUCKET}/hello.sh

Karena perintah pig fs menggunakan jalur Hadoop, salin skrip dari Cloud Storage ke tujuan yang ditentukan sebagai file:/// untuk memastikan skrip tersebut berada di sistem file lokal, bukan HDFS. Perintah sh berikutnya mereferensikan sistem file lokal secara otomatis dan tidak memerlukan awalan file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

Atau, karena tugas Dataproc mengirimkan argumen --jars ke tahapan file menjadi direktori sementara yang dibuat selama masa aktif tugas, Anda dapat menentukan skrip shell Cloud Storage sebagai argumen --jars:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Perhatikan bahwa argumen --jars juga dapat merujuk ke skrip lokal:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'