使用客户端库创建 Dataproc 集群

下面列出的示例代码介绍了如何使用 Cloud 客户端库创建 Dataproc 集群,在集群上运行作业,然后删除集群。



  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Dataproc API。

    

试用演示:点击“在 Cloud Shell 中打开”,运行 Python Cloud 客户端库演示,该演示会创建集群、运行 PySpark 作业,然后删除集群。

在 Cloud Shell 中打开


  1. 安装客户端库。如需了解详情,请参阅“设置开发环境”。
  2. 设置身份验证
  3. 克隆并运行示例 GitHub 代码。
  4. 查看输出。代码会将作业驱动程序日志输出到 Cloud Storage 中的默认 Dataproc 暂存存储分区。从 Google Cloud 控制台,您可以在项目的 Dataproc 作业部分中查看作业驱动程序输出。点击“作业详情”页面上的作业 ID 以查看作业输出。

// This quickstart shows how you can use the Dataproc Client library to create a
// Dataproc cluster, submit a PySpark job to the cluster, wait for the job to finish
// and finally delete the cluster.
// Usage:
//	go build
//	./quickstart --project_id <PROJECT_ID> --region <REGION> \
//	    --cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
package main

import (

	dataproc "cloud.google.com/go/dataproc/apiv1"

func main() {
	var projectID, clusterName, region, jobFilePath string
	flag.StringVar(&projectID, "project_id", "", "Cloud Project ID, used for creating resources.")
	flag.StringVar(&region, "region", "", "Region that resources should be created in.")
	flag.StringVar(&clusterName, "cluster_name", "", "Name of Cloud Dataproc cluster to create.")
	flag.StringVar(&jobFilePath, "job_file_path", "", "Path to job file in GCS.")

	ctx := context.Background()

	// Create the cluster client.
	endpoint := fmt.Sprintf("%s-dataproc.googleapis.com:443", region)
	clusterClient, err := dataproc.NewClusterControllerClient(ctx, option.WithEndpoint(endpoint))
	if err != nil {
		log.Fatalf("error creating the cluster client: %s\n", err)

	// Create the cluster config.
	createReq := &dataprocpb.CreateClusterRequest{
		ProjectId: projectID,
		Region:    region,
		Cluster: &dataprocpb.Cluster{
			ProjectId:   projectID,
			ClusterName: clusterName,
			Config: &dataprocpb.ClusterConfig{
				MasterConfig: &dataprocpb.InstanceGroupConfig{
					NumInstances:   1,
					MachineTypeUri: "n1-standard-2",
				WorkerConfig: &dataprocpb.InstanceGroupConfig{
					NumInstances:   2,
					MachineTypeUri: "n1-standard-2",

	// Create the cluster.
	createOp, err := clusterClient.CreateCluster(ctx, createReq)
	if err != nil {
		log.Fatalf("error submitting the cluster creation request: %v\n", err)

	createResp, err := createOp.Wait(ctx)
	if err != nil {
		log.Fatalf("error creating the cluster: %v\n", err)

	// Defer cluster deletion.
	defer func() {
		dReq := &dataprocpb.DeleteClusterRequest{
			ProjectId:   projectID,
			Region:      region,
			ClusterName: clusterName,
		deleteOp, err := clusterClient.DeleteCluster(ctx, dReq)
		if err != nil {
			fmt.Printf("error deleting cluster %q: %v\n", clusterName, err)
		fmt.Printf("Cluster %q successfully deleted\n", clusterName)

	// Output a success message.
	fmt.Printf("Cluster created successfully: %q\n", createResp.ClusterName)

	// Create the job client.
	jobClient, err := dataproc.NewJobControllerClient(ctx, option.WithEndpoint(endpoint))

	// Create the job config.
	submitJobReq := &dataprocpb.SubmitJobRequest{
		ProjectId: projectID,
		Region:    region,
		Job: &dataprocpb.Job{
			Placement: &dataprocpb.JobPlacement{
				ClusterName: clusterName,
			TypeJob: &dataprocpb.Job_PysparkJob{
				PysparkJob: &dataprocpb.PySparkJob{
					MainPythonFileUri: jobFilePath,

	submitJobOp, err := jobClient.SubmitJobAsOperation(ctx, submitJobReq)
	if err != nil {
		fmt.Printf("error with request to submitting job: %v\n", err)

	submitJobResp, err := submitJobOp.Wait(ctx)
	if err != nil {
		fmt.Printf("error submitting job: %v\n", err)

	re := regexp.MustCompile("gs://(.+?)/(.+)")
	matches := re.FindStringSubmatch(submitJobResp.DriverOutputResourceUri)

	if len(matches) < 3 {
		fmt.Printf("regex error: %s\n", submitJobResp.DriverOutputResourceUri)

	// Dataproc job outget gets saved to a GCS bucket allocated to it.
	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		fmt.Printf("error creating storage client: %v\n", err)

	obj := fmt.Sprintf("%s.000000000", matches[2])
	reader, err := storageClient.Bucket(matches[1]).Object(obj).NewReader(ctx)
	if err != nil {
		fmt.Printf("error reading job output: %v\n", err)

	defer reader.Close()

	body, err := io.ReadAll(reader)
	if err != nil {
		fmt.Printf("could not read output from Dataproc Job: %v\n", err)

	fmt.Printf("Job finished successfully: %s", body)


  1. 安装客户端库。如需了解详情,请参阅“设置 Java 开发环境”。
  2. 设置身份验证
  3. 克隆并运行示例 GitHub 代码。
  4. 查看输出。代码会将作业驱动程序日志输出到 Cloud Storage 中的默认 Dataproc 暂存存储分区。从 Google Cloud 控制台,您可以在项目的 Dataproc 作业部分中查看作业驱动程序输出。点击“作业详情”页面上的作业 ID 以查看作业输出。

/* This quickstart sample walks a user through creating a Cloud Dataproc
 * cluster, submitting a PySpark job from Google Cloud Storage to the
 * cluster, reading the output of the job and deleting the cluster, all
 * using the Java client library.
 * Usage:
 *     mvn clean package -DskipTests
 *     mvn exec:java -Dexec.args="<PROJECT_ID> <REGION> <CLUSTER_NAME> <GCS_JOB_FILE_PATH>"
 *     You can also set these arguments in the main function instead of providing them via the CLI.
 */

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Cluster;
import com.google.cloud.dataproc.v1.ClusterConfig;
import com.google.cloud.dataproc.v1.ClusterControllerClient;
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
import com.google.cloud.dataproc.v1.InstanceGroupConfig;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.PySparkJob;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.protobuf.Empty;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Quickstart {

  public static void quickstart(
      String projectId, String region, String clusterName, String jobFilePath)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the cluster controller client.
    ClusterControllerSettings clusterControllerSettings =

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =

    // Create both a cluster controller client and job controller client with the
    // configured settings. The client only needs to be created once and can be reused for
    // multiple requests. Using a try-with-resources closes the client, but this can also be done
    // manually with the .close() method.
    try (ClusterControllerClient clusterControllerClient =
        JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
      // Configure the settings for our cluster.
      InstanceGroupConfig masterConfig =
      InstanceGroupConfig workerConfig =
      ClusterConfig clusterConfig =
      // Create the cluster object with the desired cluster config.
      Cluster cluster =

      // Create the Cloud Dataproc cluster.
      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
          clusterControllerClient.createClusterAsync(projectId, region, cluster);
      Cluster clusterResponse = createClusterAsyncRequest.get();
          String.format("Cluster created successfully: %s", clusterResponse.getClusterName()));

      // Configure the settings for our job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
      PySparkJob pySparkJob = PySparkJob.newBuilder().setMainPythonFileUri(jobFilePath).build();
      Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build();

      // Submit an asynchronous request to execute the job.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
      Job jobResponse = submitJobAsOperationAsyncRequest.get();

      // Print output from Google Cloud Storage.
      Matcher matches =

      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));

          String.format("Job finished successfully: %s", new String(blob.getContent())));

      // Delete the cluster.
      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
          clusterControllerClient.deleteClusterAsync(projectId, region, clusterName);
      System.out.println(String.format("Cluster \"%s\" successfully deleted.", clusterName));

    } catch (ExecutionException e) {
      System.err.println(String.format("quickstart: %s ", e.getMessage()));

  public static void main(String... args) throws IOException, InterruptedException {
    if (args.length != 4) {
          "Insufficient number of parameters provided. Please make sure a "
              + "PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.");

    String projectId = args[0]; // project-id of project to create the cluster in
    String region = args[1]; // region to create the cluster
    String clusterName = args[2]; // name of the cluster
    String jobFilePath = args[3]; // location in GCS of the PySpark job

    quickstart(projectId, region, clusterName, jobFilePath);


  1. 安装客户端库。如需了解详情,请参阅“设置 Node.js 开发环境”。
  2. 设置身份验证
  3. 克隆并运行示例 GitHub 代码。
  4. 查看输出。代码会将作业驱动程序日志输出到 Cloud Storage 中的默认 Dataproc 暂存存储分区。从 Google Cloud 控制台,您可以在项目的 Dataproc 作业部分中查看作业驱动程序输出。点击“作业详情”页面上的作业 ID 以查看作业输出。

// This quickstart sample walks a user through creating a Dataproc
// cluster, submitting a PySpark job from Google Cloud Storage to the
// cluster, reading the output of the job and deleting the cluster, all
// using the Node.js client library.

'use strict';

function main(projectId, region, clusterName, jobFilePath) {
  const dataproc = require('@google-cloud/dataproc');
  const {Storage} = require('@google-cloud/storage');

  // Create a cluster client with the endpoint set to the desired cluster region
  const clusterClient = new dataproc.v1.ClusterControllerClient({
    apiEndpoint: `${region}-dataproc.googleapis.com`,
    projectId: projectId,

  // Create a job client with the endpoint set to the desired cluster region
  const jobClient = new dataproc.v1.JobControllerClient({
    apiEndpoint: `${region}-dataproc.googleapis.com`,
    projectId: projectId,

  async function quickstart() {
    // Create the cluster config
    const cluster = {
      projectId: projectId,
      region: region,
      cluster: {
        clusterName: clusterName,
        config: {
          masterConfig: {
            numInstances: 1,
            machineTypeUri: 'n1-standard-2',
          workerConfig: {
            numInstances: 2,
            machineTypeUri: 'n1-standard-2',

    // Create the cluster
    const [operation] = await clusterClient.createCluster(cluster);
    const [response] = await operation.promise();

    // Output a success message
    console.log(`Cluster created successfully: ${response.clusterName}`);

    const job = {
      projectId: projectId,
      region: region,
      job: {
        placement: {
          clusterName: clusterName,
        pysparkJob: {
          mainPythonFileUri: jobFilePath,

    const [jobOperation] = await jobClient.submitJobAsOperation(job);
    const [jobResponse] = await jobOperation.promise();

    const matches =

    const storage = new Storage();

    const output = await storage

    // Output a success message.
    console.log(`Job finished successfully: ${output}`);

    // Delete the cluster once the job has terminated.
    const deleteClusterReq = {
      projectId: projectId,
      region: region,
      clusterName: clusterName,

    const [deleteOperation] =
      await clusterClient.deleteCluster(deleteClusterReq);
    await deleteOperation.promise();

    // Output a success message
    console.log(`Cluster ${clusterName} successfully deleted.`);


const args = process.argv.slice(2);

if (args.length !== 4) {
    'Insufficient number of parameters provided. Please make sure a ' +
      'PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.'



  1. 安装客户端库。如需了解详情,请参阅“设置 Python 开发环境”。
  2. 设置身份验证
  3. 克隆并运行示例 GitHub 代码。
  4. 查看输出。代码会将作业驱动程序日志输出到 Cloud Storage 中的默认 Dataproc 暂存存储分区。从 Google Cloud 控制台,您可以在项目的 Dataproc 作业部分中查看作业驱动程序输出。点击“作业详情”页面上的作业 ID 以查看作业输出。

"""
This quickstart sample walks a user through creating a Cloud Dataproc
cluster, submitting a PySpark job from Google Cloud Storage to the
cluster, reading the output of the job and deleting the cluster, all
using the Python client library.

Usage:
    python quickstart.py --project_id <PROJECT_ID> --region <REGION> \
        --cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
"""

import argparse
import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

def quickstart(project_id, region, cluster_name, job_file_path):
    # Create the cluster client.
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {
                "num_instances": 1,
                "machine_type_uri": "n1-standard-2",
                "disk_config": {"boot_disk_size_gb": 100},
            "worker_config": {
                "num_instances": 2,
                "machine_type_uri": "n1-standard-2",
                "disk_config": {"boot_disk_size_gb": 100},

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    result = operation.result()

    print("Cluster created successfully: {}".format(result.cluster_name))

    # Create the job client.
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}

    # Create the job config.
    job = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {"main_python_file_uri": job_file_path},

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    response = operation.result()

    # Dataproc job output gets saved to the Google Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (

    print(f"Job finished successfully: {output}")

    # Delete the cluster once the job has terminated.
    operation = cluster_client.delete_cluster(
            "project_id": project_id,
            "region": region,
            "cluster_name": cluster_name,

    print("Cluster {} successfully deleted.".format(cluster_name))

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        help="Project to use for creating resources.",
        help="Region where the resources should live.",
        help="Name to use for creating a cluster.",
        help="Job in GCS to execute against the cluster.",

    args = parser.parse_args()
    quickstart(args.project_id, args.region, args.cluster_name, args.job_file_path)


