Utilizzo delle librerie client di Cloud per Python

Questo tutorial include una procedura dettagliata di Cloud Shell che utilizza le librerie client di Google Cloud per Python per chiamare in modo programmatico le API Dataproc gRPC di Dataproc, al fine di creare un cluster e inviare un job al cluster.

Le seguenti sezioni spiegano il funzionamento del codice della procedura dettagliata contenuto nel repository GitHub GoogleCloudPlatform/python-dataproc.

esegui la procedura dettagliata di Cloud Shell

Fai clic su Apri in Cloud Shell per eseguire la procedura dettagliata.

Apri in Cloud Shell

comprendi il codice

Credenziali predefinite dell'applicazione

La procedura dettagliata di Cloud Shell in questo tutorial fornisce l'autenticazione mediante l'utilizzo delle credenziali del tuo progetto Google Cloud. Quando esegui il codice in locale, la pratica consigliata è utilizzare le credenziali dell'account di servizio per autenticare il codice.

Crea un cluster Dataproc

I seguenti valori sono impostati per creare il cluster:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • Il nome del cluster
  • La configurazione del cluster, che specifica un master e due worker principali,

Per le restanti impostazioni del cluster vengono utilizzate le impostazioni di configurazione predefinite. Puoi eseguire l'override delle impostazioni di configurazione del cluster predefinite. Ad esempio, puoi aggiungere VM secondarie (valore predefinito = 0) o specificare una rete VPC non predefinita per il cluster. Per ulteriori informazioni, consulta CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

invia un job

Per inviare il job sono impostati i seguenti valori:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • La configurazione del job, che specifica il nome del cluster e il percorso file (URI) Cloud Storage del job PySpark

Per ulteriori informazioni, consulta SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Eliminare il cluster

I seguenti valori sono impostati per eliminare il cluster:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • Il nome del cluster

Per ulteriori informazioni, consulta DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")