// This quickstart shows how to use the Dataproc client library to create a
// Dataproc cluster, submit a PySpark job to the cluster, wait for the job to
// finish, and finally delete the cluster.
//
// Usage:
//
// go build
// ./quickstart --project_id <PROJECT_ID> --region <REGION> \
// --cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
package main

import (
"context"
"flag"
"fmt"
"io"
"log"
"regexp"

dataproc "cloud.google.com/go/dataproc/apiv1"
"cloud.google.com/go/dataproc/apiv1/dataprocpb"
"cloud.google.com/go/storage"
"google.golang.org/api/option"
)

func main() {
var projectID, clusterName, region, jobFilePath string
flag.StringVar(&projectID, "project_id", "", "Cloud Project ID, used for creating resources.")
flag.StringVar(&region, "region", "", "Region that resources should be created in.")
flag.StringVar(&clusterName, "cluster_name", "", "Name of Cloud Dataproc cluster to create.")
flag.StringVar(&jobFilePath, "job_file_path", "", "Path to job file in GCS.")
flag.Parse()
ctx := context.Background()
// Create the cluster client.
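// Dataproc clients must target a regional endpoint of the form <region>-dataproc.googleapis.com.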
endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
clusterClient, err := dataproc.NewClusterControllerClient(ctx, option.WithEndpoint(endpoint))
if err != nil {
log.Fatalf("error creating the cluster client: %v\n", err)
}
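// Close the client when main returns to release its underlying connections.
defer clusterClient.Close()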
// Create the cluster config.
createReq := &dataprocpb.CreateClusterRequest{
ProjectId: projectID,
Region: region,
Cluster: &dataprocpb.Cluster{
ProjectId: projectID,
ClusterName: clusterName,
Config: &dataprocpb.ClusterConfig{
MasterConfig: &dataprocpb.InstanceGroupConfig{
NumInstances: 1,
MachineTypeUri: "n1-standard-2",
},
WorkerConfig: &dataprocpb.InstanceGroupConfig{
NumInstances: 2,
MachineTypeUri: "n1-standard-2",
},
},
},
}
// Create the cluster.
createOp, err := clusterClient.CreateCluster(ctx, createReq)
if err != nil {
log.Fatalf("error submitting the cluster creation request: %v\n", err)
}
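// CreateCluster returns a long-running operation; Wait blocks until it completes.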
createResp, err := createOp.Wait(ctx)
if err != nil {
log.Fatalf("error creating the cluster: %v\n", err)
}
// Defer cluster deletion.
defer func() {
dReq := &dataprocpb.DeleteClusterRequest{
ProjectId: projectID,
Region: region,
ClusterName: clusterName,
}
deleteOp, err := clusterClient.DeleteCluster(ctx, dReq)
if err != nil {
fmt.Printf("error submitting the cluster deletion request: %v\n", err)
return
}
if err := deleteOp.Wait(ctx); err != nil {
fmt.Printf("error deleting cluster %q: %v\n", clusterName, err)
return
}
fmt.Printf("Cluster %q successfully deleted\n", clusterName)
}()
// Output a success message.
fmt.Printf("Cluster created successfully: %q\n", createResp.ClusterName)
// Create the job client.
jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
if err != nil {
fmt.Printf("error creating the job client: %v\n", err)
return
}
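defer jobClient.Close()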
// Create the job config.
submitJobReq := &dataprocpb.SubmitJobRequest{
ProjectId: projectID,
Region: region,
Job: &dataprocpb.Job{
Placement: &dataprocpb.JobPlacement{
ClusterName: clusterName,
},
TypeJob: &dataprocpb.Job_PysparkJob{
PysparkJob: &dataprocpb.PySparkJob{
MainPythonFileUri: jobFilePath,
},
},
},
}
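// Submit the job as a long-running operation and wait for the driver to finish.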
submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
if err != nil {
fmt.Printf("error submitting the job request: %v\n", err)
return
}
submitJobResp, err := submitJobOp.Wait(ctx)
if err != nil {
fmt.Printf("error submitting job: %v\n", err)
return
}
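// The finished job's driver output lives in GCS; extract the bucket and object names from the gs:// URI.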
re := regexp.MustCompile("gs://(.+?)/(.+)")
matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)
if len(matches) < 3 {
fmt.Printf("could not parse driver output URI: %s\n", submitJobResp.DriverOutputResourceUri)
return
}
// Dataproc job output gets saved to a GCS bucket allocated to it.
storageClient, err := storage.NewClient(ctx)
if err != nil {
fmt.Printf("error creating storage client: %v\n", err)
return
}
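// Close the storage client when main returns.
defer storageClient.Close()
// Driver output is written in sequentially numbered chunks; read the first one (.000000000).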
obj := fmt.Sprintf("%s.000000000", matches[2])
reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
if err != nil {
fmt.Printf("error reading job output: %v\n", err)
return
}
defer reader.Close()
body, err := io.ReadAll(reader)
if err != nil {
fmt.Printf("could not read output from Dataproc Job: %v\n", err)
return
}
fmt.Printf("Job finished successfully: %s", body)
}