将 Spark 作业提交到 Dataproc 集群。
深入探索
如需查看包含此代码示例的详细文档，请参阅以下内容：
代码示例
Go
在试用此示例之前，请按照使用客户端库的 Dataproc 快速入门中的 Go 设置说明进行操作。如需了解详情，请参阅 Dataproc Go API 参考文档。
如需向 Dataproc 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"regexp"
dataproc "cloud.google.com/go/dataproc/apiv1"
"cloud.google.com/go/dataproc/apiv1/dataprocpb"
"cloud.google.com/go/storage"
"google.golang.org/api/option"
)
// submitJob submits the SparkPi example job to an existing Dataproc cluster,
// waits for the job to finish, and writes the driver output to w.
//
// projectID is the Google Cloud project, region is the Dataproc region (it is
// also used to build the regional API endpoint), and clusterName is the
// cluster the job is placed on.
//
// A non-nil error is returned when a client cannot be created, the submit
// request or the wait on the operation fails, the driver output URI does not
// look like gs://<bucket>/<object>, or the output object cannot be read.
func submitJob(w io.Writer, projectID, region, clusterName string) error {
	// projectID := "your-project-id"
	// region := "us-central1"
	// clusterName := "your-cluster"
	ctx := context.Background()

	// Create the job client.
	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
	if err != nil {
		// Return the failure to the caller instead of terminating the process:
		// this function already reports every other failure through its error.
		return fmt.Errorf("error creating the job client: %w", err)
	}
	defer func() {
		// A close failure cannot change the outcome of the job, so only log it.
		if cerr := jobClient.Close(); cerr != nil {
			log.Printf("error closing the job client: %v", cerr)
		}
	}()

	// Create the job config.
	submitJobReq := &dataprocpb.SubmitJobRequest{
		ProjectId: projectID,
		Region:    region,
		Job: &dataprocpb.Job{
			Placement: &dataprocpb.JobPlacement{
				ClusterName: clusterName,
			},
			TypeJob: &dataprocpb.Job_SparkJob{
				SparkJob: &dataprocpb.SparkJob{
					Driver: &dataprocpb.SparkJob_MainClass{
						MainClass: "org.apache.spark.examples.SparkPi",
					},
					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
					Args:        []string{"1000"},
				},
			},
		},
	}

	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
	if err != nil {
		return fmt.Errorf("error with request to submitting job: %w", err)
	}

	// Block until the long-running operation completes.
	submitJobResp, err := submitJobOp.Wait(ctx)
	if err != nil {
		return fmt.Errorf("error submitting job: %w", err)
	}

	// Split the driver output URI into its bucket (group 1) and object prefix
	// (group 2).
	re := regexp.MustCompile("gs://(.+?)/(.+)")
	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
	if len(matches) < 3 {
		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)
	}

	// Dataproc job output gets saved to a GCS bucket allocated to it.
	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("error creating storage client: %w", err)
	}
	defer func() {
		if cerr := storageClient.Close(); cerr != nil {
			log.Printf("error closing the storage client: %v", cerr)
		}
	}()

	// The output is read from the object named "<prefix>.000000000".
	obj := fmt.Sprintf("%s.000000000", matches[2])
	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
	if err != nil {
		return fmt.Errorf("error reading job output: %w", err)
	}
	defer reader.Close()

	body, err := ioutil.ReadAll(reader)
	if err != nil {
		return fmt.Errorf("could not read output from Dataproc Job: %w", err)
	}

	fmt.Fprintf(w, "Job finished successfully: %s", body)
	return nil
}
Java
在试用此示例之前，请按照使用客户端库的 Dataproc 快速入门中的 Java 设置说明进行操作。如需了解详情，请参阅 Dataproc Java API 参考文档。
如需向 Dataproc 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅为本地开发环境设置身份验证。
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.SparkJob;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** Submits the SparkPi example job to a Dataproc cluster and prints the driver output. */
public class SubmitJob {

