Using the Python Client Library

This tutorial provides several examples of using the google-api-python-client library to programmatically interact with Google Cloud Dataproc. The client library exchanges data with the Cloud Dataproc REST interface, so you can write Python scripts and applications that manage Cloud Dataproc clusters and jobs. This tutorial also uses the Google Cloud SDK to simplify authentication and configuration.

As you follow these steps, you'll learn how to use the google-api-python-client library to:

  • perform OAuth 2.0 authorization using the oauth2client library and the Google Cloud SDK
  • list all Cloud Dataproc clusters in a project
  • create a new Cloud Dataproc cluster
  • create a new PySpark job on a Cloud Dataproc cluster
  • check a job's status to determine if the job has finished
  • delete a Cloud Dataproc cluster

First steps

If you haven't already done so, create a Google Cloud Platform project.

Set up your project

  1. Sign in to your Google account.

    If you don't already have one, sign up for a new account.

  2. Select or create a Cloud Platform project.

  3. Enable billing for your project.

  4. Enable the Cloud Dataproc and Google Compute Engine APIs.

Install the necessary libraries

The google-api-python-client library contains the core Python library for accessing Google APIs and also includes the OAuth 2.0 client library. This sample also uploads a file to Cloud Storage and downloads the job output from Cloud Storage; to interact with Cloud Storage, install the gcloud-python library. In the sample code, both libraries are installed together from requirements.txt:

$ pip install -r requirements.txt
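
The exact contents of requirements.txt are included with the sample code. As a rough sketch (the package names below are an assumption based on the libraries named above, not the sample's pinned file), it would look something like this:

google-api-python-client
google-cloud-storage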

Read up on JSON

The Cloud Dataproc API and this tutorial make extensive use of JSON. JSON objects are used as containers for both request and response data. You may want to familiarize yourself with JSON if you have not used it before.
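
For example, the client library parses each JSON response into a Python dictionary. An abridged response from a clusters.list call might look roughly like the following (the values shown are illustrative placeholders):

# Illustrative, abridged clusters.list response parsed into a Python dict.
{
    'clusters': [
        {
            'clusterName': 'my-dataproc-cluster',
            'projectId': 'my-project-id',
            'status': {'state': 'RUNNING'}
        }
    ]
}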

Understand Cloud Dataproc API regionalization

The Cloud Dataproc API incorporates regionalization. For this tutorial, you should use the global region. The global region is a special multi-region namespace that can create and manage Cloud Dataproc clusters and jobs in any Compute Engine zone.
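
The examples that follow assume a handful of placeholder variables. The names and values below are hypothetical; substitute your own project, zone, bucket, and cluster details:

# Hypothetical placeholder values used in the usage examples below.
project_id = 'my-project-id'          # your Cloud Platform project ID
region = 'global'                     # the region used throughout this tutorial
zone = 'us-central1-b'                # zone for the cluster's Compute Engine instances
cluster_name = 'my-dataproc-cluster'
bucket_name = 'my-staging-bucket'     # Cloud Storage bucket that holds the PySpark file
spark_filename = 'pyspark_sort.py'    # PySpark script uploaded to the bucket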

Use the Python Client Library

Once you have set up the Cloud SDK and installed the necessary libraries, you are ready to use the Python Client Library to interact with Cloud Dataproc.

Connect and Initialize the API

This example uses Application Default Credentials, which lets the client library use your gcloud command-line tool credentials when running locally, or the built-in service account credentials when running on Compute Engine or App Engine. To make your gcloud credentials available as Application Default Credentials, run:

gcloud auth application-default login

To use Application Default Credentials, import GoogleCredentials, then create the API client.

from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build

def get_client():
    """Builds a client to the dataproc API."""
    credentials = GoogleCredentials.get_application_default()
    dataproc = build('dataproc', 'v1', credentials=credentials)
    return dataproc

List Cloud Dataproc clusters

Now that you have generated credentials and a Cloud Dataproc service object, you can start performing actions with Cloud Dataproc clusters and jobs. Let's start by listing all Cloud Dataproc clusters for a project using the projects.regions.clusters.list command:

def list_clusters(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    return result

Note that this call returns a JSON response listing all current clusters in the project. You can traverse the JSON response to print details about the clusters, for example:

def list_clusters_with_details(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    cluster_list = result['clusters']
    for cluster in cluster_list:
        print("{} - {}"
              .format(cluster['clusterName'], cluster['status']['state']))
    return result
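
For example, using the get_client function and the hypothetical placeholder variables defined earlier, you could print the clusters in your project like this:

dataproc = get_client()
list_clusters_with_details(dataproc, project_id, region)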

Create a new Cloud Dataproc cluster

You can create a new Cloud Dataproc cluster with the projects.regions.clusters.create operation.

At a minimum, you must specify four values when creating a new cluster:

  1. The project in which the cluster will be created
  2. The region to use
  3. The name of the cluster
  4. The zone in which the cluster will be created (required when using the global region). See Cloud Dataproc Auto Zone Placement for information about having Cloud Dataproc choose a zone for you when using a non-global region for your cluster.

You can specify many more details beyond these minimum requirements. For example, you can also specify the number of workers, whether preemptible worker instances should be used, and the network settings. Refer to the API documentation for more information.

def create_cluster(dataproc, project, zone, region, cluster_name):
    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
    cluster_data = {
        'projectId': project,
        'clusterName': cluster_name,
        'config': {
            'gceClusterConfig': {
                'zoneUri': zone_uri
            },
            'masterConfig': {
                'numInstances': 1,
                'machineTypeUri': 'n1-standard-1'
            },
            'workerConfig': {
                'numInstances': 2,
                'machineTypeUri': 'n1-standard-1'
            }
        }
    }
    result = dataproc.projects().regions().clusters().create(
        projectId=project,
        region=region,
        body=cluster_data).execute()
    return result
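
The create request returns while the cluster is still being provisioned in the background. One way to block until the new cluster is usable is to poll clusters.list until the cluster reports the RUNNING state. The helper below is a sketch along those lines, not part of the tutorial's snippets; it reuses the list_clusters function shown earlier:

import time

def wait_for_cluster_creation(dataproc, project, region, cluster_name):
    """Polls clusters.list until the named cluster reports RUNNING."""
    print('Waiting for cluster creation...')
    while True:
        result = list_clusters(dataproc, project, region)
        for cluster in result.get('clusters', []):
            if cluster['clusterName'] == cluster_name:
                state = cluster['status']['state']
                if state == 'ERROR':
                    raise Exception(cluster['status'].get('detail', state))
                if state == 'RUNNING':
                    print('Cluster created.')
                    return cluster
        time.sleep(5)  # avoid polling the API in a tight loop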

Submit a PySpark job to a Cloud Dataproc cluster

You can submit a new Cloud Dataproc job to an existing cluster with the projects.regions.jobs.submit operation. When you submit a job, it runs asynchronously.

To submit a job to a cluster, you must specify the following minimum information:

  1. The name of the cluster to which the job will be submitted
  2. The region to use
  3. The type of job being submitted (such as Hadoop, Spark, Spark SQL, and so on)
  4. Job-specific details for the given type of job being submitted

In this example, you submit a PySpark Job to a cluster.

def submit_pyspark_job(dataproc, project, region,
                       cluster_name, bucket_name, filename):
    """Submits the Pyspark job to the cluster, assuming `filename` has
    already been uploaded to `bucket_name`."""
    job_details = {
        'projectId': project,
        'job': {
            'placement': {
                'clusterName': cluster_name
            },
            'pysparkJob': {
                'mainPythonFileUri': 'gs://{}/{}'.format(bucket_name, filename)
            }
        }
    }
    result = dataproc.projects().regions().jobs().submit(
        projectId=project,
        region=region,
        body=job_details).execute()
    job_id = result['reference']['jobId']
    print('Submitted job ID {}'.format(job_id))
    return job_id

As an example, if you have the following PySpark code:

import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)

you can call the submit_pyspark_job method to run this PySpark script on a cluster in your project:

job_id = submit_pyspark_job(
    dataproc, project_id, region,
    cluster_name, bucket_name, spark_filename)

Since the job runs asynchronously, it must finish before its output is available. You can check a job's status to determine whether it has finished. After the job finishes, you can call projects.regions.jobs.get to get details about the job and then inspect its output.

Check a job's status

You can use the projects.regions.jobs.get operation to get details about a previously submitted job. This makes it easy to determine the status and details of a job.

To get information about a job, you will need to specify the following minimum information:

  1. The project of the cluster which ran the job
  2. The region to use
  3. The job ID (UUID) of the job

The following snippet checks a job's status to determine when it is complete so that job output can be downloaded.

def wait_for_job(dataproc, project, region, job_id):
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
            region=region,
            jobId=job_id).execute()
        # Handle exceptions
        if result['status']['state'] == 'ERROR':
            raise Exception(result['status']['details'])
        elif result['status']['state'] == 'DONE':
            print('Job finished.')
            return result
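
Once wait_for_job returns, the driver's console output (for example, the sorted list printed by the PySpark script) is stored in Cloud Storage, and the job resource returned by projects.regions.jobs.get includes a driverOutputResourceUri field that points at it. The snippet below is a rough sketch of downloading that output with the Cloud Storage client library; the helper name and the URI-parsing logic are illustrative, not part of the sample:

from google.cloud import storage

def print_job_output(job):
    """Downloads and prints the driver output of a finished job."""
    # e.g. 'gs://<bucket>/google-cloud-dataproc-metainfo/<...>/driveroutput'
    output_uri = job['driverOutputResourceUri']
    bucket_name, _, prefix = output_uri[len('gs://'):].partition('/')
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    # Driver output may be split across several objects that share this prefix.
    for blob in bucket.list_blobs(prefix=prefix):
        print(blob.download_as_string().decode('utf-8'))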

Delete a Cloud Dataproc cluster

Finally, you can call the projects.regions.clusters.delete operation to delete a cluster.

def delete_cluster(dataproc, project, region, cluster):
    print('Tearing down cluster')
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
        region=region,
        clusterName=cluster).execute()
    return result
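
Putting the pieces together, a minimal end-to-end flow, assuming the hypothetical placeholder variables defined earlier, that the PySpark file has already been uploaded to the bucket, and the wait_for_cluster_creation and print_job_output sketches above, might look like this:

dataproc = get_client()
create_cluster(dataproc, project_id, zone, region, cluster_name)
wait_for_cluster_creation(dataproc, project_id, region, cluster_name)
job_id = submit_pyspark_job(
    dataproc, project_id, region, cluster_name, bucket_name, spark_filename)
job = wait_for_job(dataproc, project_id, region, job_id)
print_job_output(job)
delete_cluster(dataproc, project_id, region, cluster_name)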

Next steps

Once you complete this tutorial, we recommend you review the Cloud Dataproc documentation and the API reference for the operations used here.
