Usar a biblioteca de cliente do Cloud para Python

Este tutorial inclui um walkthrough do Cloud Shell que usa as bibliotecas de cliente do Google Cloud para Python para chamar programaticamente APIs gRPC do Dataproc para criar um cluster e enviar um job ao cluster.

As seções a seguir explicam o funcionamento do código walkthrough contido no repositório GoogleCloudPlatform/python-docs-samples/dataproc do GitHub.

Executar o walkthrough do Cloud Shell

Clique em Abrir no Google Cloud Shell para executar o walkthrough.

Abrir este projeto no Cloud Shell

Noções básicas do código de exemplo do Python

Application Default Credentials

O walkthrough do Cloud Shell neste tutorial fornece autenticação usando suas credenciais do projeto do Google Cloud. Ao executar o código localmente, a prática recomendada é usar credenciais da conta de serviço para autenticar seu código.

Receber clientes de cluster e job

São necessários dois clientes da API do Dataproc do Google Cloud para executar o código do tutorial: um ClusterControllerClient para chamar APIs de clusters do gRPC e um JobControllerClient para chamar APIs de jobs do gRPC. Se o cluster onde se criam e executam os jobs estiver na região global do Dataproc, o código usará o endpoint padrão do gRPC. Se a região do cluster não for global, será usado um endpoint regional do gRPC.

if global_region:
    region = 'global'
    # Use the default gRPC global endpoints.
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
    dataproc_job_client = dataproc_v1.JobControllerClient()
else:
    region = get_region_from_zone(zone)
    # Use a regional gRPC endpoint. See:
    # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
    client_transport = (
        cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(region)))
    job_transport = (
        job_controller_grpc_transport.JobControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(region)))
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
        client_transport)
    dataproc_job_client = dataproc_v1.JobControllerClient(job_transport)

Listar clusters do Dataproc

Você pode listar os clusters dentro de um projeto chamando a API ListClusters. A saída retorna um objeto JSON que lista os clusters. É possivel atravessar a resposta JSON para imprimir os detalhes do cluster.

def list_clusters_with_details(dataproc, project, region):
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(project, region):
        print(('{} - {}'.format(cluster.cluster_name,
                                cluster.status.State.Name(
                                    cluster.status.state))))

Criar um cluster do Dataproc

Você pode criar um novo cluster do Dataproc com a API CreateCluster.

Especifique os seguintes valores ao criar um cluster:

  1. o projeto em que o cluster será criado
  2. o nome do cluster
  3. a região que será usada Se especificar a região global (o código do tutorial usa uma sinalização --global_region para selecionar a região global), você também deverá especificar uma zona (zone_uri). Se especificar uma região não global e deixar o campo zone_uri vazio, a colocação em zona automática do Dataproc selecionará uma zona para o cluster.
def create_cluster(dataproc, project, zone, region, cluster_name):
    """Create the cluster."""
    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
    cluster_data = {
        'project_id': project,
        'cluster_name': cluster_name,
        'config': {
            'gce_cluster_config': {
                'zone_uri': zone_uri
            },
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-1'
            },
            'worker_config': {
                'num_instances': 2,
                'machine_type_uri': 'n1-standard-1'
            }
        }
    }

    cluster = dataproc.create_cluster(project, region, cluster_data)
    cluster.add_done_callback(callback)
    global waiting_callback
    waiting_callback = True

Também é possível substituir as configurações de cluster padrão. Por exemplo, você pode especificar o número de workers (padrão = 2), se usará VMs preemptivas (padrão = 0) e configurações de rede (padrão = default network). Consulte CreateClusterRequest para mais informações.

Enviar um job para um cluster do Dataproc

Você pode enviar um job para um cluster existente com a API SubmitJob. Ao enviar um job, ele é executado de forma assíncrona.

Para enviar um job, especifique as seguintes informações:

  1. o nome do cluster ao qual será enviado o job
  2. a região que será usada
  3. o tipo de job que está sendo enviado (como Hadoop, Spark, 'PySpark)
  4. Detalhes do job para o tipo de job que está sendo enviado (consulte SubmitJobRequest para mais informações).

O código a seguir envia um job PySpark para um cluster.

def submit_pyspark_job(dataproc, project, region, cluster_name, bucket_name,
                       filename):
    """Submit the Pyspark job to the cluster (assumes `filename` was uploaded
    to `bucket_name."""
    job_details = {
        'placement': {
            'cluster_name': cluster_name
        },
        'pyspark_job': {
            'main_python_file_uri': 'gs://{}/{}'.format(bucket_name, filename)
        }
    }

    result = dataproc.submit_job(
        project_id=project, region=region, job=job_details)
    job_id = result.reference.job_id
    print('Submitted job ID {}.'.format(job_id))
    return job_id

Por padrão, o código do tutorial executa o seguinte job pequeno PySpark.

import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)

Devido à execução assíncrona, o job precisa terminar antes da exibição da saída. Você pode chamar GetJob enquanto o job estiver em execução para acessar o JobStatus e detalhes do job após sua conclusão.

Receber status e detalhes do job

Faça um GetJobRequest com as seguintes informações obrigatórias:

  1. o projeto do cluster em que o job foi enviado
  2. a região do cluster
  3. o código do job (UUID)

O código a seguir verifica o status de um job e retorna os detalhes do job quando ele for concluído.

def wait_for_job(dataproc, project, region, job_id):
    """Wait for job to complete or error out."""
    print('Waiting for job to finish...')
    while True:
        job = dataproc.get_job(project, region, job_id)
        # Handle exceptions
        if job.status.State.Name(job.status.state) == 'ERROR':
            raise Exception(job.status.details)
        elif job.status.State.Name(job.status.state) == 'DONE':
            print('Job finished.')
            return job

Excluir um cluster do Dataproc

Chame a API DeleteCluster para excluir um cluster.

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print('Tearing down cluster.')
    result = dataproc.delete_cluster(
        project_id=project, region=region, cluster_name=cluster)
    return result