Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Usa las bibliotecas cliente de Cloud para Python

En este instructivo, se incluye una explicación de Cloud Shell que usa las bibliotecas cliente de Google Cloud para Python para llamar a las API de gRPC de Dataproc de manera programática a fin de crear un clúster y enviar un trabajo a él.

En las siguientes secciones, se explica el funcionamiento del código de explicación que se incluye en el repositorio GoogleCloudPlatform/python-dataproc de GitHub.

Ejecuta la explicación de Cloud Shell

Haz clic en Open in Google Cloud Shell (Abrir en Google Cloud Shell) para ejecutar la explicación.

Abre este proyecto en Cloud Shell

Comprende el código de ejemplo de Python

Credencial predeterminada de la aplicación

La explicación de Cloud Shell de este instructivo proporciona autenticación mediante el uso de las credenciales del proyecto de Google Cloud. Cuando ejecutas el código de forma local, la práctica recomendada es usar las credenciales de la cuenta de servicio para autenticar tu código.

Cree un clúster de Dataproc.

Puedes crear un clúster de Dataproc nuevo con la API de CreateCluster.

Debes especificar los siguientes valores cuando creas un clúster:

  1. El proyecto en el que se creará el clúster
  2. El nombre del clúster
  3. La región que se usará. Si especificas la región global (el código del instructivo usa una marca --global_region para seleccionar la región global), también debes especificar una zona (consulta zone_uri). Si especificas una región no global y dejas el campo zone_uri vacío, la ubicación de zona automática de Dataproc seleccionará una zona para tu clúster.
from google.cloud import dataproc_v1 as dataproc

def create_cluster(project_id, region, cluster_name):
    """This sample walks a user through creating a Cloud Dataproc cluster
       using the Python client library.

       Args:
           project_id (string): Project to use for creating resources.
           region (string): Region where the resources should live.
           cluster_name (string): Name to use for creating a cluster.
    """

    # Create a client with the endpoint set to the desired cluster region.
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    # Output a success message.
    print(f"Cluster created successfully: {result.cluster_name}")

También puedes anular los ajustes predeterminados de configuración del clúster. Por ejemplo, puedes especificar la cantidad de trabajadores (configuración predeterminada = 2), si deseas usar VM interrumpibles (configuración predeterminada = 0) y la configuración de red (configuración predeterminada = default network). Consulta CreateClusterRequest para obtener más información.

Enumera clústeres de Dataproc

Puedes enumerar los clústeres dentro de un proyecto; para ello, llama a la API de ListClusters. El resultado muestra un objeto JSON que enumera los clústeres. Puedes atravesar la respuesta JSON para imprimir los detalles del clúster.

def list_clusters_with_details(dataproc, project, region):
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(
        request={"project_id": project, "region": region}
    ):
        print(
            (
                "{} - {}".format(
                    cluster.cluster_name,
                    cluster.status.State.Name(cluster.status.state),
                )
            )
        )

Envía un trabajo a un clúster de Dataproc

Puedes enviar un trabajo a un clúster existente con la API de SubmitJob. Cuando envías un trabajo, este se ejecuta de manera asíncrona.

Para enviar un trabajo, debes especificar la siguiente información:

  1. El nombre del clúster al que se enviará el trabajo
  2. La región que se usará
  3. El tipo de trabajo que se enviará (como Hadoop, Spark, 'PySpark)
  4. Detalles del trabajo para el tipo de trabajo que se enviará (consulta SubmitJobRequest para obtener más información).

El siguiente código envía un trabajo de Spark a un clúster.

import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

def submit_job(project_id, region, cluster_name):
    # Create the job client.
    job_client = dataproc.JobControllerClient(client_options={
        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
    })

    # Create the job config. 'main_jar_file_uri' can also be a
    # Google Cloud Storage URL.
    job = {
        'placement': {
            'cluster_name': cluster_name
        },
        'spark_job': {
            'main_class': 'org.apache.spark.examples.SparkPi',
            'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
            'args': ['1000']
        }
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output gets saved to the Google Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_string()
    )

    print(f"Job finished successfully: {output}")

De forma predeterminada, el código del instructivo ejecuta el trabajo de ejemplo SparkPi incluido en Spark.

Borra un clúster de Dataproc

Llama a la API de DeleteCluster para borrar un clúster.

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print("Tearing down cluster.")
    result = dataproc.delete_cluster(
        request={"project_id": project, "region": region, "cluster_name": cluster}
    )
    return result