Installer des bibliothèques clientes Google Cloud pour Python

Cette page inclut un tutoriel Cloud Shell qui se sert des bibliothèques clientes Google Cloud pour Python pour appeler les API gRPC Dataproc de manière automatisée, afin de créer un cluster et lui envoyer une tâche.

Les sections suivantes décrivent le fonctionnement du code du tutoriel contenu dans le dépôt GitHub GoogleCloudPlatform/python-dataproc.

Exécuter le tutoriel Cloud Shell

Cliquez sur Ouvrir dans Cloud Shell pour exécuter le tutoriel.

Ouvrir dans Cloud Shell

Comprendre le code

Identifiants par défaut de l'application

Le tutoriel Cloud Shell fournit une authentification à l'aide des identifiants de votre projet Google Cloud. Lorsque vous exécutez du code en local, il est recommandé de l'authentifier à l'aide des identifiants du compte de service.

Créer un cluster Dataproc

Les valeurs suivantes sont définies pour créer le cluster :

  • Le projet dans lequel le cluster sera créé
  • Région dans laquelle le cluster sera créé
  • Le nom du cluster
  • La configuration du cluster, qui spécifie un nœud maître et deux nœuds de calcul principaux

Les paramètres de configuration par défaut sont utilisés pour les autres paramètres du cluster. Vous pouvez remplacer les paramètres de configuration du cluster par défaut. Par exemple, vous pouvez ajouter des VM secondaires (par défaut = 0) ou spécifier un réseau VPC non par défaut pour le cluster. Pour en savoir plus, consultez CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Envoyer une tâche

Les valeurs suivantes sont définies pour envoyer la tâche :

  • Le projet dans lequel le cluster sera créé
  • Région dans laquelle le cluster sera créé
  • La configuration de la tâche, qui spécifie le nom du cluster et l'Cloud Storage chemin d'accès (URI) du job PySpark

Pour en savoir plus, consultez SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Supprimer le cluster

Les valeurs suivantes sont définies pour supprimer le cluster:

  • Le projet dans lequel le cluster sera créé
  • Région dans laquelle le cluster sera créé
  • Le nom du cluster

Pour en savoir plus, consultez DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")