Usar a biblioteca de cliente do Python

Este tutorial inclui um walkthrough do Cloud Shell que usa as bibliotecas de cliente do Google Cloud para Python para chamar programaticamente APIs gRPC do Dataproc para criar um cluster e enviar um job ao cluster.

As seções a seguir explicam o funcionamento do código walkthrough contido no repositório GoogleCloudPlatform/python-dataproc do GitHub.

Executar o walkthrough do Cloud Shell

Clique em Abrir no Cloud Shell para executar o tutorial.

Abrir no Cloud Shell

Entenda o código

Application Default Credentials

O tutorial do Cloud Shell neste tutorial fornece autenticação usando suas credenciais do projeto do Google Cloud. Ao executar o código localmente, a prática recomendada é usar credenciais da conta de serviço para autenticar seu código.

Crie um cluster do Dataproc:

Os valores a seguir estão definidos para criar o cluster:

  • o projeto em que o cluster será criado
  • A região em que o cluster será criado
  • o nome do cluster
  • A configuração do cluster, que especifica um mestre e dois workers principais

As configurações padrão são usadas nas demais configurações do cluster. É possível modificar as configurações padrão do cluster. Por exemplo, é possível adicionar VMs secundárias (padrão = 0) ou especificar uma rede VPC não padrão para o cluster. Para mais informações, consulte CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Envie um job

Os valores a seguir estão definidos para enviar o job:

  • o projeto em que o cluster será criado
  • A região em que o cluster será criado
  • A configuração do job, que especifica o nome do cluster e o caminho de arquivo (URI) do Cloud Storage do job do PySpark

Consulte SubmitJob para mais informações.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Exclua o cluster

Os valores a seguir estão definidos para excluir o cluster:

  • o projeto em que o cluster será criado
  • A região em que o cluster será criado
  • o nome do cluster

Para mais informações, consulte DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")