Use the Cloud Client Libraries for Python

This tutorial includes a Cloud Shell walkthrough that uses the Google Cloud client libraries for Python to programmatically call Dataproc gRPC APIs to create a cluster and submit a job to the cluster.

The following sections explain the operation of the walkthrough code contained in the GitHub GoogleCloudPlatform/python-dataproc repository.

Run the Cloud Shell walkthrough

Click Open in Cloud Shell to run the walkthrough.

Understand the code

Application Default Credentials

The Cloud Shell walkthrough in this tutorial uses your Google Cloud project credentials to authenticate. When you run the code locally, the recommended practice is to authenticate with service account credentials.
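
If you run the code locally, a minimal sketch of authenticating with a service account key file might look like the following. The key file path and region are placeholders, and you can instead set the GOOGLE_APPLICATION_CREDENTIALS environment variable and let the client library pick up the credentials automatically.

from google.cloud import dataproc_v1
from google.oauth2 import service_account

# Load service account credentials from a key file (placeholder path).
credentials = service_account.Credentials.from_service_account_file(
    "path/to/service-account-key.json"
)

# Pass the credentials explicitly to the Dataproc client.
cluster_client = dataproc_v1.ClusterControllerClient(
    credentials=credentials,
    client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"},
)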

Create a Dataproc cluster

The following values are set to create the cluster:

  • The project in which the cluster will be created
  • The region where the cluster will be created
  • The name of the cluster
  • The cluster config, which specifies one master and two primary workers

Default values are used for the remaining cluster settings. You can override these defaults. For example, you can add secondary workers (default = 0) or specify a non-default VPC network for the cluster; a sketch of these overrides follows the walkthrough code below. For more information, see CreateCluster. The following code creates the cluster:

import re

from google.cloud import dataproc_v1, storage


def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client with a regional endpoint.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Submit a job

The following values are set to submit the job:

  • The project that contains the cluster
  • The region where the job will be submitted
  • The job config, which specifies the cluster name and the Cloud Storage filepath (URI) of the PySpark job

See SubmitJob for more information.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Delete the cluster

The following values are set to delete the cluster:

  • The project that contains the cluster
  • The region where the cluster was created
  • The name of the cluster

For more information, see DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")