Utilizzo delle librerie client di Cloud per Python

Questo tutorial include una procedura dettagliata di Cloud Shell che utilizza le librerie client di Google Cloud per Python per chiamare a livello di programmazione le API gRPC di Dataproc per creare un cluster e inviare un job al cluster.

Le sezioni seguenti spiegano il funzionamento del codice della procedura dettagliata contenuto nel repository GitHub GoogleCloudPlatform/python-dataproc.

Esegui la procedura dettagliata di Cloud Shell

Fai clic su Apri in Cloud Shell per eseguire la procedura dettagliata.

Apri in Cloud Shell

comprendi il codice

Credenziali predefinite dell'applicazione

La procedura dettagliata di Cloud Shell in questo tutorial fornisce l'autenticazione utilizzando le credenziali del progetto Google Cloud . Quando esegui il codice localmente, la prassi consigliata è utilizzare le credenziali del service account per autenticare il codice.

Crea un cluster Dataproc

Per creare il cluster vengono impostati i seguenti valori:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • Il nome del cluster
  • La configurazione del cluster, che specifica un master e due worker principali

Per le impostazioni del cluster rimanenti vengono utilizzate le impostazioni di configurazione predefinite. Puoi eseguire l'override delle impostazioni di configurazione predefinite del cluster. Ad esempio, puoi aggiungere VM secondarie (valore predefinito = 0) o specificare una rete VPC non predefinita per il cluster. Per saperne di più, vedi CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Invia un job

Per inviare il job sono impostati i seguenti valori:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • La configurazione del job, che specifica il nome del cluster e il percorso del file (URI) Cloud Storage del job PySpark

Per saperne di più, consulta SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Elimina il cluster

I seguenti valori sono impostati per eliminare il cluster:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • Il nome del cluster

Per saperne di più, consulta la sezione DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")