Use the Cloud Client Libraries for Python

This tutorial includes a Cloud Shell walkthrough that uses the Google Cloud client libraries for Python to programmatically call Dataproc gRPC APIs to create a cluster and submit a job to the cluster.

The following sections explain the walkthrough code contained in the GoogleCloudPlatform/python-dataproc GitHub repository.

Run the Cloud Shell walkthrough

Click Open in Google Cloud Shell to run the walkthrough.

Open this project in Cloud Shell

Understand the Python example code

Application Default Credentials

The Cloud Shell walkthrough in this tutorial provides authentication by using your Google Cloud project credentials. When you run code locally, the recommended practice is to use service account credentials to authenticate your code.
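
When running locally, one common approach (shown here as a minimal sketch, not part of the walkthrough code) is to set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of a service account key file so that Application Default Credentials picks it up, or to load the key explicitly and pass the credentials to the client. The key file path and region below are placeholders:

from google.cloud import dataproc_v1 as dataproc
from google.oauth2 import service_account

# Placeholder path to a downloaded service account key file.
credentials = service_account.Credentials.from_service_account_file(
    "/path/to/service-account-key.json"
)

# Pass the credentials explicitly when constructing the regional client.
cluster_client = dataproc.ClusterControllerClient(
    credentials=credentials,
    client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"},
)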

Create a Dataproc cluster

You can create a new Dataproc cluster with the CreateCluster API.

You must specify the following values when creating a cluster:

  1. The project in which the cluster will be created
  2. The name of the cluster
  3. The region to use. If you specify the global region (the tutorial code uses a --global_region flag to select it), you must also specify a zone (see zone_uri). If you specify a non-global region and leave the zone_uri field empty, Dataproc Auto Zone Placement selects a zone for your cluster.

The following code creates a cluster:

from google.cloud import dataproc_v1 as dataproc


def create_cluster(project_id, region, cluster_name):
    """This sample walks a user through creating a Cloud Dataproc cluster
       using the Python client library.

       Args:
           project_id (string): Project to use for creating resources.
           region (string): Region where the resources should live.
           cluster_name (string): Name to use for creating a cluster.
    """

    # Create a client with the endpoint set to the desired cluster region.
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    # Output a success message.
    print(f"Cluster created successfully: {result.cluster_name}")

You can also override default cluster config settings. For example, you can specify the number of workers (default = 2), whether to use preemptible VMs (default = 0), and network settings (default = default network). See CreateClusterRequest for more information.
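
As a rough sketch of what such overrides could look like in the cluster dictionary shown above (this reuses the project_id and cluster_name variables from create_cluster; the network URI and instance counts are illustrative placeholders, and the secondary_worker_config block assumes the preemptible-worker fields of InstanceGroupConfig):

cluster = {
    "project_id": project_id,
    "cluster_name": cluster_name,
    "config": {
        "gce_cluster_config": {
            # Placeholder: run the cluster on a specific VPC network
            # instead of the default network.
            "network_uri": "projects/my-project-id/global/networks/my-network",
        },
        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
        # Override the default worker count.
        "worker_config": {"num_instances": 4, "machine_type_uri": "n1-standard-2"},
        # Add preemptible secondary workers.
        "secondary_worker_config": {
            "num_instances": 2,
            "preemptibility": dataproc.InstanceGroupConfig.Preemptibility.PREEMPTIBLE,
        },
    },
}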

List Dataproc clusters

You can list clusters within a project by calling the ListClusters API. The call returns a paged list of clusters; you can iterate over the response to print the details of each cluster.

def list_clusters_with_details(dataproc, project, region):
    """List the name and status of each cluster in the region.

    Args:
        dataproc: A ClusterControllerClient configured for the region.
        project (string): Project that owns the clusters.
        region (string): Region to list clusters in.
    """
    for cluster in dataproc.list_clusters(
        request={"project_id": project, "region": region}
    ):
        print(f"{cluster.cluster_name} - {cluster.status.state.name}")
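
The function expects a ClusterControllerClient that is already configured for the target region, so a caller might construct one the same way as in the create_cluster sample; the project ID and region below are placeholders:

from google.cloud import dataproc_v1 as dataproc

region = "us-central1"
cluster_client = dataproc.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Print the name and status of each cluster in the region.
list_clusters_with_details(cluster_client, "my-project-id", region)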

Submit a job to a Dataproc cluster

You can submit a job to an existing cluster with the SubmitJob API. When you submit a job, it runs asynchronously.

To submit a job, you must specify the following information:

  1. The name of the cluster to which the job will be submitted
  2. The region to use
  3. The type of job being submitted (such as Hadoop, Spark, or PySpark)
  4. Job details for the type of job being submitted (see SubmitJobRequest for more information).

The following code submits a Spark job to a cluster.

import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def submit_job(project_id, region, cluster_name):
    """Submit the SparkPi example job to the cluster and print its output.

    Args:
        project_id (string): Project that owns the cluster.
        region (string): Region where the cluster lives.
        cluster_name (string): Name of the cluster to submit the job to.
    """

    # Create a client with the endpoint set to the desired cluster region.
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the job config. 'jar_file_uris' can also reference jars stored
    # in Cloud Storage (gs:// URLs).
    job = {
        "placement": {"cluster_name": cluster_name},
        "spark_job": {
            "main_class": "org.apache.spark.examples.SparkPi",
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "args": ["1000"],
        },
    }

    # Submit the job and wait for it to finish.
    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output gets saved to the Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    # Download the driver output and decode it to text.
    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_string()
        .decode("utf-8")
    )

    print(f"Job finished successfully: {output}")

By default, the tutorial code runs the SparkPi example job included with Spark.
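
Other job types set the corresponding field of the Job message instead of spark_job. For example, a PySpark job config could look like the following sketch, which reuses the job_client, project_id, region, and cluster_name names from the sample above; the Cloud Storage paths are hypothetical placeholders (see SubmitJobRequest for the full set of fields):

# Illustrative PySpark job config; the gs:// paths are placeholders.
pyspark_job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {
        "main_python_file_uri": "gs://my-bucket/path/to/my_pyspark_script.py",
        "args": ["--input", "gs://my-bucket/input/"],
    },
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": pyspark_job}
)
operation.result()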

Delete a Dataproc cluster

Call the DeleteCluster API to delete a cluster.

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print("Tearing down cluster.")
    # delete_cluster returns a long-running operation that tracks the teardown.
    result = dataproc.delete_cluster(
        request={"project_id": project, "region": region, "cluster_name": cluster}
    )
    return result
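
The returned value is a long-running operation object, so a caller that needs to block until the cluster is actually gone can wait on it; for example, with a regional ClusterControllerClient as shown earlier and placeholder values:

operation = delete_cluster(cluster_client, "my-project-id", "us-central1", "my-cluster")

# Block until the delete operation completes.
operation.result()
print("Cluster deleted.")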