ジョブの送信

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

Dataproc クラスタに Spark ジョブを送信します。

もっと見る

このコードサンプルを含む詳細なドキュメントについては、以下をご覧ください。

コードサンプル

Go

このサンプルを試す前に、Dataproc クイックスタート: クライアント ライブラリの使用にある Go の設定手順を行ってください。詳細については、Dataproc Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"regexp"

	dataproc "cloud.google.com/go/dataproc/apiv1"
	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
	"cloud.google.com/go/storage"
	"google.golang.org/api/option"
)

func submitJob(w io.Writer, projectID, region, clusterName string) error {
	// projectID := "your-project-id"
	// region := "us-central1"
	// clusterName := "your-cluster"
	ctx := context.Background()

	// Create the job client.
	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
	if err != nil {
		log.Fatalf("error creating the job client: %s\n", err)
	}

	// Create the job config.
	submitJobReq := &dataprocpb.SubmitJobRequest{
		ProjectId: projectID,
		Region:    region,
		Job: &dataprocpb.Job{
			Placement: &dataprocpb.JobPlacement{
				ClusterName: clusterName,
			},
			TypeJob: &dataprocpb.Job_SparkJob{
				SparkJob: &dataprocpb.SparkJob{
					Driver: &dataprocpb.SparkJob_MainClass{
						MainClass: "org.apache.spark.examples.SparkPi",
					},
					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
					Args:        []string{"1000"},
				},
			},
		},
	}

	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
	if err != nil {
		return fmt.Errorf("error with request to submitting job: %v", err)
	}

	submitJobResp, err := submitJobOp.Wait(ctx)
	if err != nil {
		return fmt.Errorf("error submitting job: %v", err)
	}

	re := regexp.MustCompile("gs://(.+?)/(.+)")
	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)

	if len(matches) < 3 {
		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
	}

	// Dataproc job output gets saved to a GCS bucket allocated to it.
	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("error creating storage client: %v", err)
	}

	obj := fmt.Sprintf("%s.000000000", matches[2])
	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
	if err != nil {
		return fmt.Errorf("error reading job output: %v", err)
	}

	defer reader.Close()

	body, err := ioutil.ReadAll(reader)
	if err != nil {
		return fmt.Errorf("could not read output from Dataproc Job: %v", err)
	}

	fmt.Fprintf(w, "Job finished successfully: %s", body)

	return nil
}

Java

このサンプルを試す前に、Dataproc クイックスタート: クライアント ライブラリの使用にある Java の設定手順を行ってください。詳細については、Dataproc Java API のリファレンス ドキュメントをご覧ください。


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.SparkJob;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SubmitJob {

  public static void submitJob() throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String region = "your-project-region";
    String clusterName = "your-cluster-name";
    submitJob(projectId, region, clusterName);
  }

  public static void submitJob(String projectId, String region, String clusterName)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create a job controller client with the configured settings. Using a try-with-resources
    // closes the client,
    // but this can also be done manually with the .close() method.
    try (JobControllerClient jobControllerClient =
        JobControllerClient.create(jobControllerSettings)) {

      // Configure cluster placement for the job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();

      // Configure Spark job settings.
      SparkJob sparkJob =
          SparkJob.newBuilder()
              .setMainClass("org.apache.spark.examples.SparkPi")
              .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
              .addArgs("1000")
              .build();

      Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();

      // Submit an asynchronous request to execute the job.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);

      Job response = submitJobAsOperationAsyncRequest.get();

      // Print output from Google Cloud Storage.
      Matcher matches =
          Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
      matches.matches();

      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));

      System.out.println(
          String.format("Job finished successfully: %s", new String(blob.getContent())));

    } catch (ExecutionException e) {
      // If the job does not complete successfully, print the error message.
      System.err.println(String.format("submitJob: %s ", e.getMessage()));
    }
  }
}

Node.js

このサンプルを試す前に、Dataproc クイックスタート: クライアント ライブラリの使用にある Node.js の設定手順を行ってください。詳細については、Dataproc Node.js API のリファレンス ドキュメントをご覧ください。

const dataproc = require('@google-cloud/dataproc');
const {Storage} = require('@google-cloud/storage');

// TODO(developer): Uncomment and set the following variables
// projectId = 'YOUR_PROJECT_ID'
// region = 'YOUR_CLUSTER_REGION'
// clusterName = 'YOUR_CLUSTER_NAME'

// Create a client with the endpoint set to the desired cluster region
const jobClient = new dataproc.v1.JobControllerClient({
  apiEndpoint: `${region}-dataproc.googleapis.com`,
  projectId: projectId,
});

async function submitJob() {
  const job = {
    projectId: projectId,
    region: region,
    job: {
      placement: {
        clusterName: clusterName,
      },
      sparkJob: {
        mainClass: 'org.apache.spark.examples.SparkPi',
        jarFileUris: [
          'file:///usr/lib/spark/examples/jars/spark-examples.jar',
        ],
        args: ['1000'],
      },
    },
  };

  const [jobOperation] = await jobClient.submitJobAsOperation(job);
  const [jobResponse] = await jobOperation.promise();

  const matches =
    jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');

  const storage = new Storage();

  const output = await storage
    .bucket(matches[1])
    .file(`${matches[2]}.000000000`)
    .download();

  // Output a success message.
  console.log(`Job finished successfully: ${output}`);

Python

このサンプルを試す前に、Dataproc クイックスタート: クライアント ライブラリの使用にある Python の設定手順を行ってください。詳細については、Dataproc Python API のリファレンス ドキュメントをご覧ください。

import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

def submit_job(project_id, region, cluster_name):
    # Create the job client.
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
    )

    # Create the job config. 'main_jar_file_uri' can also be a
    # Google Cloud Storage URL.
    job = {
        "placement": {"cluster_name": cluster_name},
        "spark_job": {
            "main_class": "org.apache.spark.examples.SparkPi",
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "args": ["1000"],
        },
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output gets saved to the Google Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_string()
    )

    print(f"Job finished successfully: {output}")

次のステップ

他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプルをご覧ください。