  /**
   * Runs the sample with placeholder values.
   *
   * @throws IOException propagated from {@link #submitJob(String, String, String)}
   * @throws InterruptedException if the thread is interrupted while waiting for the job
   */
  public static void submitJob() throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String region = "your-project-region";
    String clusterName = "your-cluster-name";
    submitJob(projectId, region, clusterName);
  }

  /**
   * Submits the SparkPi job, waits for it to finish, and prints the driver output that the job
   * wrote to Cloud Storage. A job that does not complete successfully is reported on stderr.
   *
   * @param projectId Google Cloud project that owns the cluster
   * @param region Dataproc region; also used to build the regional API endpoint
   * @param clusterName name of the cluster the job is placed on
   * @throws IOException if the client cannot be created or the driver output object is missing
   * @throws InterruptedException if the thread is interrupted while waiting for the job
   * @throws IllegalStateException if the driver output URI is not of the form gs://bucket/object
   */
  public static void submitJob(String projectId, String region, String clusterName)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create a job controller client with the configured settings. Using a try-with-resources
    // closes the client,
    // but this can also be done manually with the .close() method.
    try (JobControllerClient jobControllerClient =
        JobControllerClient.create(jobControllerSettings)) {

      // Configure cluster placement for the job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();

      // Configure Spark job settings.
      SparkJob sparkJob =
          SparkJob.newBuilder()
              .setMainClass("org.apache.spark.examples.SparkPi")
              .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar")
              .addArgs("1000")
              .build();

      Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();

      // Submit an asynchronous request to execute the job.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);

      // Block until the job has finished.
      Job response = submitJobAsOperationAsyncRequest.get();

      // Print output from Google Cloud Storage.
      String driverOutputUri = response.getDriverOutputResourceUri();
      Matcher matches = Pattern.compile("gs://(.*?)/(.*)").matcher(driverOutputUri);
      // Check the match result explicitly so a malformed URI fails with a message that names
      // the offending value instead of the generic "No match found" raised by group().
      if (!matches.matches()) {
        throw new IllegalStateException(
            String.format("Unexpected driver output URI: %s", driverOutputUri));
      }
      String bucketName = matches.group(1);
      String objectName = String.format("%s.000000000", matches.group(2));

      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(bucketName, objectName);
      // Guard against a null blob so a missing output object does not surface as an NPE.
      if (blob == null) {
        throw new IOException(
            String.format("Driver output not found: gs://%s/%s", bucketName, objectName));
      }

      // Decode explicitly as UTF-8 rather than with the platform default charset.
      String driverOutput =
          new String(blob.getContent(), java.nio.charset.StandardCharsets.UTF_8);
      System.out.println(String.format("Job finished successfully: %s", driverOutput));

    } catch (ExecutionException e) {
      // If the job does not complete successfully, print the error message.
      System.err.println(String.format("submitJob: %s ", e.getMessage()));
    }
  }
}
Node.js
在试用此示例之前，请按照使用客户端库的 Dataproc 快速入门中的 Node.js 设置说明进行操作。如需了解详情，请参阅 Dataproc Node.js API 参考文档。
如需向 Dataproc 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅为本地开发环境设置身份验证。
const dataproc = require('@google-cloud/dataproc');
const {Storage} = require('@google-cloud/storage');
// TODO(developer): Uncomment and set the following variables
// projectId = 'YOUR_PROJECT_ID'
// region = 'YOUR_CLUSTER_REGION'
// clusterName = 'YOUR_CLUSTER_NAME'
// Create a client with the endpoint set to the desired cluster region
// Job controller client that talks to the API endpoint of the cluster's region.
const jobClient = new dataproc.v1.JobControllerClient({
  projectId,
  apiEndpoint: `${region}-dataproc.googleapis.com`,
});
/**
 * Submits the SparkPi example job to the configured Dataproc cluster, waits
 * for the operation to finish, and logs the driver output downloaded from
 * Cloud Storage.
 *
 * Reads `projectId`, `region`, `clusterName` and `jobClient` from the
 * enclosing scope.
 *
 * @returns {Promise<void>} Resolves once the driver output has been logged.
 * @throws {Error} If the job response carries no `gs://bucket/object` driver
 *     output URI.
 */
async function submitJob() {
  const job = {
    projectId,
    region,
    job: {
      placement: {
        clusterName,
      },
      sparkJob: {
        mainClass: 'org.apache.spark.examples.SparkPi',
        jarFileUris: [
          'file:///usr/lib/spark/examples/jars/spark-examples.jar',
        ],
        args: ['1000'],
      },
    },
  };

  const [jobOperation] = await jobClient.submitJobAsOperation(job);

  // Wait for the long-running operation to complete.
  const [jobResponse] = await jobOperation.promise();

  // Split the driver output URI into bucket (group 1) and object prefix
  // (group 2). Fail with a descriptive error instead of dereferencing a null
  // match result.
  const driverOutputUri = jobResponse.driverOutputResourceUri;
  const matches =
    typeof driverOutputUri === 'string'
      ? driverOutputUri.match(/gs:\/\/(.*?)\/(.*)/)
      : null;
  if (matches === null) {
    throw new Error(`Unexpected driver output URI: ${driverOutputUri}`);
  }
  const [, bucketName, objectPrefix] = matches;

  const storage = new Storage();

  // download() resolves to an array whose first element is the file contents.
  const [contents] = await storage
    .bucket(bucketName)
    .file(`${objectPrefix}.000000000`)
    .download();

  // Output a success message.
  console.log(`Job finished successfully: ${contents}`);
}
Python
在试用此示例之前，请按照使用客户端库的 Dataproc 快速入门中的 Python 设置说明进行操作。如需了解详情，请参阅 Dataproc Python API 参考文档。
如需向 Dataproc 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅为本地开发环境设置身份验证。
# Create the job client, pointed at the API endpoint of the cluster's region.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config: run the PySpark file stored in Cloud Storage on the
# given cluster.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
# Block until the long-running operation has finished.
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
if matches is None:
    # Fail with a message that names the offending value rather than an
    # AttributeError on None further down.
    raise ValueError(
        f"Unexpected driver output URI: {response.driver_output_resource_uri!r}"
    )

# The driver output is read from the blob named "<prefix>.000000000".
output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例，请参阅 Google Cloud 示例浏览器。