Uso da biblioteca de cliente do Python

Neste tutorial, fornecemos vários exemplos de uso da biblioteca google-api-python-client para interagir de maneira programática com o Google Cloud Dataproc. Essa biblioteca de cliente troca dados com a interface REST do Cloud Dataproc. Isso permite criar scripts e aplicativos em Python capazes de gerenciar clusters e jobs do Cloud Dataproc. Este tutorial também vai empregar o Google Cloud SDK para simplificar o processo de autenticação e configuração.

Ao seguir estas etapas, você saberá como usar a biblioteca google-python-client para:

  • cumprir a autorização OAuth 2.0 usando a biblioteca oauth2client e o Google Cloud SDK;
  • listar todos os clusters do Cloud Dataproc de um projeto;
  • criar um novo cluster do Cloud Dataproc;
  • criar um novo job PySpark em um cluster do Cloud Dataproc;
  • verificar o status de um job para determinar se ele foi concluído;
  • excluir um cluster do Cloud Dataproc.

Primeiras etapas

Se ainda não tiver feito, crie um projeto do Google Cloud Platform.

Configurar o projeto

  1. Faça login na sua Conta do Google.

    Se você ainda não tiver uma, inscreva-se.

  2. Selecione ou crie um projeto do Google Cloud Platform.

    Acessar a página Gerenciar recursos

  3. Verifique se o faturamento foi ativado no projeto do Google Cloud Platform.

    Saiba como ativar o faturamento

  4. Ativar Cloud Dataproc e Google Compute Engine APIs.

    Ativar as APIs

Instalar as bibliotecas necessárias

A biblioteca google-api-python-client contém a biblioteca principal do Python para acessar as APIs do Google e contém a biblioteca de cliente OAuth 2.0. Este exemplo também faz upload de um arquivo para o Google Cloud Storage e o download do arquivo resultante do Cloud Storage. Para interagir com o Cloud Storage, você precisa instalar a biblioteca gcloud-python. No código de exemplo, é possível instalar os dois usando requirements.txt.

$ pip install -r requirements.txt

Estudar o JSON

A Cloud Dataproc API e este tutorial usam o JSON extensivamente. Os objetos JSON são usados como contêineres de dados de solicitação e resposta. Talvez convenha se familiarizar com o JSON se nunca tiver usado ele antes.

Entender a regionalização da Cloud Dataproc API

A Cloud Dataproc API incorpora a regionalização. Para os fins deste tutorial, use a região global. A região global facilita a criação e o gerenciamento de clusters e jobs do Cloud Dataproc em todas as zonas.

Usar a biblioteca de cliente do Python

Após concluir a configuração do Cloud SDK e das bibliotecas necessárias, você está pronto para usar a biblioteca de cliente do Python para interagir com o Cloud Dataproc.

Conectar e inicializar a API

Este exemplo usa o Application Default Credentials, o que permite reutilizar as credenciais da ferramenta de linha de comando gcloud criadas automaticamente no App Engine ou no Compute Engine.

gcloud auth application-default login

Para usar as Application Default Credentials, importe GoogleCredentials e crie o cliente de API.

def get_client():
    """Builds a client to the dataproc API."""
    dataproc = googleapiclient.discovery.build('dataproc', 'v1')
    return dataproc

Listar clusters do Cloud Dataproc

Agora que você gerou credenciais e um objeto de serviço do Cloud Dataproc, pode começar a executar ações com clusters e jobs do Cloud Dataproc. Vamos começar listando todos os clusters do Cloud Dataproc de um projeto usando o comando projects.regions.clusters.list:

