Kirim tugas

Anda dapat mengirimkan tugas ke cluster Dataproc yang ada melalui permintaan HTTP atau terprogram jobs.submit Dataproc API, menggunakan alat command line gcloud Google Cloud CLI di jendela terminal lokal atau di Cloud Shell, atau dari Konsol Google Cloud yang dibuka di browser lokal. Anda juga dapat menggunakan SSH ke instance master di cluster, lalu menjalankan tugas langsung dari instance tanpa menggunakan layanan Dataproc.

Cara mengirimkan tugas

Konsol

Buka halaman Kirim tugas Dataproc di konsol Google Cloud di browser Anda.

Contoh tugas Spark

Untuk mengirimkan contoh tugas Spark, isi kolom di halaman Kirim tugas, sebagai berikut:

  1. Pilih nama Cluster dari daftar cluster.
  2. Tetapkan Jenis pekerjaan ke Spark.
  3. Tetapkan Main class or jar ke org.apache.spark.examples.SparkPi.
  4. Tetapkan Arguments ke satu argumen 1000.
  5. Tambahkan file:///usr/lib/spark/examples/jars/spark-examples.jar ke File jar:
    1. file:/// menunjukkan skema LocalFileSystem Hadoop. Dataproc menginstal /usr/lib/spark/examples/jars/spark-examples.jar di node master cluster saat membuat cluster.
    2. Atau, Anda dapat menentukan jalur Cloud Storage (gs://your-bucket/your-jarfile.jar) atau jalur Hadoop Distributed File System (hdfs://path-to-jar.jar) ke salah satu jar Anda.

Klik Kirim untuk memulai tugas. Setelah dimulai, tugas akan ditambahkan ke daftar Tugas.

Klik ID Tugas untuk membuka halaman Tugas, tempat Anda dapat melihat output driver tugas. Karena tugas ini menghasilkan baris output panjang yang melebihi lebar jendela browser, Anda dapat mencentang kotak Line wrapping untuk menampilkan semua teks output dalam tampilan guna menampilkan hasil yang dihitung untuk pi.

Anda dapat melihat output driver tugas dari command line menggunakan perintah gcloud dataproc jobs wait yang ditampilkan di bawah (untuk informasi selengkapnya, lihat Melihat output tugas–Perintah GCLOUD). Salin dan tempel project ID Anda sebagai nilai untuk flag --project dan ID Pekerjaan Anda (ditampilkan di daftar Pekerjaan) sebagai argumen akhir.

gcloud dataproc jobs wait job-id \
    --project=project-id \
    --region=region

Berikut adalah cuplikan dari output driver untuk contoh tugas SparkPi yang dikirimkan di atas:

...
2015-06-25 23:27:23,810 INFO [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (reduce at
SparkPi.scala:35) finished in 21.169 s

2015-06-25 23:27:23,810 INFO [task-result-getter-3] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all
completed, from pool

2015-06-25 23:27:23,819 INFO [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 finished: reduce at SparkPi.scala:35,
took 21.674931 s

Pi is roughly 3.14189648
...
Job [c556b47a-4b46-4a94-9ba2-2dcee31167b2] finished successfully.

driverOutputUri:
gs://sample-staging-bucket/google-cloud-dataproc-metainfo/cfeaa033-749e-48b9-...
...

gcloud

Untuk mengirimkan tugas ke cluster Dataproc, jalankan perintah gcloud CLI gcloud dataproc jobs submit secara lokal di jendela terminal atau di Cloud Shell.

gcloud dataproc jobs submit job-command \
    --cluster=cluster-name \
    --region=region \
    other dataproc-flags \
    -- job-args
Contoh pengiriman tugas PySpark
  1. Cantumkan hello-world.py yang dapat diakses secara publik dan terletak di Cloud Storage.
    gcloud storage cat gs://dataproc-examples/pyspark/hello-world/hello-world.py
    
    Listingan File:

    #!/usr/bin/python
    import pyspark
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!'])
    words = sorted(rdd.collect())
    print(words)
  2. Kirim tugas Pyspark ke Dataproc.
    gcloud dataproc jobs submit pyspark \
        gs://dataproc-examples/pyspark/hello-world/hello-world.py \
        --cluster=cluster-name  \
        --region=region
    
    Output terminal:
    Waiting for job output...
    …
    ['Hello,', 'world!']
    Job finished successfully.
    
Contoh pengiriman tugas Spark
  1. Jalankan contoh SparkPi yang telah diinstal sebelumnya di node master cluster Dataproc.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        -- 1000
    
    Output terminal:
    Job [54825071-ae28-4c5b-85a5-58fae6a597d6] submitted.
    Waiting for job output…
    …
    Pi is roughly 3.14177148
    …
    Job finished successfully.
    …
    

REST

Bagian ini menunjukkan cara mengirimkan tugas Spark untuk menghitung perkiraan nilai pi menggunakan API jobs.submit Dataproc.

Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

  • project-id: Project ID Google Cloud
  • region: cluster region
  • clusterName: nama cluster

Metode HTTP dan URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/jobs:submit

Meminta isi JSON:

{
  "job": {
    "placement": {
      "clusterName": "cluster-name"
    },
    "sparkJob": {
      "args": [
        "1000"
      ],
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": [
        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
      ]
    }
  }
}

Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

Anda akan melihat respons JSON seperti berikut:

{
  "reference": {
    "projectId": "project-id",
    "jobId": "job-id"
  },
  "placement": {
    "clusterName": "cluster-name",
    "clusterUuid": "cluster-Uuid"
  },
  "sparkJob": {
    "mainClass": "org.apache.spark.examples.SparkPi",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/examples/jars/spark-examples.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "job-Uuid"
}

Java

  1. Menginstal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode tersebut.
    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.Job;
    import com.google.cloud.dataproc.v1.JobControllerClient;
    import com.google.cloud.dataproc.v1.JobControllerSettings;
    import com.google.cloud.dataproc.v1.JobMetadata;
    import com.google.cloud.dataproc.v1.JobPlacement;
    import com.google.cloud.dataproc.v1.SparkJob;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class SubmitJob {
    
      public static void submitJob() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        String clusterName = "your-cluster-name";
        submitJob(projectId, region, clusterName);
      }
    
      public static void submitJob(String projectId, String region, String clusterName)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the job controller client.
        JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a job controller client with the configured settings. Using a try-with-resources
        // closes the client,
        // but this can also be done manually with the .close() method.
        try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
    
          // Configure cluster placement for the job.
          JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
    
          // Configure Spark job settings.
          SparkJob sparkJob =
              SparkJob.newBuilder()
                  .setMainClass("org.apache.spark.examples.SparkPi")
                  .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
                  .addArgs("1000")
                  .build();
    
          Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();
    
          // Submit an asynchronous request to execute the job.
          OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
              jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
    
          Job response = submitJobAsOperationAsyncRequest.get();
    
          // Print output from Google Cloud Storage.
          Matcher matches =
              Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
          matches.matches();
    
          Storage storage = StorageOptions.getDefaultInstance().getService();
          Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
    
          System.out.println(
              String.format("Job finished successfully: %s", new String(blob.getContent())));
    
        } catch (ExecutionException e) {
          // If the job does not complete successfully, print the error message.
          System.err.println(String.format("submitJob: %s ", e.getMessage()));
        }
      }
    }

