Utilizar las bibliotecas de cliente de Cloud para Python

Este tutorial incluye un recorrido por Cloud Shell que usa las bibliotecas de cliente de Google Cloud para Python para llamar de forma programática a las APIs gRPC de Dataproc para crear un clúster y enviar un trabajo a ese clúster.

En las siguientes secciones se explica el funcionamiento del código de la guía que se encuentra en el repositorio GoogleCloudPlatform/python-dataproc de GitHub.

Ejecutar el tutorial de Cloud Shell

Haz clic en Abrir en Cloud Shell para ejecutar la guía.

Abrir en Cloud Shell

Interpretar el código

credenciales de aplicación predeterminadas

La guía de Cloud Shell de este tutorial proporciona autenticación mediante las credenciales de tu proyecto Google Cloud . Cuando ejecutas código de forma local, lo más recomendable es usar credenciales de cuenta de servicio para autenticar tu código.

Crear una agrupación Dataproc

Se han definido los siguientes valores para crear el clúster:

  • El proyecto en el que se creará el clúster
  • Región en la que se creará el clúster
  • Nombre del clúster
  • La configuración del clúster, que especifica un maestro y dos trabajadores principales

Se usan los ajustes de configuración predeterminados para el resto de los ajustes del clúster. Puedes anular la configuración predeterminada del clúster. Por ejemplo, puedes añadir VMs secundarias (el valor predeterminado es 0) o especificar una red de VPC que no sea la predeterminada para el clúster. Para obtener más información, consulta CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Enviar una tarea

Se definen los siguientes valores para enviar el trabajo:

  • El proyecto en el que se creará el clúster
  • Región en la que se creará el clúster
  • La configuración de la tarea, que especifica el nombre del clúster y la ruta de archivo (URI) de Cloud Storage de la tarea de PySpark

Consulta SubmitJob para obtener más información.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Elimina el clúster

Se han definido los siguientes valores para eliminar el clúster:

  • El proyecto en el que se creará el clúster
  • Región en la que se creará el clúster
  • Nombre del clúster

Para obtener más información, consulta la documentación de DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")