Usa las bibliotecas cliente de Cloud para Python

En este instructivo, se incluye una explicación de Cloud Shell que usa las bibliotecas cliente de Google Cloud para Python para llamar a las API de gRPC de Dataproc de manera programática a fin de crear un clúster y enviar un trabajo a él.

En las siguientes secciones, se explica el funcionamiento del código de descripción que se incluye en el repositorio GoogleCloudPlatform/python-dataproc de GitHub.

Ejecuta la explicación de Cloud Shell

Haz clic en Open in Cloud Shell (Abrir en Google Cloud Shell) para ejecutar la explicación.

Abrir en Cloud Shell

Examina el código

Credencial predeterminada de la aplicación

La explicación de Cloud Shell de este instructivo proporciona autenticación mediante las credenciales de tu proyecto de Google Cloud. Cuando ejecutas el código de forma local, la práctica recomendada es usar las credenciales de la cuenta de servicio para autenticar tu código.

Crea un clúster de Dataproc

Se establecen los siguientes valores para crear el clúster:

  • El proyecto en el que se creará el clúster
  • La región en la que se creará el clúster
  • El nombre del clúster
  • La configuración del clúster, que especifica una instancia principal y dos trabajadores principales

La configuración predeterminada se usa para la configuración restante del clúster. Puedes anular la configuración predeterminada del clúster. Por ejemplo, puedes agregar VM secundarias (predeterminado = 0) o especificar una red de VPC no predeterminada para el clúster. Para obtener más información, consulta CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Envía un trabajo

Los siguientes valores se configuran para enviar el trabajo:

  • El proyecto en el que se creará el clúster
  • La región en la que se creará el clúster
  • La configuración del trabajo, que especifica el nombre del clúster y la ruta de archivo de Cloud Storage (URI) del trabajo de PySpark

Consulta SubmitJob para obtener más información.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Borre el clúster

Los siguientes valores están configurados para borrar el clúster:

  • El proyecto en el que se creará el clúster
  • La región en la que se creará el clúster
  • El nombre del clúster

Para obtener más información, consulta DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")