Use the Cloud Client Libraries for Python

This tutorial includes a Cloud Shell walkthrough that uses the Google Cloud client libraries for Python to programmatically call Dataproc gRPC APIs to create a cluster and submit a job to the cluster.

The following sections explain the operation of the walkthrough code contained in the GitHub GoogleCloudPlatform/python-docs-samples/dataproc repository.

Run the Cloud Shell walkthrough

Click Open in Google Cloud Shell to run the walkthrough.

Understand the Python example code

Application Default Credentials

The Cloud Shell walkthrough in this tutorial provides authentication by using your Google Cloud project credentials. When you run code locally, the recommended practice is to use service account credentials to authenticate your code.
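When you run code locally with a service account, Application Default Credentials typically look first at the GOOGLE_APPLICATION_CREDENTIALS environment variable, which should point to a service account key file. The following is a minimal pre-flight sketch, not part of the tutorial code: the helper name `local_adc_key_path` is hypothetical, and it only checks that the variable is set and points to an existing file. It does not load or validate the credentials.

```python
import os


def local_adc_key_path(environ=None):
    """Return the service account key path from the environment, or None.

    Hypothetical helper: it only inspects GOOGLE_APPLICATION_CREDENTIALS and
    does not load or validate the credentials themselves.
    """
    environ = os.environ if environ is None else environ
    key_path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if key_path and os.path.isfile(key_path):
        return key_path
    return None


path = local_adc_key_path()
if path is None:
    print("GOOGLE_APPLICATION_CREDENTIALS is unset or does not point to a file.")
else:
    print("Using service account key file: {}".format(path))
```

Passing a dictionary as `environ` makes the check easy to exercise without changing the real environment.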

Get cluster and job clients

Two Google Cloud Dataproc API clients are needed to run the tutorial code: a ClusterControllerClient to call the clusters gRPC APIs and a JobControllerClient to call the jobs gRPC APIs. If the cluster is created in the Dataproc global region, the code uses the default gRPC endpoint. If the cluster region is non-global, the code uses a regional gRPC endpoint.

if global_region:
    region = "global"
    # Use the default gRPC global endpoints.
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
    dataproc_job_client = dataproc_v1.JobControllerClient()
else:
    region = get_region_from_zone(zone)
    # Use a regional gRPC endpoint (see the Dataproc regional endpoints documentation).
    client_transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
        address="{}-dataproc.googleapis.com:443".format(region)
    )
    job_transport = job_controller_grpc_transport.JobControllerGrpcTransport(
        address="{}-dataproc.googleapis.com:443".format(region)
    )
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient(client_transport)
    dataproc_job_client = dataproc_v1.JobControllerClient(job_transport)
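The code above calls `get_region_from_zone`, which this page does not show. The following sketch is one possible implementation, assuming zone names have the form `<region>-<zone letter>` (for example, `us-central1-a`). The `regional_endpoint` helper is hypothetical and only illustrates how a regional endpoint address is formed.

```python
def get_region_from_zone(zone):
    """Derive a region name from a Compute Engine zone name.

    Assumes zone names have the form "<region>-<zone letter>", for example
    "us-central1-a".
    """
    region, separator, suffix = zone.rpartition("-")
    if not separator or not region or not suffix:
        raise ValueError("Invalid zone: {!r}".format(zone))
    return region


def regional_endpoint(region):
    """Build a regional Dataproc gRPC endpoint address (hypothetical helper)."""
    return "{}-dataproc.googleapis.com:443".format(region)


print(regional_endpoint(get_region_from_zone("us-central1-a")))
```

In recent versions of the google-cloud-dataproc library, an address like this is usually passed through `client_options={"api_endpoint": ...}` when constructing a client, rather than through a transport object. Check the library version you have installed before choosing either form.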

List Dataproc clusters

You can list clusters within a project by calling the ListClusters API. The response lists the clusters in the specified region. You can traverse the response to print cluster details.

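def list_clusters_with_details(dataproc, project, region):
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(
        request={"project_id": project, "region": region}
    ):
        # Print each cluster's name and its current state.
        state = cluster.status.State.Name(cluster.status.state)
        print("{} - {}".format(cluster.cluster_name, state))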

Create a Dataproc cluster

You can create a new Dataproc cluster with the CreateCluster API.

You must specify the following values when creating a cluster:

  1. The project in which the cluster will be created
  2. The name of the cluster
  3. The region to use. If you specify the global region (the tutorial code uses a --global_region flag to select the global region), you must also specify a zone (see zone_uri). If you specify a non-global region and leave the zone_uri field empty, Dataproc Auto Zone Placement will select a zone for your cluster.

def create_cluster(dataproc, project, zone, region, cluster_name):
    """Create the cluster."""
    print("Creating cluster...")
    # zone_uri accepts a full URL, a partial URI, or a short zone name;
    # this uses the partial URI form.
    zone_uri = "projects/{}/zones/{}".format(
        project, zone
    )
    cluster_data = {
        "project_id": project,
        "cluster_name": cluster_name,
        "config": {
            "gce_cluster_config": {"zone_uri": zone_uri},
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-1"},
        },
    }

    # create_cluster returns a long-running operation.
    cluster = dataproc.create_cluster(
        request={"project_id": project, "region": region, "cluster": cluster_data}
    )
    # Module-level flag indicating that cluster creation is still in progress.
    global waiting_callback
    waiting_callback = True

You can also override default cluster config settings. For example, you can specify the number of workers (default = 2), the number of preemptible VMs (default = 0), and network settings (default = default network). See CreateClusterRequest for more information.
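As an illustration of overriding these defaults, the following sketch builds the cluster dictionary with an adjustable worker count, optional secondary workers (which Dataproc typically creates as preemptible VMs), and an optional network. The helper name `build_cluster_data` and its parameters are hypothetical. The field names are taken from the Dataproc ClusterConfig message, and the machine types mirror the tutorial code.

```python
def build_cluster_data(
    project,
    cluster_name,
    region,
    zone_uri="",
    num_workers=2,
    num_secondary_workers=0,
    network_uri=None,
):
    """Build a dict for the "cluster" field of a create request (hypothetical helper)."""
    # The global region requires an explicit zone; a non-global region may
    # leave zone_uri empty so that Auto Zone Placement selects a zone.
    if region == "global" and not zone_uri:
        raise ValueError("A zone is required when using the global region.")

    gce_cluster_config = {}
    if zone_uri:
        gce_cluster_config["zone_uri"] = zone_uri
    if network_uri:
        gce_cluster_config["network_uri"] = network_uri

    config = {
        "gce_cluster_config": gce_cluster_config,
        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
        "worker_config": {
            "num_instances": num_workers,
            "machine_type_uri": "n1-standard-1",
        },
    }
    if num_secondary_workers:
        # Only add secondary workers when some were requested.
        config["secondary_worker_config"] = {"num_instances": num_secondary_workers}

    return {"project_id": project, "cluster_name": cluster_name, "config": config}


cluster_data = build_cluster_data(
    "my-project", "my-cluster", "us-central1", num_workers=4, num_secondary_workers=2
)
print(cluster_data["config"]["worker_config"]["num_instances"])
```

The resulting dictionary can be passed as the `cluster` value in the request given to `create_cluster`.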

Submit a job to a Dataproc cluster

You can submit a job to an existing cluster with the SubmitJob API. When you submit a job, it runs asynchronously.

To submit a job, you must specify the following information:

  1. The name of the cluster to which the job will be submitted
  2. The region to use
  3. The type of job being submitted (such as Hadoop, Spark, or PySpark)
  4. Job details for the type of job being submitted (see SubmitJobRequest for more information).

The following code submits a PySpark job to a cluster.

def submit_pyspark_job(dataproc, project, region, cluster_name, bucket_name, filename):
    """Submit the PySpark job to the cluster (assumes `filename` was uploaded
    to `bucket_name`)."""
    job_details = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {
            "main_python_file_uri": "gs://{}/{}".format(bucket_name, filename)
        },
    }

    result = dataproc.submit_job(
        request={"project_id": project, "region": region, "job": job_details}
    )
    job_id = result.reference.job_id
    print("Submitted job ID {}.".format(job_id))
    return job_id
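The job type determines which field of the job carries the job details. The following sketch shows one way to build the `job` dictionary for the Hadoop, Spark, and PySpark job types. The helper `build_job_details` and the `JOB_FIELDS` table are hypothetical. The field names follow the Dataproc Job message, and only the main file URI and optional arguments are covered.

```python
# Maps a job type to (job field name, field that holds the main file URI).
JOB_FIELDS = {
    "hadoop": ("hadoop_job", "main_jar_file_uri"),
    "spark": ("spark_job", "main_jar_file_uri"),
    "pyspark": ("pyspark_job", "main_python_file_uri"),
}


def build_job_details(job_type, cluster_name, main_file_uri, args=None):
    """Build the "job" dict for a submit request (hypothetical helper)."""
    try:
        job_field, main_field = JOB_FIELDS[job_type]
    except KeyError:
        raise ValueError("Unsupported job type: {!r}".format(job_type))

    job = {main_field: main_file_uri}
    if args:
        # Arguments are passed to the job's driver program.
        job["args"] = list(args)
    return {"placement": {"cluster_name": cluster_name}, job_field: job}


details = build_job_details("pyspark", "my-cluster", "gs://my-bucket/job.py")
print(details)
```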

By default, the tutorial code runs the following small PySpark job.

import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(["Hello,", "world!", "dog", "elephant", "panther"])
words = sorted(rdd.collect())
print(words)
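The job sorts the collected strings with Python's default ordering, which places uppercase letters before lowercase letters. As a quick local check that does not require Spark, the same sort can be reproduced in plain Python:

```python
# Same data as the PySpark job, sorted with plain Python (no Spark needed).
words = sorted(["Hello,", "world!", "dog", "elephant", "panther"])
print(words)  # ['Hello,', 'dog', 'elephant', 'panther', 'world!']
```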

Because the job runs asynchronously, its output is not available until the job finishes. You can call GetJob while the job is running to check its JobStatus, and call it again after the job completes to get the job details.

Get job status and details

Make a GetJobRequest with the following required information:

  1. The project of the cluster where the job was submitted
  2. The cluster region
  3. The job ID (UUID)

The following code checks a job's status, and returns job details when the job completes.

def wait_for_job(dataproc, project, region, job_id):
    """Wait for job to complete or error out."""
    print("Waiting for job to finish...")
    while True:
        job = dataproc.get_job(
            request={"project_id": project, "region": region, "job_id": job_id}
        )
        # Handle exceptions
        if job.status.State.Name(job.status.state) == "ERROR":
            raise Exception(job.status.details)
        elif job.status.State.Name(job.status.state) == "DONE":
            print("Job finished.")
            return job
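The loop above polls continuously and never gives up. The following sketch shows a variant that pauses between polls and stops after a timeout. It is not the tutorial's implementation: `wait_for_done` and its parameters are hypothetical, and `get_state` stands in for any callable that returns the job's state name, for example a small wrapper around `dataproc.get_job`.

```python
import time


def wait_for_done(get_state, timeout_seconds=600, poll_seconds=5):
    """Poll get_state() until it returns "DONE", with a delay and a timeout.

    Hypothetical helper: get_state is any callable returning a state name
    such as "PENDING", "RUNNING", "DONE", or "ERROR".
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = get_state()
        if state == "ERROR":
            raise RuntimeError("Job failed.")
        if state == "DONE":
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "Job did not finish within {} seconds.".format(timeout_seconds)
            )
        # Pause between polls to avoid calling the API in a tight loop.
        time.sleep(poll_seconds)


# Stand-in for real API calls: the job reports three states in sequence.
states = iter(["PENDING", "RUNNING", "DONE"])
print(wait_for_done(lambda: next(states), poll_seconds=0))
```

The poll interval and timeout are illustrative defaults. Suitable values depend on how long your jobs usually run.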

Delete a Dataproc cluster

Call the DeleteCluster API to delete a cluster.

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print("Tearing down cluster.")
    result = dataproc.delete_cluster(
        request={"project_id": project, "region": region, "cluster_name": cluster}
    )
    return result
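To make sure the cluster is deleted even when a job fails, one option is to wrap the create and delete steps in a context manager. The following is a sketch under that assumption: `temporary_cluster` is a hypothetical helper, and the `create` and `delete` arguments stand in for calls such as `create_cluster` and `delete_cluster` with their arguments already bound.

```python
from contextlib import contextmanager


@contextmanager
def temporary_cluster(create, delete):
    """Run create(), yield to the caller, then always run delete()."""
    create()
    try:
        yield
    finally:
        # Runs whether the body succeeded or raised an exception.
        delete()


# Stand-ins for the real calls: record the order in which steps run.
calls = []
try:
    with temporary_cluster(
        lambda: calls.append("create"), lambda: calls.append("delete")
    ):
        calls.append("job")
        raise RuntimeError("job failed")
except RuntimeError:
    pass

print(calls)  # ['create', 'job', 'delete']
```

Because `create()` runs before the `try` block, the delete step is skipped if cluster creation itself raises an exception.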