Usa las bibliotecas cliente de Cloud para Python

En este instructivo, se incluye una explicación de Cloud Shell que usa las bibliotecas cliente de Google Cloud para Python para llamar a las API de gRPC de Dataproc de manera programática a fin de crear un clúster y enviar un trabajo a él.

En las siguientes secciones, se explica el funcionamiento del código de descripción que se incluye en el repositorio GoogleCloudPlatform/python-docs-samples/dataproc de GitHub.

Ejecuta la explicación de Cloud Shell

Haz clic en Open in Cloud Shell (Abrir en Google Cloud Shell) para ejecutar la explicación.

Abrir en Cloud Shell

Examina el código

Credencial predeterminada de la aplicación

La explicación de Cloud Shell de este instructivo proporciona autenticación mediante el uso de las credenciales de tu proyecto de Google Cloud. Cuando ejecutas el código de forma local, la práctica recomendada es usar las credenciales de la cuenta de servicio para autenticar tu código.

Crea un clúster de Dataproc

Se establecen los siguientes valores para crear el clúster:

  • El proyecto en el que se creará el clúster
  • La región en la que se creará el clúster
  • El nombre del clúster
  • La configuración del clúster, que especifica un trabajador principal y dos trabajadores primarios

Se usan los parámetros de configuración predeterminados para el resto de la configuración del clúster. Puedes anular los parámetros de configuración predeterminados del clúster. Por ejemplo, puedes agregar VMs secundarias (configuración predeterminada = 0) o especificar una red de VPC que no sea predeterminada para el clúster. Para obtener más información, consulta CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Envía un trabajo

Se establecen los siguientes valores para enviar el trabajo:

  • El proyecto en el que se creará el clúster
  • La región en la que se creará el clúster
  • La configuración del trabajo, que especifica el nombre del clúster y la ruta de acceso (URI) de Cloud Storage del trabajo de PySpark

Consulta SubmitJob para obtener más información.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Borra el clúster

Se establecen los siguientes valores para borrar el clúster:

  • El proyecto en el que se creará el clúster
  • La región en la que se creará el clúster
  • El nombre del clúster

Para obtener más información, consulta DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")