Usar a biblioteca de cliente do Python

Este tutorial inclui um walkthrough do Cloud Shell que usa as bibliotecas de cliente do Google Cloud para Python para chamar programaticamente APIs gRPC do Dataproc para criar um cluster e enviar um job ao cluster.

As seções a seguir explicam o funcionamento do código walkthrough contido no repositório GoogleCloudPlatform/python-docs-samples/dataproc do GitHub.

Executar o walkthrough do Cloud Shell

Clique em Abrir no Cloud Shell para executar o tutorial.

Abrir no Cloud Shell

entenda o código

Application Default Credentials

O tutorial do Cloud Shell neste tutorial fornece autenticação usando suas credenciais do projeto Google Cloud . Ao executar o código localmente, a prática recomendada é usar credenciais da conta de serviço para autenticar seu código.

Criar um cluster do Dataproc

Os valores a seguir são definidos para criar o cluster:

  • o projeto em que o cluster será criado
  • A região em que o cluster será criado
  • o nome do cluster
  • A configuração do cluster, que especifica um mestre e dois workers principais

As configurações padrão são usadas para as configurações restantes do cluster. É possível substituir as configurações padrão de configuração do cluster. Por exemplo, é possível adicionar VMs secundárias (padrão = 0) ou especificar uma rede VPC não padrão para o cluster. Para mais informações, consulte CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Envie um job

Os valores a seguir são definidos para enviar o job:

  • o projeto em que o cluster será criado
  • A região em que o cluster será criado
  • A configuração do job, que especifica o nome do cluster e o caminho do Cloud Storage (URI) do job do PySpark

Consulte SubmitJob para mais informações.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Excluir o cluster

Os valores a seguir são definidos para excluir o cluster:

  • o projeto em que o cluster será criado
  • A região em que o cluster será criado
  • o nome do cluster

Para mais informações, consulte DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")