Python

  1. Menginstal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode tersebut
    import re
    
    
    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage
    
    
    def submit_job(project_id, region, cluster_name):
        # Create the job client.
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
    
        # Create the job config. 'main_jar_file_uri' can also be a
        # Google Cloud Storage URL.
        job = {
            "placement": {"cluster_name": cluster_name},
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    
        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()
    
        # Dataproc job output gets saved to the Google Cloud Storage bucket
        # allocated to the job. Use a regex to obtain the bucket and blob info.
        matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )
    
        print(f"Job finished successfully: {output}")
    
    

Go

  1. Menginstal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	"regexp"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
    	"cloud.google.com/go/storage"
    	"google.golang.org/api/option"
    )
    
    func submitJob(w io.Writer, projectID, region, clusterName string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"
    	// clusterName := "your-cluster"
    	ctx := context.Background()
    
    	// Create the job client.
    	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
    	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		log.Fatalf("error creating the job client: %s\n", err)
    	}
    
    	// Create the job config.
    	submitJobReq := &dataprocpb.SubmitJobRequest{
    		ProjectId: projectID,
    		Region:    region,
    		Job: &dataprocpb.Job{
    			Placement: &dataprocpb.JobPlacement{
    				ClusterName: clusterName,
    			},
    			TypeJob: &dataprocpb.Job_SparkJob{
    				SparkJob: &dataprocpb.SparkJob{
    					Driver: &dataprocpb.SparkJob_MainClass{
    						MainClass: "org.apache.spark.examples.SparkPi",
    					},
    					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
    					Args:        []string{"1000"},
    				},
    			},
    		},
    	}
    
    	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
    	if err != nil {
    		return fmt.Errorf("error with request to submitting job: %w", err)
    	}
    
    	submitJobResp, err := submitJobOp.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("error submitting job: %w", err)
    	}
    
    	re := regexp.MustCompile("gs://(.+?)/(.+)")
    	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
    
    	if len(matches) < 3 {
    		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
    	}
    
    	// Dataproc job output gets saved to a GCS bucket allocated to it.
    	storageClient, err := storage.NewClient(ctx)
    	if err != nil {
    		return fmt.Errorf("error creating storage client: %w", err)
    	}
    
    	obj := fmt.Sprintf("%s.000000000", matches[2])
    	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
    	if err != nil {
    		return fmt.Errorf("error reading job output: %w", err)
    	}
    
    	defer reader.Close()
    
    	body, err := io.ReadAll(reader)
    	if err != nil {
    		return fmt.Errorf("could not read output from Dataproc Job: %w", err)
    	}
    
    	fmt.Fprintf(w, "Job finished successfully: %s", body)
    
    	return nil
    }
    