def list_clusters(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    return result

A saída desse comando retorna um conjunto de dados JSON com todos os clusters atuais. Você pode alterar a resposta JSON para imprimir detalhes sobre os clusters, por exemplo:

def list_clusters_with_details(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    cluster_list = result['clusters']
    for cluster in cluster_list:
        print("{} - {}"
              .format(cluster['clusterName'], cluster['status']['state']))
    return result

Criar um novo cluster do Cloud Dataproc

Crie um novo cluster do Cloud Dataproc com a operação projects.regions.clusters.create.

No mínimo, você precisa especificar quatro valores ao criar um novo cluster:

  1. o projeto em que o cluster será criado
  2. a região que será usada
  3. o nome do cluster
  4. A zona em que o cluster será criado (necessária ao usar a região global). Consulte Colocação em zona automática do Cloud Dataproc para ver informações sobre como o Cloud Dataproc escolhe uma zona para você ao usar uma região não global do cluster.

É possível especificar muito mais detalhes além desses requisitos mínimos. Por exemplo, é possível especificar o número de trabalhadores, se a computação preemptiva será usada e as configurações da rede. Consulte a documentação da API para mais informações.

def create_cluster(dataproc, project, zone, region, cluster_name):
    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
    cluster_data = {
        'projectId': project,
        'clusterName': cluster_name,
        'config': {
            'gceClusterConfig': {
                'zoneUri': zone_uri
            },
            'masterConfig': {
                'numInstances': 1,
                'machineTypeUri': 'n1-standard-1'
            },
            'workerConfig': {
                'numInstances': 2,
                'machineTypeUri': 'n1-standard-1'
            }
        }
    }
    result = dataproc.projects().regions().clusters().create(
        projectId=project,
        region=region,
        body=cluster_data).execute()
    return result

Enviar um job PySpark a um cluster do Cloud Dataproc

Envie um novo job do Cloud Dataproc a um cluster existente com a operação projects.regions.jobs.submit. Quando você envia um job, ele é executado de modo assíncrono.

Para enviar um job a um cluster, é preciso especificar as informações mínimas a seguir:

  1. o nome do cluster ao qual será enviado o job
  2. a região que será usada
  3. o tipo do job que será enviado (por exemplo, Hadoop, Spark, Spark SQL etc.)
  4. detalhes específicos para o tipo de job que será enviado

Neste exemplo, você envia um job PySpark a um cluster.

def submit_pyspark_job(dataproc, project, region,
                       cluster_name, bucket_name, filename):
    """Submits the Pyspark job to the cluster, assuming `filename` has
    already been uploaded to `bucket_name`"""
    job_details = {
        'projectId': project,
        'job': {
            'placement': {
                'clusterName': cluster_name
            },
            'pysparkJob': {
                'mainPythonFileUri': 'gs://{}/{}'.format(bucket_name, filename)
            }
        }
    }
    result = dataproc.projects().regions().jobs().submit(
        projectId=project,
        region=region,
        body=job_details).execute()
    job_id = result['reference']['jobId']
    print('Submitted job ID {}'.format(job_id))
    return job_id

Por exemplo, se você tiver o seguinte código PySpark:

import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)

chame o método submit_pyspark_job para executar este script PySpark em um cluster no projeto:

job_id = submit_pyspark_job(
    dataproc, project_id, region,
    cluster_name, bucket_name, spark_filename)

Devido à execução assíncrona, o job precisa terminar antes da exibição da saída. Verifique o status de um job para determinar se ele foi concluído. Após essa conclusão, é possível chamar projects.regions.jobs.get para ver detalhes sobre o job e, em seguida, inspecionar a saída dele para saber mais.

Verificar o status de um job

Você pode usar a operação projects.regions.jobs.get para ver detalhes sobre um job enviado anteriormente. Com isso, é fácil determinar o status e os detalhes de um job.

Para ver informações sobre um job, é preciso especificar as informações mínimas a seguir:

  1. o projeto do cluster que executou o job
  2. a região que será usada
  3. o código do job (UUID)

O fragmento a seguir verifica o status de um job para determinar quando ele está concluído, para que seja possível fazer o download da saída dele.

def wait_for_job(dataproc, project, region, job_id):
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
            region=region,
            jobId=job_id).execute()
        # Handle exceptions
        if result['status']['state'] == 'ERROR':
            raise Exception(result['status']['details'])
        elif result['status']['state'] == 'DONE':
            print('Job finished.')
            return result

Excluir um cluster do Cloud Dataproc

Por fim, você pode chamar a operação projects.regions.clusters.delete para excluir um cluster.

def delete_cluster(dataproc, project, region, cluster):
    print('Tearing down cluster')
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
        region=region,
        clusterName=cluster).execute()
    return result

Próximas etapas

Depois que você concluir este tutorial, recomendamos analisar estes recursos:

Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…

Documentação do Cloud Dataproc
Precisa de ajuda? Acesse nossa página de suporte.