Envia um job do Spark para um cluster do Dataproc.

Antes de testar este exemplo, siga as instruções de configuração do Go no Guia de início rápido do Dataproc: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do Dataproc.

Para autenticar no Dataproc, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import (

	dataproc "cloud.google.com/go/dataproc/apiv1"

func submitJob(w io.Writer, projectID, region, clusterName string) error {
	// projectID := "your-project-id"
	// region := "us-central1"
	// clusterName := "your-cluster"
	ctx := context.Background()

	// Create the job client.
	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
	if err != nil {
		log.Fatalf("error creating the job client: %s\n", err)

	// Create the job config.
	submitJobReq := &dataprocpb.SubmitJobRequest{
		ProjectId: projectID,
		Region:    region,
		Job: &dataprocpb.Job{
			Placement: &dataprocpb.JobPlacement{
				ClusterName: clusterName,
			TypeJob: &dataprocpb.Job_SparkJob{
				SparkJob: &dataprocpb.SparkJob{
					Driver: &dataprocpb.SparkJob_MainClass{
						MainClass: "org.apache.spark.examples.SparkPi",
					JarFileUris: []string{"file:///usr/lib/spark/examples/jars/spark-examples.jar"},
					Args:        []string{"1000"},

	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
	if err != nil {
		return fmt.Errorf("error with request to submitting job: %w", err)

	submitJobResp, err := submitJobOp.Wait(ctx)
	if err != nil {
		return fmt.Errorf("error submitting job: %w", err)

	re := regexp.MustCompile("gs://(.+?)/(.+)")
	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)

	if len(matches) < 3 {
		return fmt.Errorf("regex error: %s", submitJobResp.DriverOutputResourceUri)

	// Dataproc job output gets saved to a GCS bucket allocated to it.
	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("error creating storage client: %w", err)

	obj := fmt.Sprintf("%s.000000000", matches[2])
	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
	if err != nil {
		return fmt.Errorf("error reading job output: %w", err)

	defer reader.Close()

	body, err := io.ReadAll(reader)
	if err != nil {
		return fmt.Errorf("could not read output from Dataproc Job: %w", err)

	fmt.Fprintf(w, "Job finished successfully: %s", body)

	return nil

Antes de testar este exemplo, siga as instruções de configuração do Java no Guia de início rápido do Dataproc: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do Dataproc.

Para autenticar no Dataproc, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.SparkJob;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SubmitJob {

  public static void submitJob() throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String region = "your-project-region";
    String clusterName = "your-cluster-name";
    submitJob(projectId, region, clusterName);

  public static void submitJob(String projectId, String region, String clusterName)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =

    // Create a job controller client with the configured settings. Using a try-with-resources
    // closes the client,
    // but this can also be done manually with the .close() method.
    try (JobControllerClient jobControllerClient =
        JobControllerClient.create(jobControllerSettings)) {

      // Configure cluster placement for the job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();

      // Configure Spark job settings.
      SparkJob sparkJob =

      Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();

      // Submit an asynchronous request to execute the job.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);

      Job response = submitJobAsOperationAsyncRequest.get();

      // Print output from Google Cloud Storage.
      Matcher matches =

      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));

          String.format("Job finished successfully: %s", new String(blob.getContent())));

    } catch (ExecutionException e) {
      // If the job does not complete successfully, print the error message.
      System.err.println(String.format("submitJob: %s ", e.getMessage()));

Antes de testar este exemplo, siga as instruções de configuração do Node.js no Guia de início rápido do Dataproc: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Node.js do Dataproc.

Para autenticar no Dataproc, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

const dataproc = require('@google-cloud/dataproc');
const {Storage} = require('@google-cloud/storage');

// TODO(developer): Uncomment and set the following variables
// projectId = 'YOUR_PROJECT_ID'
// clusterName = 'YOUR_CLUSTER_NAME'

// Create a client with the endpoint set to the desired cluster region
const jobClient = new dataproc.v1.JobControllerClient({
  apiEndpoint: `${region}-dataproc.googleapis.com`,
  projectId: projectId,

async function submitJob() {
  const job = {
    projectId: projectId,
    region: region,
    job: {
      placement: {
        clusterName: clusterName,
      sparkJob: {
        mainClass: 'org.apache.spark.examples.SparkPi',
        jarFileUris: [
        args: ['1000'],

  const [jobOperation] = await jobClient.submitJobAsOperation(job);
  const [jobResponse] = await jobOperation.promise();

  const matches =

  const storage = new Storage();

  const output = await storage

  // Output a success message.
  console.log(`Job finished successfully: ${output}`);

Antes de testar este exemplo, siga as instruções de configuração do Python no Guia de início rápido do Dataproc: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do Dataproc.

Para autenticar no Dataproc, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (

print(f"Job finished successfully: {output}\r\n")