Node.js

  1. Menginstal library klien
  2. Menyiapkan kredensial default aplikasi
  3. Jalankan kode tersebut.
    const dataproc = require('@google-cloud/dataproc');
    const {Storage} = require('@google-cloud/storage');
    
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    
    // Create a client with the endpoint set to the desired cluster region
    const jobClient = new dataproc.v1.JobControllerClient({
      apiEndpoint: `${region}-dataproc.googleapis.com`,
      projectId: projectId,
    });
    
    async function submitJob() {
      const job = {
        projectId: projectId,
        region: region,
        job: {
          placement: {
            clusterName: clusterName,
          },
          sparkJob: {
            mainClass: 'org.apache.spark.examples.SparkPi',
            jarFileUris: [
              'file:///usr/lib/spark/examples/jars/spark-examples.jar',
            ],
            args: ['1000'],
          },
        },
      };
    
      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();
    
      const matches =
        jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
    
      const storage = new Storage();
    
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();
    
      // Output a success message.
      console.log(`Job finished successfully: ${output}`);

Mengirim tugas langsung di cluster

Jika Anda ingin menjalankan tugas langsung di cluster tanpa menggunakan layanan Dataproc, SSH ke node master cluster Anda, lalu jalankan tugas di node master.

Setelah membuat koneksi SSH ke instance master VM, jalankan perintah di jendela terminal pada node master cluster untuk:

  1. Buka shell Spark.
  2. Jalankan tugas Spark sederhana untuk menghitung jumlah baris dalam file "hello-world" Python (tujuh baris) yang terletak di file Cloud Storage yang dapat diakses publik.
  3. Keluar dari shell.

    user@cluster-name-m:~$ spark-shell
    ...
    scala> sc.textFile("gs://dataproc-examples"
    + "/pyspark/hello-world/hello-world.py").count
    ...
    res0: Long = 7
    scala> :quit
    

Menjalankan tugas bash di Dataproc

Anda mungkin ingin menjalankan skrip bash sebagai tugas Dataproc, baik karena mesin yang Anda gunakan tidak didukung sebagai jenis tugas Dataproc tingkat teratas atau karena Anda perlu melakukan penyiapan atau penghitungan argumen tambahan sebelum meluncurkan tugas menggunakan hadoop atau spark-submit dari skrip Anda.

Contoh Babi

Anggap Anda menyalin skrip bash hello.sh ke Cloud Storage:

gcloud storage cp hello.sh gs://${BUCKET}/hello.sh

Karena perintah pig fs menggunakan jalur Hadoop, salin skrip dari Cloud Storage ke tujuan yang ditentukan sebagai file:/// untuk memastikan skrip berada di sistem file lokal, bukan HDFS. Perintah sh berikutnya akan mereferensikan sistem file lokal secara otomatis dan tidak memerlukan awalan file:///.

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    -e='fs -cp -f gs://${BUCKET}/hello.sh file:///tmp/hello.sh; sh chmod 750 /tmp/hello.sh; sh /tmp/hello.sh'

Atau, karena tugas Dataproc mengirimkan argumen --jars yang membuat file ke dalam direktori sementara yang dibuat selama tugas berjalan, Anda dapat menentukan skrip shell Cloud Storage sebagai argumen --jars:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=gs://${BUCKET}/hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'

Perhatikan bahwa argumen --jars juga dapat mereferensikan skrip lokal:

gcloud dataproc jobs submit pig --cluster=${CLUSTER} --region=${REGION} \
    --jars=hello.sh \
    -e='sh chmod 750 ${PWD}/hello.sh; sh ${PWD}/hello.sh'