Diese Seite wurde von der Cloud Translation API übersetzt.
Switch to English

Cloud-Clientbibliotheken für Python verwenden

Diese Anleitung umfasst eine Schritt-für-Schritt-Anleitung für Cloud Shell. Darin werden Dataproc gRPC APIs mithilfe von Google Cloud-Clientbibliotheken für Python programmatisch aufgerufen, um einen Cluster zu erstellen und einen Job an den Cluster zu senden.

In den folgenden Abschnitten wird die Verwendung des Anleitungscodes im GitHub-Repository GoogleCloudPlatform/python-dataproc erläutert.

Schritt-für-Schritt-Anleitung für Cloud Shell ausführen

Klicken Sie zum Ausführen der Anleitung auf Open in Google Cloud Shell (In Google Cloud Shell öffnen).

Dieses Projekt in Cloud Shell öffnen

Beispielcode für Python

Standardanmeldedaten für Anwendungen

Die Schritt-für-Schritt-Anleitung für Cloud Shell bietet eine Authentifizierung anhand der Anmeldedaten Ihres Google Cloud-Projekts. Wenn Sie Code lokal ausführen, sollten Sie zum Authentifizieren Ihres Codes die Dienstkonto-Anmeldedaten verwenden.

Dataproc-Cluster erstellen

Sie können einen neuen Dataproc-Cluster mit der CreateCluster API erstellen.

Beim Erstellen eines Clusters müssen Sie die folgenden Werte angeben:

  1. Das Projekt, in dem der Cluster erstellt wird.
  2. Den Namen des Clusters.
  3. Die zu verwendende Region. Wenn Sie die Region global angeben (im Anleitungscode wird das Flag --global_region zum Auswählen der globalen Region verwendet), müssen Sie auch eine Zone angeben (siehe zone_uri). Wenn Sie eine nicht globale Region angeben und das Feld zone_uri frei lassen, wählt die automatische Zonenplatzierung von Dataproc eine Zone für Ihren Cluster aus.
from google.cloud import dataproc_v1 as dataproc

def create_cluster(project_id, region, cluster_name):
    """This sample walks a user through creating a Cloud Dataproc cluster
       using the Python client library.

       Args:
           project_id (string): Project to use for creating resources.
           region (string): Region where the resources should live.
           cluster_name (string): Name to use for creating a cluster.
    """

    # Create a client with the endpoint set to the desired cluster region.
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-1"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    # Output a success message.
    print(f"Cluster created successfully: {result.cluster_name}")

Sie können die Standard-Clusterkonfigurationseinstellungen auch überschreiben. Sie können beispielsweise die Anzahl der Worker angeben (Standard = 2), ob VMs auf Abruf verwendet werden sollen (Standard = 0) und Netzwerkeinstellungen festlegen (Standard = default network). Weitere Informationen finden Sie unter CreateClusterRequest.

Dataproc-Cluster auflisten

Sie können Cluster innerhalb eines Projekts auflisten, indem Sie die ListClusters API aufrufen. Die Ausgabe gibt ein JSON-Objekt zurück, das die Cluster auflistet. Sie können die JSON-Antwort durchsehen, um Clusterdetails zu drucken.

def list_clusters_with_details(dataproc, project, region):
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(
        request={"project_id": project, "region": region}
    ):
        print(
            (
                "{} - {}".format(
                    cluster.cluster_name,
                    cluster.status.State.Name(cluster.status.state),
                )
            )
        )

Job an Dataproc-Cluster senden

Mit der SubmitJob API können Sie einen Job an einen vorhandenen Cluster senden. Wenn Sie einen Job senden, wird er asynchron ausgeführt.

Zum Senden eines Jobs müssen Sie folgende Angaben machen:

  1. Den Namen des Clusters, an den der Job gesendet wird
  2. Die zu verwendende Region
  3. Die Art des gesendeten Jobs, z. B. Hadoop, Spark, PySpark
  4. Jobdetails für die Art des gesendeten Jobs (siehe SubmitJobRequest für weitere Informationen)

Mit dem folgenden Code wird ein Spark-Job an einen Cluster gesendet.

import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

def submit_job(project_id, region, cluster_name):
    # Create the job client.
    job_client = dataproc.JobControllerClient(client_options={
        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
    })

    # Create the job config. 'main_jar_file_uri' can also be a
    # Google Cloud Storage URL.
    job = {
        'placement': {
            'cluster_name': cluster_name
        },
        'spark_job': {
            'main_class': 'org.apache.spark.examples.SparkPi',
            'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
            'args': ['1000']
        }
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output gets saved to the Google Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_string()
    )

    print(f"Job finished successfully: {output}")

Mit dem Anleitungscode wird standardmäßig der in Spark enthaltene Beispieljob SparkPi ausgeführt.

Dataproc-Cluster löschen

Rufen Sie zum Löschen eines Clusters die DeleteCluster API auf.

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print("Tearing down cluster.")
    result = dataproc.delete_cluster(
        request={"project_id": project, "region": region, "cluster_name": cluster}
    )
    return result