Use as bibliotecas cliente da Google Cloud para Python

Este tutorial inclui um passo a passo do Cloud Shell que usa as bibliotecas cliente do Google Cloud para Python para chamar programaticamente as APIs gRPC do Dataproc para criar um cluster e enviar uma tarefa para o cluster.

As secções seguintes explicam o funcionamento do código do passo a passo contido no repositório GoogleCloudPlatform/python-dataproc do GitHub.

Execute o tutorial do Cloud Shell

Clique em Abrir no Cloud Shell para executar o tutorial passo a passo.

Abrir no Cloud Shell

Compreenda o código

Credenciais padrão da aplicação

O passo a passo do Cloud Shell neste tutorial fornece a autenticação através das credenciais do seu projeto Google Cloud . Quando executa código localmente, a prática recomendada é usar credenciais da conta de serviço para autenticar o seu código.

Crie um cluster do Dataproc

Os seguintes valores são definidos para criar o cluster:

  • O projeto no qual o cluster vai ser criado
  • A região onde o cluster vai ser criado
  • O nome do cluster
  • A configuração do cluster, que especifica um mestre e dois trabalhadores primários

As predefinições de configuração são usadas para as restantes definições do cluster. Pode substituir as predefinições de configuração do cluster. Por exemplo, pode adicionar VMs secundárias (predefinição = 0) ou especificar uma rede VPC não predefinida para o cluster. Para mais informações, consulte o artigo CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Envie um trabalho

Os seguintes valores são definidos para enviar a tarefa:

  • O projeto no qual o cluster vai ser criado
  • A região onde o cluster vai ser criado
  • A configuração do trabalho, que especifica o nome do cluster e o caminho do ficheiro (URI) do Cloud Storage do trabalho do PySpark

Consulte SubmitJob para mais informações.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Elimine o cluster

Os seguintes valores são definidos para eliminar o cluster:

  • O projeto no qual o cluster vai ser criado
  • A região onde o cluster vai ser criado
  • O nome do cluster

Para mais informações, consulte o artigo DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")