Submits a Spark job to a Dataproc cluster.
Code samples
Go
Before trying this sample, follow the Go setup instructions in Dataproc quickstart: Using client libraries. For more information, see the Dataproc Go API reference documentation.
import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"regexp"

	dataproc "cloud.google.com/go/dataproc/apiv1"
	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
	"cloud.google.com/go/storage"
	"google.golang.org/api/option"
)

// submitJob submits an example Spark job to a Dataproc cluster and writes the driver output to w.
func submitJob(w io.Writer, projectID, region, clusterName string) error {
	// projectID := "your-project-id"
	// region := "us-central1"
	// clusterName := "your-cluster"
	ctx := context.Background()

	// Create the job client.
	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
	if err != nil {
		log.Fatalf("error creating the job client: %s\n", err)
	}

	// Create the job config.
	submitJobReq := &dataprocpb.SubmitJobRequest{
		ProjectId: projectID,
		Region:    region,
		Job: &dataprocpb.Job{
			Placement: &dataprocpb.JobPlacement{
				ClusterName: clusterName,
			},
			TypeJob: &dataprocpb.Job_SparkJob{
				SparkJob: &dataprocpb.SparkJob{
					Driver: &dataprocpb.SparkJob_MainClass{
						MainClass: "org.apache.spark.examples.SparkPi",
					},
					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
					Args:        []string{"1000"},
				},
			},
		},
	}

	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
	if err != nil {
		return fmt.Errorf("error with request to submit job: %v", err)
	}

	submitJobResp, err := submitJobOp.Wait(ctx)
	if err != nil {
		return fmt.Errorf("error submitting job: %v", err)
	}

	re := regexp.MustCompile("gs://(.+?)/(.+)")
	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
	if len(matches) < 3 {
		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
	}

	// Dataproc job output gets saved to a GCS bucket allocated to it.
	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("error creating storage client: %v", err)
	}

	obj := fmt.Sprintf("%s.000000000", matches[2])
	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
	if err != nil {
		return fmt.Errorf("error reading job output: %v", err)
	}
	defer reader.Close()

	body, err := ioutil.ReadAll(reader)
	if err != nil {
		return fmt.Errorf("could not read output from Dataproc Job: %v", err)
	}

	fmt.Fprintf(w, "Job finished successfully: %s", body)
	return nil
}
Java
Before trying this sample, follow the Java setup instructions in Dataproc quickstart: Using client libraries. For more information, see the Dataproc Java API reference documentation.
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.SparkJob;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SubmitJob {

  public static void submitJob() throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String region = "your-project-region";
    String clusterName = "your-cluster-name";
    submitJob(projectId, region, clusterName);
  }

  public static void submitJob(String projectId, String region, String clusterName)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create a job controller client with the configured settings. Using a try-with-resources
    // closes the client, but this can also be done manually with the .close() method.
    try (JobControllerClient jobControllerClient =
        JobControllerClient.create(jobControllerSettings)) {

      // Configure cluster placement for the job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();

      // Configure Spark job settings.
      SparkJob sparkJob =
          SparkJob.newBuilder()
              .setMainClass("org.apache.spark.examples.SparkPi")
              .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
              .addArgs("1000")
              .build();

      Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();

      // Submit an asynchronous request to execute the job.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);

      Job response = submitJobAsOperationAsyncRequest.get();

      // Print output from Google Cloud Storage.
      Matcher matches =
          Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
      matches.matches();

      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));

      System.out.println(
          String.format("Job finished successfully: %s", new String(blob.getContent())));

    } catch (ExecutionException e) {
      // If the job does not complete successfully, print the error message.
      System.err.println(String.format("submitJob: %s ", e.getMessage()));
    }
  }
}
Node.js
Before trying this sample, follow the Node.js setup instructions in Dataproc quickstart: Using client libraries. For more information, see the Dataproc Node.js API reference documentation.
const dataproc = require('@google-cloud/dataproc');
const {Storage} = require('@google-cloud/storage');

// TODO(developer): Uncomment and set the following variables
// projectId = 'YOUR_PROJECT_ID'
// region = 'YOUR_CLUSTER_REGION'
// clusterName = 'YOUR_CLUSTER_NAME'

// Create a client with the endpoint set to the desired cluster region
const jobClient = new dataproc.v1.JobControllerClient({
  apiEndpoint: `${region}-dataproc.googleapis.com`,
  projectId: projectId,
});

async function submitJob() {
  const job = {
    projectId: projectId,
    region: region,
    job: {
      placement: {
        clusterName: clusterName,
      },
      sparkJob: {
        mainClass: 'org.apache.spark.examples.SparkPi',
        jarFileUris: [
          'file:///usr/lib/spark/examples/jars/spark-examples.jar',
        ],
        args: ['1000'],
      },
    },
  };

  const [jobOperation] = await jobClient.submitJobAsOperation(job);
  const [jobResponse] = await jobOperation.promise();

  const matches =
    jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');

  const storage = new Storage();

  const output = await storage
    .bucket(matches[1])
    .file(`${matches[2]}.000000000`)
    .download();

  // Output a success message.
  console.log(`Job finished successfully: ${output}`);
}

submitJob();
Python
Before trying this sample, follow the Python setup instructions in Dataproc quickstart: Using client libraries. For more information, see the Dataproc Python API reference documentation.
import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def submit_job(project_id, region, cluster_name):
    # Create the job client.
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
    )

    # Create the job config. 'main_jar_file_uri' can also be a
    # Google Cloud Storage URL.
    job = {
        "placement": {"cluster_name": cluster_name},
        "spark_job": {
            "main_class": "org.apache.spark.examples.SparkPi",
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "args": ["1000"],
        },
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output gets saved to the Google Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_string()
    )

    print(f"Job finished successfully: {output}")
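As a quick check, the Python function above can be invoked directly once a Dataproc cluster exists. The sketch below assumes the sample is saved as a module named submit_spark_job.py; that file name and the project ID, region, and cluster name are placeholders for illustration, not values from this sample.

# Minimal invocation sketch, assuming the sample above is saved as submit_spark_job.py.
# The project ID, region, and cluster name below are placeholders; replace them with your own.
from submit_spark_job import submit_job

if __name__ == "__main__":
    submit_job(
        project_id="your-project-id",      # placeholder project ID
        region="us-central1",              # placeholder; must match the cluster's region
        cluster_name="your-cluster-name",  # placeholder cluster name
    )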
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.