Utilizzo delle librerie client di Cloud per Python

Questo tutorial include una procedura dettagliata di Cloud Shell che utilizza le librerie client Google Cloud per Python per chiamare in modo programmatico API gRPC di Dataproc per creare un cluster e inviare un job al cluster.

Le seguenti sezioni spiegano il funzionamento del codice della procedura dettagliata contenuto nel repository GitHub GoogleCloudPlatform/python-dataproc.

Esegui la procedura dettagliata di Cloud Shell

Fai clic su Apri in Cloud Shell per eseguire la procedura dettagliata.

Apri in Cloud Shell

comprendi il codice

Credenziali predefinite dell'applicazione

La procedura dettagliata di Cloud Shell in questo tutorial fornisce l'autenticazione utilizzando le credenziali del progetto Google Cloud. Quando esegui il codice localmente, la best practice è utilizzare le credenziali dell'account di servizio per autenticare il codice.

Crea un cluster Dataproc

Per creare il cluster vengono impostati i seguenti valori:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • Il nome del cluster
  • La configurazione del cluster, che specifica un master e due worker principali

Per le restanti impostazioni del cluster vengono utilizzate le impostazioni di configurazione predefinite. Puoi eseguire l'override delle impostazioni di configurazione del cluster predefinite. Ad esempio, puoi aggiungere VM secondarie (valore predefinito = 0) o specificare una rete VPC diversa da quella predefinita per il cluster. Per ulteriori informazioni, consulta CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Invia un job

Per l'invio del job sono impostati i seguenti valori:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • La configurazione del job, che specifica il nome del cluster e il percorso (URI) Cloud Storage del job PySpark

Per ulteriori informazioni, consulta SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Elimina il cluster

Per eliminare il cluster, sono impostati i seguenti valori:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • Il nome del cluster

Per ulteriori informazioni, consulta DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")