This code sample demonstrates an end-to-end workflow: it creates a Dataproc cluster, submits a PySpark job, reads the job output, and deletes the cluster.
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
Go
Before trying this sample, follow the Go setup instructions in the Dataproc quickstart using client libraries. For more information, see the Dataproc Go API reference documentation.
To authenticate to Dataproc, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
// This quickstart shows how you can use the Dataproc Client library to create a
// Dataproc cluster, submit a PySpark job to the cluster, wait for the job to finish
// and finally delete the cluster.
//
// Usage:
//
// go build
// ./quickstart --project_id <PROJECT_ID> --region <REGION> \
// --cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"regexp"
dataproc "cloud.google.com/go/dataproc/apiv1"
"cloud.google.com/go/dataproc/apiv1/dataprocpb"
"cloud.google.com/go/storage"
"google.golang.org/api/option"
)
func main() {
var projectID, clusterName, region, jobFilePath string
flag.StringVar(&projectID, "project_id", "", "Cloud Project ID, used for creating resources.")
	flag.StringVar(&region, "region", "", "Region that resources should be created in.")
flag.StringVar(&clusterName, "cluster_name", "", "Name of Cloud Dataproc cluster to create.")
flag.StringVar(&jobFilePath, "job_file_path", "", "Path to job file in GCS.")
flag.Parse()
ctx := context.Background()
// Create the cluster client.
endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
clusterClient, err := dataproc.NewClusterControllerClient(ctx, option.WithEndpoint(endpoint))
if err != nil {
log.Fatalf("error creating the cluster client: %s\n", err)
}
// Create the cluster config.
createReq := &dataprocpb.CreateClusterRequest{
ProjectId: projectID,
Region: region,
Cluster: &dataprocpb.Cluster{
ProjectId: projectID,
ClusterName: clusterName,
Config: &dataprocpb.ClusterConfig{
MasterConfig: &dataprocpb.InstanceGroupConfig{
NumInstances: 1,
MachineTypeUri: "n1-standard-2",
},
WorkerConfig: &dataprocpb.InstanceGroupConfig{
NumInstances: 2,
MachineTypeUri: "n1-standard-2",
},
},
},
}
// Create the cluster.
createOp, err := clusterClient.CreateCluster(ctx, createReq)
if err != nil {
log.Fatalf("error submitting the cluster creation request: %v\n", err)
}
createResp, err := createOp.Wait(ctx)
if err != nil {
log.Fatalf("error creating the cluster: %v\n", err)
}
// Defer cluster deletion.
defer func() {
dReq := &dataprocpb.DeleteClusterRequest{
ProjectId: projectID,
Region: region,
ClusterName: clusterName,
}
		deleteOp, err := clusterClient.DeleteCluster(ctx, dReq)
		if err != nil {
			fmt.Printf("error deleting cluster %q: %v\n", clusterName, err)
			return
		}
		// Wait for the deletion to finish before reporting success.
		if err := deleteOp.Wait(ctx); err != nil {
			fmt.Printf("error waiting for cluster deletion: %v\n", err)
			return
		}
		fmt.Printf("Cluster %q successfully deleted\n", clusterName)
}()
// Output a success message.
fmt.Printf("Cluster created successfully: %q\n", createResp.ClusterName)
// Create the job client.
	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
	if err != nil {
		// Print and return (rather than log.Fatalf) so the deferred cluster deletion still runs.
		fmt.Printf("error creating the job client: %v\n", err)
		return
	}
// Create the job config.
submitJobReq := &dataprocpb.SubmitJobRequest{
ProjectId: projectID,
Region: region,
Job: &dataprocpb.Job{
Placement: &dataprocpb.JobPlacement{
ClusterName: clusterName,
},
TypeJob: &dataprocpb.Job_PysparkJob{
PysparkJob: &dataprocpb.PySparkJob{
MainPythonFileUri: jobFilePath,
},
},
},
}
submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
if err != nil {
fmt.Printf("error with request to submitting job: %v\n", err)
return
}
submitJobResp, err := submitJobOp.Wait(ctx)
if err != nil {
fmt.Printf("error submitting job: %v\n", err)
return
}
re := regexp.MustCompile("gs://(.+?)/(.+)")
matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
if len(matches) < 3 {
fmt.Printf("regex error: %s\n", submitJobResp.DriverOutputResourceUri)
return
}
	// Dataproc job output gets saved to a GCS bucket allocated to it.
storageClient, err := storage.NewClient(ctx)
if err != nil {
fmt.Printf("error creating storage client: %v\n", err)
return
}
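	// Driver output is stored in sequentially numbered files; this sample
	// reads the first one (suffix .000000000).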
obj := fmt.Sprintf("%s.000000000", matches[2])
reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
if err != nil {
fmt.Printf("error reading job output: %v\n", err)
return
}
defer reader.Close()
body, err := io.ReadAll(reader)
if err != nil {
fmt.Printf("could not read output from Dataproc Job: %v\n", err)
return
}
fmt.Printf("Job finished successfully: %s", body)
}
Java
Before trying this sample, follow the Java setup instructions in the Dataproc quickstart using client libraries. For more information, see the Dataproc Java API reference documentation.
To authenticate to Dataproc, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
/* This quickstart sample walks a user through creating a Cloud Dataproc
* cluster, submitting a PySpark job from Google Cloud Storage to the
* cluster, reading the output of the job and deleting the cluster, all
* using the Java client library.
*
* Usage:
* mvn clean package -DskipTests
*
* mvn exec:java -Dexec.args="<PROJECT_ID> <REGION> <CLUSTER_NAME> <GCS_JOB_FILE_PATH>"
*
* You can also set these arguments in the main function instead of providing them via the CLI.
*/
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Cluster;
import com.google.cloud.dataproc.v1.ClusterConfig;
import com.google.cloud.dataproc.v1.ClusterControllerClient;
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
import com.google.cloud.dataproc.v1.InstanceGroupConfig;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.PySparkJob;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.protobuf.Empty;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class Quickstart {
public static void quickstart(
String projectId, String region, String clusterName, String jobFilePath)
throws IOException, InterruptedException {
String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
// Configure the settings for the cluster controller client.
ClusterControllerSettings clusterControllerSettings =
ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
// Configure the settings for the job controller client.
JobControllerSettings jobControllerSettings =
JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
// Create both a cluster controller client and job controller client with the
// configured settings. The client only needs to be created once and can be reused for
// multiple requests. Using a try-with-resources closes the client, but this can also be done
// manually with the .close() method.
try (ClusterControllerClient clusterControllerClient =
ClusterControllerClient.create(clusterControllerSettings);
JobControllerClient jobControllerClient =
JobControllerClient.create(jobControllerSettings)) {
// Configure the settings for our cluster.
InstanceGroupConfig masterConfig =
InstanceGroupConfig.newBuilder()
.setMachineTypeUri("n1-standard-2")
.setNumInstances(1)
.build();
InstanceGroupConfig workerConfig =
InstanceGroupConfig.newBuilder()
.setMachineTypeUri("n1-standard-2")
.setNumInstances(2)
.build();
ClusterConfig clusterConfig =
ClusterConfig.newBuilder()
.setMasterConfig(masterConfig)
.setWorkerConfig(workerConfig)
.build();
// Create the cluster object with the desired cluster config.
Cluster cluster =
Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
// Create the Cloud Dataproc cluster.
OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
clusterControllerClient.createClusterAsync(projectId, region, cluster);
Cluster clusterResponse = createClusterAsyncRequest.get();
System.out.println(
String.format("Cluster created successfully: %s", clusterResponse.getClusterName()));
// Configure the settings for our job.
JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
PySparkJob pySparkJob = PySparkJob.newBuilder().setMainPythonFileUri(jobFilePath).build();
Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build();
// Submit an asynchronous request to execute the job.
OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
Job jobResponse = submitJobAsOperationAsyncRequest.get();
      // Parse the driver output URI, then print the job output from Google Cloud Storage.
      Matcher matches =
          Pattern.compile("gs://(.*?)/(.*)").matcher(jobResponse.getDriverOutputResourceUri());
      if (!matches.matches()) {
        throw new IllegalStateException(
            "Unexpected driver output URI: " + jobResponse.getDriverOutputResourceUri());
      }
Storage storage = StorageOptions.getDefaultInstance().getService();
Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
System.out.println(
String.format("Job finished successfully: %s", new String(blob.getContent())));
// Delete the cluster.
OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
clusterControllerClient.deleteClusterAsync(projectId, region, clusterName);
deleteClusterAsyncRequest.get();
System.out.println(String.format("Cluster \"%s\" successfully deleted.", clusterName));
} catch (ExecutionException e) {
System.err.println(String.format("quickstart: %s ", e.getMessage()));
}
}
public static void main(String... args) throws IOException, InterruptedException {
if (args.length != 4) {
System.err.println(
"Insufficient number of parameters provided. Please make sure a "
+ "PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.");
return;
}
String projectId = args[0]; // project-id of project to create the cluster in
String region = args[1]; // region to create the cluster
String clusterName = args[2]; // name of the cluster
String jobFilePath = args[3]; // location in GCS of the PySpark job
quickstart(projectId, region, clusterName, jobFilePath);
}
}
Node.js
Before trying this sample, follow the Node.js setup instructions in the Dataproc quickstart using client libraries. For more information, see the Dataproc Node.js API reference documentation.
To authenticate to Dataproc, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
// This quickstart sample walks a user through creating a Dataproc
// cluster, submitting a PySpark job from Google Cloud Storage to the
// cluster, reading the output of the job and deleting the cluster, all
// using the Node.js client library.
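//
// Usage (assuming this file is saved as quickstart.js):
//   node quickstart.js <PROJECT_ID> <REGION> <CLUSTER_NAME> <GCS_JOB_FILE_PATH>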
'use strict';
function main(projectId, region, clusterName, jobFilePath) {
const dataproc = require('@google-cloud/dataproc');
const {Storage} = require('@google-cloud/storage');
// Create a cluster client with the endpoint set to the desired cluster region
const clusterClient = new dataproc.v1.ClusterControllerClient({
apiEndpoint: `${region}-dataproc.googleapis.com`,
projectId: projectId,
});
// Create a job client with the endpoint set to the desired cluster region
const jobClient = new dataproc.v1.JobControllerClient({
apiEndpoint: `${region}-dataproc.googleapis.com`,
projectId: projectId,
});
async function quickstart() {
// Create the cluster config
const cluster = {
projectId: projectId,
region: region,
cluster: {
clusterName: clusterName,
config: {
masterConfig: {
numInstances: 1,
machineTypeUri: 'n1-standard-2',
},
workerConfig: {
numInstances: 2,
machineTypeUri: 'n1-standard-2',
},
},
},
};
// Create the cluster
const [operation] = await clusterClient.createCluster(cluster);
const [response] = await operation.promise();
// Output a success message
console.log(`Cluster created successfully: ${response.clusterName}`);
const job = {
projectId: projectId,
region: region,
job: {
placement: {
clusterName: clusterName,
},
pysparkJob: {
mainPythonFileUri: jobFilePath,
},
},
};
const [jobOperation] = await jobClient.submitJobAsOperation(job);
const [jobResponse] = await jobOperation.promise();
const matches =
jobResponse.driverOutputResourceUri.match('gs://(.*?)/(.*)');
const storage = new Storage();
const output = await storage
.bucket(matches[1])
.file(`${matches[2]}.000000000`)
.download();
// Output a success message.
console.log(`Job finished successfully: ${output}`);
// Delete the cluster once the job has terminated.
const deleteClusterReq = {
projectId: projectId,
region: region,
clusterName: clusterName,
};
const [deleteOperation] =
await clusterClient.deleteCluster(deleteClusterReq);
await deleteOperation.promise();
// Output a success message
console.log(`Cluster ${clusterName} successfully deleted.`);
}
quickstart();
}
const args = process.argv.slice(2);
if (args.length !== 4) {
  console.log(
    'Insufficient number of parameters provided. Please make sure a ' +
      'PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.'
  );
  // Exit instead of calling main() with missing arguments.
  process.exit(1);
}
main(...args);
Python
Before trying this sample, follow the Python setup instructions in the Dataproc quickstart using client libraries. For more information, see the Dataproc Python API reference documentation.
To authenticate to Dataproc, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
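With ADC configured, the client constructors in the sample below pick up credentials automatically. As an optional sanity check before running the quickstart (this snippet is not part of the sample itself), you can confirm which credentials and default project the client libraries will resolve:

# Optional check, assuming Application Default Credentials are already set up.
import google.auth

credentials, project_id = google.auth.default()
# project_id may be None if the credentials do not carry a default project.
print(f"ADC resolved; default project: {project_id}")

The full quickstart sample follows.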
"""
This quickstart sample walks a user through creating a Cloud Dataproc
cluster, submitting a PySpark job from Google Cloud Storage to the
cluster, reading the output of the job and deleting the cluster, all
using the Python client library.
Usage:
python quickstart.py --project_id <PROJECT_ID> --region <REGION> \
--cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
"""
import argparse
import re
from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage
def quickstart(project_id, region, cluster_name, job_file_path):
# Create the cluster client.
cluster_client = dataproc.ClusterControllerClient(
client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
)
# Create the cluster config.
cluster = {
"project_id": project_id,
"cluster_name": cluster_name,
"config": {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-2",
"disk_config": {"boot_disk_size_gb": 100},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-2",
"disk_config": {"boot_disk_size_gb": 100},
},
},
}
# Create the cluster.
operation = cluster_client.create_cluster(
request={"project_id": project_id, "region": region, "cluster": cluster}
)
result = operation.result()
print("Cluster created successfully: {}".format(result.cluster_name))
# Create the job client.
job_client = dataproc.JobControllerClient(
client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
)
# Create the job config.
job = {
"placement": {"cluster_name": cluster_name},
"pyspark_job": {"main_python_file_uri": job_file_path},
}
operation = job_client.submit_job_as_operation(
request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()
# Dataproc job output gets saved to the Google Cloud Storage bucket
# allocated to the job. Use a regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
output = (
storage.Client()
.get_bucket(matches.group(1))
.blob(f"{matches.group(2)}.000000000")
.download_as_bytes()
.decode("utf-8")
)
print(f"Job finished successfully: {output}")
# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
request={
"project_id": project_id,
"region": region,
"cluster_name": cluster_name,
}
)
operation.result()
print("Cluster {} successfully deleted.".format(cluster_name))
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--project_id",
type=str,
required=True,
help="Project to use for creating resources.",
)
parser.add_argument(
"--region",
type=str,
required=True,
help="Region where the resources should live.",
)
parser.add_argument(
"--cluster_name",
type=str,
required=True,
help="Name to use for creating a cluster.",
)
parser.add_argument(
"--job_file_path",
type=str,
required=True,
help="Job in GCS to execute against the cluster.",
)
args = parser.parse_args()
quickstart(args.project_id, args.region, args.cluster_name, args.job_file_path)
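Each of these samples submits a PySpark script that must already exist in Cloud Storage (the job file path argument). A minimal script is enough to exercise the workflow. The following sketch, with the hypothetical name hello.py, prints a small DataFrame so that the driver output read back by the samples has something to show:

# hello.py (hypothetical name): upload this file to Cloud Storage and pass
# its gs:// URI as the job file path argument.
from pyspark.sql import SparkSession

# Dataproc clusters come with Spark preinstalled, so the session attaches
# to the cluster's Spark environment.
spark = SparkSession.builder.appName("quickstart").getOrCreate()

# Do a trivial bit of work; show() prints to the driver output that the
# quickstart later reads from Cloud Storage.
df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df.show()

spark.stop()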
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.