Installer des bibliothèques clientes Google Cloud pour Python

Cette page inclut un tutoriel Cloud Shell qui se sert des bibliothèques clientes Google Cloud pour Python pour appeler les API gRPC Dataproc de manière automatisée, afin de créer un cluster et lui envoyer une tâche.

Les sections suivantes décrivent le fonctionnement du code du tutoriel contenu dans le dépôt GitHub GoogleCloudPlatform/python-dataproc.

Exécuter le tutoriel Cloud Shell

Cliquez sur Ouvrir dans Cloud Shell pour exécuter le tutoriel.

Ouvrir dans Cloud Shell

Comprendre le code

Identifiants par défaut de l'application

Le tutoriel Cloud Shell de ce tutoriel fournit une authentification à l'aide des identifiants de votre projet Google Cloud. Lorsque vous exécutez du code en local, il est recommandé de l'authentifier à l'aide des identifiants du compte de service.

Créer un cluster Dataproc

Les valeurs suivantes sont définies pour créer le cluster:

  • Le projet dans lequel le cluster sera créé
  • Région dans laquelle le cluster sera créé
  • Le nom du cluster
  • La configuration du cluster, qui spécifie un nœud de calcul maître et deux nœuds de calcul principaux

Les paramètres de configuration par défaut sont utilisés pour les paramètres de cluster restants. Vous pouvez remplacer les paramètres de configuration du cluster par défaut. Par exemple, vous pouvez ajouter des VM secondaires (par défaut = 0) ou spécifier un réseau VPC autre que celui par défaut pour le cluster. Pour en savoir plus, consultez la section CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Envoyer un job

Les valeurs suivantes sont définies pour envoyer la tâche:

  • Le projet dans lequel le cluster sera créé
  • Région dans laquelle le cluster sera créé
  • La configuration de la tâche, qui spécifie le nom du cluster et le chemin de fichier (URI) Cloud Storage de la tâche PySpark

Consultez la section SubmitJob pour plus d'informations.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Supprimer le cluster

Les valeurs suivantes sont définies pour supprimer le cluster:

  • Le projet dans lequel le cluster sera créé
  • Région dans laquelle le cluster sera créé
  • Le nom du cluster

Pour en savoir plus, consultez la section DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")