Quickstart: Use the Cloud Client Libraries

The code samples below show how to use the Cloud Client Libraries to create a Dataproc cluster, run a job on the cluster, and then delete the cluster.

You can also perform these tasks with other tools, such as the Google Cloud Console or the gcloud command-line tool.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. In the Cloud Console, on the project selector page, select or create a Cloud project.

    Go to the project selector page

  3. Make sure that billing is enabled for your Google Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Dataproc API.

    Enable the API
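
Before running any of the samples below, it can be worth confirming that Application Default Credentials and a default project are available in your environment. The following is a minimal sketch, assuming the google-auth Python package is installed; it is not part of the official quickstart samples.

import google.auth
from google.auth.exceptions import DefaultCredentialsError

try:
    # google.auth.default() returns the Application Default Credentials and the
    # project ID inferred from the environment (which may be None).
    credentials, project_id = google.auth.default()
    print("Application Default Credentials found for project: {}".format(project_id))
except DefaultCredentialsError as err:
    print("Credentials are not configured yet: {}".format(err))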

Run the code

Go

  1. Install the client library. For more information, see Setting up a development environment.
  2. Set up authentication.
  3. Clone and run the sample code from GitHub.
  4. View the output. The code writes the job driver logs to the default Dataproc staging bucket in Cloud Storage. You can view the job driver output in the Cloud Console, in your project's Dataproc Jobs section. Click the job ID to view the job output on the Job details page.


// This quickstart shows how you can use the Cloud Dataproc Client library to create a
// Cloud Dataproc cluster, submit a PySpark job to the cluster, wait for the job to finish
// and finally delete the cluster.
//
// Usage:
//     go build
//     ./quickstart --project_id <PROJECT_ID> --region <REGION> \
//         --cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
package main

import (
	"context"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"time"

	dataproc "cloud.google.com/go/dataproc/apiv1"
	"cloud.google.com/go/storage"
	"google.golang.org/api/option"
	dataprocpb "google.golang.org/genproto/googleapis/cloud/dataproc/v1"
)

func main() {
	var projectID, clusterName, region, jobFilePath string
	flag.StringVar(&projectID, "project_id", "", "Cloud Project ID, used for creating resources.")
	flag.StringVar(&region, "region", "", "Region that resources should be created in.")
	flag.StringVar(&clusterName, "cluster_name", "", "Name of Cloud Dataproc cluster to create.")
	flag.StringVar(&jobFilePath, "job_file_path", "", "Path to job file in GCS.")
	flag.Parse()

	ctx := context.Background()

	// Create the cluster client.
	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
	clusterClient, err := dataproc.NewClusterControllerClient(ctx, option.WithEndpoint(endpoint))
	if err != nil {
		log.Fatalf("error creating the cluster client: %s\n", err)
	}

	// Create the cluster config.
	createReq := &dataprocpb.CreateClusterRequest{
		ProjectId: projectID,
		Region:    region,
		Cluster: &dataprocpb.Cluster{
			ProjectId:   projectID,
			ClusterName: clusterName,
			Config: &dataprocpb.ClusterConfig{
				MasterConfig: &dataprocpb.InstanceGroupConfig{
					NumInstances:   1,
					MachineTypeUri: "n1-standard-1",
				},
				WorkerConfig: &dataprocpb.InstanceGroupConfig{
					NumInstances:   2,
					MachineTypeUri: "n1-standard-1",
				},
			},
		},
	}

	// Create the cluster.
	createOp, err := clusterClient.CreateCluster(ctx, createReq)
	if err != nil {
		log.Fatalf("error submitting the cluster creation request: %v\n", err)
	}

	createResp, err := createOp.Wait(ctx)
	if err != nil {
		log.Fatalf("error creating the cluster: %v\n", err)
	}

	// Defer cluster deletion.
	defer func() {
		dReq := &dataprocpb.DeleteClusterRequest{
			ProjectId:   projectID,
			Region:      region,
			ClusterName: clusterName,
		}
		deleteOp, err := clusterClient.DeleteCluster(ctx, dReq)
		if err != nil {
			fmt.Printf("error deleting cluster %q: %v\n", clusterName, err)
			return
		}
		if err := deleteOp.Wait(ctx); err != nil {
			fmt.Printf("error waiting for cluster deletion: %v\n", err)
			return
		}
		fmt.Printf("Cluster %q successfully deleted\n", clusterName)
	}()

	// Output a success message.
	fmt.Printf("Cluster created successfully: %q\n", createResp.ClusterName)

	// Create the job client.
	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))
	if err != nil {
		fmt.Printf("error creating the job client: %v\n", err)
		return
	}

	// Create the job config.
	submitJobReq := &dataprocpb.SubmitJobRequest{
		ProjectId: projectID,
		Region:    region,
		Job: &dataprocpb.Job{
			Placement: &dataprocpb.JobPlacement{
				ClusterName: clusterName,
			},
			TypeJob: &dataprocpb.Job_PysparkJob{
				PysparkJob: &dataprocpb.PySparkJob{
					MainPythonFileUri: jobFilePath,
				},
			},
		},
	}

	submitJobResp, err := jobClient.SubmitJob(ctx, submitJobReq)
	if err != nil {
		fmt.Printf("error submitting job: %v\n", err)
		return
	}

	id := submitJobResp.Reference.JobId

	fmt.Printf("Submitted job %q\n", id)

	// These states all signify that a job has terminated, successfully or not.
	terminalStates := map[dataprocpb.JobStatus_State]bool{
		dataprocpb.JobStatus_ERROR:     true,
		dataprocpb.JobStatus_CANCELLED: true,
		dataprocpb.JobStatus_DONE:      true,
	}

	// We can create a timeout such that the job gets cancelled if not in a terminal state after a certain amount of time.
	timeout := 5 * time.Minute
	start := time.Now()

	var state dataprocpb.JobStatus_State
	for {
		if time.Since(start) > timeout {
			cancelReq := &dataprocpb.CancelJobRequest{
				ProjectId: projectID,
				Region:    region,
				JobId:     id,
			}

			if _, err := jobClient.CancelJob(ctx, cancelReq); err != nil {
				fmt.Printf("error cancelling job: %v\n", err)
			}
			fmt.Printf("job %q timed out after %d minutes\n", id, int64(timeout.Minutes()))
			return
		}

		getJobReq := &dataprocpb.GetJobRequest{
			ProjectId: projectID,
			Region:    region,
			JobId:     id,
		}
		getJobResp, err := jobClient.GetJob(ctx, getJobReq)
		if err != nil {
			fmt.Printf("error getting job %q with error: %v\n", id, err)
			return
		}
		state = getJobResp.Status.State
		if terminalStates[state] {
			break
		}

		// Sleep so as not to poll the API excessively.
		time.Sleep(1 * time.Second)
	}

	// Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
	getCReq := &dataprocpb.GetClusterRequest{
		ProjectId:   projectID,
		Region:      region,
		ClusterName: clusterName,
	}

	resp, err := clusterClient.GetCluster(ctx, getCReq)
	if err != nil {
		fmt.Printf("error getting cluster %q: %v\n", clusterName, err)
		return
	}

	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		fmt.Printf("error creating storage client: %v\n", err)
		return
	}

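	// The job driver output is stored in the cluster's staging (config) bucket under
	// google-cloud-dataproc-metainfo/<cluster UUID>/jobs/<job ID>/; read the first output chunk.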
	obj := fmt.Sprintf("google-cloud-dataproc-metainfo/%s/jobs/%s/driveroutput.000000000", resp.ClusterUuid, id)
	reader, err := storageClient.Bucket(resp.Config.ConfigBucket).Object(obj).NewReader(ctx)
	if err != nil {
		fmt.Printf("error reading job output: %v\n", err)
		return
	}

	defer reader.Close()

	body, err := ioutil.ReadAll(reader)
	if err != nil {
		fmt.Printf("could not read output from Dataproc Job %q\n", id)
		return
	}

	fmt.Printf("job %q finished with state %s:\n%s\n", id, state, body)
}

Java

  1. Install the client library. For more information, see Setting up a Java development environment.
  2. Set up authentication.
  3. Clone and run the sample code from GitHub.
  4. View the output. The code writes the job driver logs to the default Dataproc staging bucket in Cloud Storage. You can view the job driver output in the Cloud Console, in your project's Dataproc Jobs section. Click the job ID to view the job output on the Job details page.

/* This quickstart sample walks a user through creating a Cloud Dataproc
 * cluster, submitting a PySpark job from Google Cloud Storage to the
 * cluster, reading the output of the job and deleting the cluster, all
 * using the Java client library.
 *
 * Usage:
 *     mvn clean package -DskipTests
 *
 *     mvn exec:java -Dexec.args="<PROJECT_ID> <REGION> <CLUSTER_NAME> <GCS_JOB_FILE_PATH>"
 *
 *     You can also set these arguments in the main function instead of providing them via the CLI.
 */

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Cluster;
import com.google.cloud.dataproc.v1.ClusterConfig;
import com.google.cloud.dataproc.v1.ClusterControllerClient;
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
import com.google.cloud.dataproc.v1.InstanceGroupConfig;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.PySparkJob;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.protobuf.Empty;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Quickstart {

  public static Job waitForJobCompletion(
      JobControllerClient jobControllerClient, String projectId, String region, String jobId) {
    while (true) {
      // Poll the service periodically until the Job is in a finished state.
      Job jobInfo = jobControllerClient.getJob(projectId, region, jobId);
      switch (jobInfo.getStatus().getState()) {
        case DONE:
        case CANCELLED:
        case ERROR:
          return jobInfo;
        default:
          try {
            // Wait a second in between polling attempts.
            TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
      }
    }
  }

  public static void quickstart(
      String projectId, String region, String clusterName, String jobFilePath)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the cluster controller client.
    ClusterControllerSettings clusterControllerSettings =
        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create both a cluster controller client and job controller client with the
    // configured settings. The client only needs to be created once and can be reused for
    // multiple requests. Using a try-with-resources closes the client, but this can also be done
    // manually with the .close() method.
    try (ClusterControllerClient clusterControllerClient =
            ClusterControllerClient.create(clusterControllerSettings);
        JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
      // Configure the settings for our cluster.
      InstanceGroupConfig masterConfig =
          InstanceGroupConfig.newBuilder()
              .setMachineTypeUri("n1-standard-1")
              .setNumInstances(1)
              .build();
      InstanceGroupConfig workerConfig =
          InstanceGroupConfig.newBuilder()
              .setMachineTypeUri("n1-standard-1")
              .setNumInstances(2)
              .build();
      ClusterConfig clusterConfig =
          ClusterConfig.newBuilder()
              .setMasterConfig(masterConfig)
              .setWorkerConfig(workerConfig)
              .build();
      // Create the cluster object with the desired cluster config.
      Cluster cluster =
          Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();

      // Create the Cloud Dataproc cluster.
      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
          clusterControllerClient.createClusterAsync(projectId, region, cluster);
      Cluster clusterResponse = createClusterAsyncRequest.get();
      System.out.println(
          String.format("Cluster created successfully: %s", clusterResponse.getClusterName()));

      // Configure the settings for our job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
      PySparkJob pySparkJob = PySparkJob.newBuilder().setMainPythonFileUri(jobFilePath).build();
      Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build();

      // Submit a request to execute the job.
      Job request = jobControllerClient.submitJob(projectId, region, job);
      String jobId = request.getReference().getJobId();
      System.out.println(String.format("Submitted job \"%s\"", jobId));

      // Wait for the job to reach a terminal state.
      Job jobResponse = waitForJobCompletion(jobControllerClient, projectId, region, jobId);

      // Print output from Google Cloud Storage.
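      // The driver output is written to a Cloud Storage object whose URI is reported in
      // driverOutputResourceUri; split it into bucket and object prefix so the first
      // output chunk (suffix ".000000000") can be downloaded.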
      Matcher matches =
          Pattern.compile("gs://(.*?)/(.*)").matcher(jobResponse.getDriverOutputResourceUri());
      matches.matches();

      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));

      System.out.println(
          String.format("Job finished successfully: %s", new String(blob.getContent())));

      // Delete the cluster.
      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
          clusterControllerClient.deleteClusterAsync(projectId, region, clusterName);
      deleteClusterAsyncRequest.get();
      System.out.println(String.format("Cluster \"%s\" successfully deleted.", clusterName));

    } catch (ExecutionException e) {
      System.err.println(String.format("quickstart: %s ", e.getMessage()));
    }
  }

  public static void main(String... args) throws IOException, InterruptedException {
    if (args.length != 4) {
      System.err.println(
          "Insufficient number of parameters provided. Please make sure a "
              + "PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.");
      return;
    }

    String projectId = args[0]; // project-id of project to create the cluster in
    String region = args[1]; // region to create the cluster
    String clusterName = args[2]; // name of the cluster
    String jobFilePath = args[3]; // location in GCS of the PySpark job

    quickstart(projectId, region, clusterName, jobFilePath);
  }
}

Node.js

  1. Install the client library. For more information, see Setting up a Node.js development environment.
  2. Set up authentication.
  3. Clone and run the sample code from GitHub.
  4. View the output. The code writes the job driver logs to the default Dataproc staging bucket in Cloud Storage. You can view the job driver output in the Cloud Console, in your project's Dataproc Jobs section. Click the job ID to view the job output on the Job details page.

// This quickstart sample walks a user through creating a Cloud Dataproc
// cluster, submitting a PySpark job from Google Cloud Storage to the
// cluster, reading the output of the job and deleting the cluster, all
// using the Node.js client library.
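//
// Usage (assuming this file is saved as quickstart.js):
//     node quickstart.js <PROJECT_ID> <REGION> <CLUSTER_NAME> <GCS_JOB_FILE_PATH>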

'use strict';

function main(projectId, region, clusterName, jobFilePath) {
  const dataproc = require('@google-cloud/dataproc');
  const {Storage} = require('@google-cloud/storage');

  // Create a cluster client with the endpoint set to the desired cluster region
  const clusterClient = new dataproc.v1.ClusterControllerClient({
    apiEndpoint: `${region}-dataproc.googleapis.com`,
    projectId: projectId,
  });

  // Create a job client with the endpoint set to the desired cluster region
  const jobClient = new dataproc.v1.JobControllerClient({
    apiEndpoint: `${region}-dataproc.googleapis.com`,
    projectId: projectId,
  });

  async function quickstart() {
    // Create the cluster config
    const cluster = {
      projectId: projectId,
      region: region,
      cluster: {
        clusterName: clusterName,
        config: {
          masterConfig: {
            numInstances: 1,
            machineTypeUri: 'n1-standard-1',
          },
          workerConfig: {
            numInstances: 2,
            machineTypeUri: 'n1-standard-1',
          },
        },
      },
    };

    // Create the cluster
    const [operation] = await clusterClient.createCluster(cluster);
    const [response] = await operation.promise();

    // Output a success message
    console.log(`Cluster created successfully: ${response.clusterName}`);

    const job = {
      projectId: projectId,
      region: region,
      job: {
        placement: {
          clusterName: clusterName,
        },
        pysparkJob: {
          mainPythonFileUri: jobFilePath,
        },
      },
    };

    let [jobResp] = await jobClient.submitJob(job);
    const jobId = jobResp.reference.jobId;

    console.log(`Submitted job "${jobId}".`);

    // Terminal states for a job
    const terminalStates = new Set(['DONE', 'ERROR', 'CANCELLED']);

    // Create a timeout such that the job gets cancelled if not
    // in a terminal state after a fixed period of time.
    const timeout = 600000;
    const start = new Date();

    // Wait for the job to finish.
    const jobReq = {
      projectId: projectId,
      region: region,
      jobId: jobId,
    };

    while (!terminalStates.has(jobResp.status.state)) {
      if (new Date() - start > timeout) {
        await jobClient.cancelJob(jobReq);
        console.log(
          `Job ${jobId} timed out after threshold of ` +
            `${timeout / 60000} minutes.`
        );
        break;
      }
      await sleep(1);
      [jobResp] = await jobClient.getJob(jobReq);
    }

    const clusterReq = {
      projectId: projectId,
      region: region,
      clusterName: clusterName,
    };

    const [clusterResp] = await clusterClient.getCluster(clusterReq);

    const storage = new Storage();

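    // The job driver output is stored in the cluster's staging (config) bucket under
    // google-cloud-dataproc-metainfo/<cluster UUID>/jobs/<job ID>/.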
    const output = await storage
      .bucket(clusterResp.config.configBucket)
      .file(
        `google-cloud-dataproc-metainfo/${clusterResp.clusterUuid}/` +
          `jobs/${jobId}/driveroutput.000000000`
      )
      .download();

    // Output a success message.
    console.log(
      `Job ${jobId} finished with state ${jobResp.status.state}:\n${output}`
    );

    // Delete the cluster once the job has terminated.
    const [deleteOperation] = await clusterClient.deleteCluster(clusterReq);
    await deleteOperation.promise();

    // Output a success message
    console.log(`Cluster ${clusterName} successfully deleted.`);
  }

  quickstart();
}

// Helper function to sleep for the given number of seconds
function sleep(seconds) {
  return new Promise(resolve => setTimeout(resolve, seconds * 1000));
}

const args = process.argv.slice(2);

if (args.length !== 4) {
  console.log(
    'Insufficient number of parameters provided. Please make sure a ' +
      'PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.'
  );
  process.exit(1);
}

main(...args);

Python

  1. Install the client library. For more information, see Setting up a Python development environment.
  2. Set up authentication.
  3. Clone and run the sample code from GitHub.
  4. View the output. The code writes the job driver logs to the default Dataproc staging bucket in Cloud Storage. You can view the job driver output in the Cloud Console, in your project's Dataproc Jobs section. Click the job ID to view the job output on the Job details page.

"""
This quickstart sample walks a user through creating a Cloud Dataproc
cluster, submitting a PySpark job from Google Cloud Storage to the
cluster, reading the output of the job and deleting the cluster, all
using the Python client library.

Usage:
    python quickstart.py --project_id <PROJECT_ID> --region <REGION> \
        --cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
"""

import argparse
import time

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

def quickstart(project_id, region, cluster_name, job_file_path):
    # Create the cluster client.
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-1"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print("Cluster created successfully: {}".format(result.cluster_name))

    # Create the job client.
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
    )

    # Create the job config.
    job = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {"main_python_file_uri": job_file_path},
    }

    job_response = job_client.submit_job(
        request={"project_id": project_id, "region": region, "job": job}
    )
    job_id = job_response.reference.job_id

    print('Submitted job "{}".'.format(job_id))

    # Terminal states for a job.
    terminal_states = {
        dataproc.JobStatus.State.ERROR,
        dataproc.JobStatus.State.CANCELLED,
        dataproc.JobStatus.State.DONE,
    }

    # Create a timeout such that the job gets cancelled if not in a
    # terminal state after a fixed period of time.
    timeout_seconds = 600
    time_start = time.time()

    # Wait for the job to complete.
    while job_response.status.state not in terminal_states:
        if time.time() > time_start + timeout_seconds:
            job_client.cancel_job(
                request={"project_id": project_id, "region": region, "job_id": job_id}
            )
            print(
                "Job {} timed out after threshold of {} seconds.".format(
                    job_id, timeout_seconds
                )
            )

        # Poll for job termination once a second.
        time.sleep(1)
        job_response = job_client.get_job(
            request={"project_id": project_id, "region": region, "job_id": job_id}
        )

    # Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
    cluster_info = cluster_client.get_cluster(
        request={
            "project_id": project_id,
            "region": region,
            "cluster_name": cluster_name,
        }
    )

    storage_client = storage.Client()
    bucket = storage_client.get_bucket(cluster_info.config.config_bucket)
    output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
        cluster_info.cluster_uuid, job_id
    )
    output = bucket.blob(output_blob).download_as_string()

    print(
        "Job {} finished with state {}:\n{}".format(
            job_id, job_response.status.state.name, output
        )
    )

    # Delete the cluster once the job has terminated.
    operation = cluster_client.delete_cluster(
        request={
            "project_id": project_id,
            "region": region,
            "cluster_name": cluster_name,
        }
    )
    operation.result()

    print("Cluster {} successfully deleted.".format(cluster_name))

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--project_id",
        type=str,
        required=True,
        help="Project to use for creating resources.",
    )
    parser.add_argument(
        "--region",
        type=str,
        required=True,
        help="Region where the resources should live.",
    )
    parser.add_argument(
        "--cluster_name",
        type=str,
        required=True,
        help="Name to use for creating a cluster.",
    )
    parser.add_argument(
        "--job_file_path",
        type=str,
        required=True,
        help="Job in GCS to execute against the cluster.",
    )

    args = parser.parse_args()
    quickstart(args.project_id, args.region, args.cluster_name, args.job_file_path)

What's next

  • See Additional resources for the Dataproc Cloud Client Libraries.