Use the Python Client Library

This tutorial includes a Cloud Shell walkthrough that uses the google-api-python-client library to programmatically call Cloud Dataproc REST APIs to create a cluster and submit a job to the cluster.

The sections below explain the operation of the walkthrough and other Python example code contained in the GitHub GoogleCloudPlatform/python-docs-samples/dataproc repository.

Run the Cloud Shell walkthrough

Click Open in Cloud Shell to run the walkthrough.


Understand Python example code

Application Default Credentials

The Cloud Shell walkthrough in this tutorial provides authentication by using your GCP project credentials. When you run code locally, the recommended practice is to use service account credentials to authenticate your code. The following example uses Application Default Credentials to check the local machine's GOOGLE_APPLICATION_CREDENTIALS environment variable for the location of the service account key file on the local machine. These credentials are passed to googleapiclient.discovery.build() in the get_client() function, which returns a client to the Cloud Dataproc API.

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

# -------------------------------
# Use Application Default Credentials to check the
# GOOGLE_APPLICATION_CREDENTIALS environment variable
# for the location of the service account key file.
# -------------------------------
credentials = GoogleCredentials.get_application_default()


def get_client():
  """Builds a client to the dataproc API."""
  dataproc = build('dataproc', 'v1', credentials=credentials)
  return dataproc
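
For local runs, point the GOOGLE_APPLICATION_CREDENTIALS environment variable at your service account key before building the client. A minimal usage sketch (the key path is a placeholder):

# In your shell, before running the script (the path is a placeholder):
#   export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json

dataproc = get_client()  # Reuse this client object for the calls that follow.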

List Cloud Dataproc clusters

After generating application default credentials and creating a dataproc client object, you can list clusters within a project by calling the projects.regions.clusters.list API:

def list_clusters(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    return result

The call returns a JSON object that lists the clusters. You can traverse the JSON response to print cluster details.

def list_clusters_with_details(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    cluster_list = result.get('clusters', [])  # 'clusters' is absent if the project has none.
    for cluster in cluster_list:
        print("{} - {}"
              .format(cluster['clusterName'], cluster['status']['state']))
    return result
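
For example, to print the name and state of every cluster in a project (the project ID and region here are placeholder values):

dataproc = get_client()
list_clusters_with_details(dataproc, 'your-project-id', 'us-central1')
# Each line of output has the form "<clusterName> - <state>", e.g. "my-cluster - RUNNING".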

Create a Cloud Dataproc cluster

You can create a new Cloud Dataproc cluster with the Cloud Dataproc clusters.create API.

You must specify the following values when creating a cluster:

  1. The project in which the cluster will be created
  2. The name of the cluster
  3. The region to use. If you specify the global region, you must also specify a zone. If you specify a non-global region and set zone="", Cloud Dataproc Auto Zone Placement will select a zone for your cluster.

You can also override default cluster settings. For example, you can specify the number of workers (default = 2), whether to use preemptible VMs (default = 0), and network settings (default = the default network). See the clusters.create API for more information.

def create_cluster(dataproc, project, zone, region, cluster_name):
    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
    cluster_data = {
        'projectId': project,
        'clusterName': cluster_name,
        'config': {
            'gceClusterConfig': {
                'zoneUri': zone_uri
            },
            'masterConfig': {
                'numInstances': 1,
                'machineTypeUri': 'n1-standard-1'
            },
            'workerConfig': {
                'numInstances': 2,
                'machineTypeUri': 'n1-standard-1'
            }
        }
    }
    result = dataproc.projects().regions().clusters().create(
        projectId=project,
        region=region,
        body=cluster_data).execute()
    return result
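
The clusters.create call returns a long-running operation, so the cluster is not usable immediately. The following is a minimal polling sketch, not part of the original sample, that uses the projects.regions.clusters.get API to wait until the cluster reaches the RUNNING state:

import time


def wait_for_cluster(dataproc, project, region, cluster_name):
    """Polls the cluster until it reaches the RUNNING state."""
    print('Waiting for cluster to be provisioned...')
    while True:
        cluster = dataproc.projects().regions().clusters().get(
            projectId=project,
            region=region,
            clusterName=cluster_name).execute()
        state = cluster['status']['state']
        if state == 'ERROR':
            raise Exception(cluster['status'].get('detail', 'Cluster entered ERROR state'))
        if state == 'RUNNING':
            print('Cluster is running.')
            return cluster
        time.sleep(5)  # Pause between polls to avoid hammering the API.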

Submit a job to a Cloud Dataproc cluster

You can submit a Cloud Dataproc job to an existing cluster with the projects.regions.jobs.submit API. When you submit a job, it runs asynchronously.

To submit a job, you must specify the following information:

  1. The name of the cluster to which the job will be submitted
  2. The region to use
  3. The type of job being submitted (such as Hadoop, Spark, Spark SQL)
  4. Job-specific details for the type of job being submitted

The following code submits a PySpark job to a cluster.

def submit_pyspark_job(dataproc, project, region,
                       cluster_name, bucket_name, filename):
    """Submits the Pyspark job to the cluster, assuming `filename` has
    already been uploaded to `bucket_name`"""
    job_details = {
        'projectId': project,
        'job': {
            'placement': {
                'clusterName': cluster_name
            },
            'pysparkJob': {
                'mainPythonFileUri': 'gs://{}/{}'.format(bucket_name, filename)
            }
        }
    }
    result = dataproc.projects().regions().jobs().submit(
        projectId=project,
        region=region,
        body=job_details).execute()
    job_id = result['reference']['jobId']
    print('Submitted job ID {}'.format(job_id))
    return job_id

Here is a small PySpark job.

import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)
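
The submit_pyspark_job function assumes the PySpark file has already been uploaded to the Cloud Storage bucket. One way to do the upload, sketched here with the google-cloud-storage client library (not part of the original sample; the function name is illustrative), is:

from google.cloud import storage


def upload_pyspark_file(bucket_name, filename, local_path):
    """Uploads the local PySpark file to gs://<bucket_name>/<filename>."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(filename)
    blob.upload_from_filename(local_path)
    print('Uploaded {} to gs://{}/{}'.format(local_path, bucket_name, filename))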

To submit a PySpark job to a cluster:

job_id = submit_pyspark_job(
    dataproc, project_id, region,
    cluster_name, bucket_name, spark_filename)

Because the job runs asynchronously, it must finish before its output is available. You can check the job's status to determine whether it has finished. After the job finishes, you can call projects.regions.jobs.get to get details about the job, and then inspect the job output.

Check a job's status

You can use the projects.regions.jobs.get API to get job details and status.

To get job information, you must specify the following information:

  1. The project of the cluster that ran the job
  2. The region to use
  3. The job ID (UUID)

The following code polls the job's status until the job completes. After the job completes, you can download the job output.

import time


def wait_for_job(dataproc, project, region, job_id):
    """Polls the job until it finishes, then returns the job resource."""
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
            region=region,
            jobId=job_id).execute()
        # Raise an exception if the job failed.
        if result['status']['state'] == 'ERROR':
            raise Exception(result['status']['details'])
        elif result['status']['state'] == 'DONE':
            print('Job finished.')
            return result
        time.sleep(1)  # Pause briefly between polls to avoid hammering the API.
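
Once the job is done, the job resource returned by projects.regions.jobs.get typically includes a driverOutputResourceUri field pointing at the driver output in Cloud Storage. The following sketch, which is not part of the original sample, downloads that output with the google-cloud-storage library, assuming the output fits in a single chunk:

from google.cloud import storage


def download_driver_output(job):
    """Downloads the first chunk of the job's driver output from Cloud Storage."""
    # driverOutputResourceUri looks like gs://<bucket>/<path-prefix>; the driver
    # output is written in numbered chunks under that prefix (assumed single chunk here).
    uri = job['driverOutputResourceUri']
    bucket_name, blob_prefix = uri[len('gs://'):].split('/', 1)
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob('{}.000000000'.format(blob_prefix))
    return blob.download_as_string().decode('utf-8')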

Delete a Cloud Dataproc cluster

Call the projects.regions.clusters.delete API to delete a cluster.

def delete_cluster(dataproc, project, region, cluster):
    print('Tearing down cluster')
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
        region=region,
        clusterName=cluster).execute()
    return result
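
Putting these pieces together, a minimal end-to-end flow might look like the following sketch. The variable values are placeholders, and wait_for_cluster, upload_pyspark_file, and download_driver_output are the illustrative helpers sketched above, not part of the original sample:

project_id = 'your-project-id'     # placeholder values
region = 'us-central1'
zone = 'us-central1-b'
cluster_name = 'example-cluster'
bucket_name = 'your-bucket'
spark_filename = 'pyspark_sort.py'  # assumed local PySpark file

dataproc = get_client()
try:
    create_cluster(dataproc, project_id, zone, region, cluster_name)
    wait_for_cluster(dataproc, project_id, region, cluster_name)
    upload_pyspark_file(bucket_name, spark_filename, spark_filename)
    job_id = submit_pyspark_job(
        dataproc, project_id, region,
        cluster_name, bucket_name, spark_filename)
    job = wait_for_job(dataproc, project_id, region, job_id)
    print(download_driver_output(job))
finally:
    delete_cluster(dataproc, project_id, region, cluster_name)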