Cloud-Clientbibliotheken für Python verwenden

Diese Anleitung umfasst eine Schritt-für-Schritt-Anleitung für Cloud Shell. Darin werden Dataproc gRPC APIs mithilfe von Google Cloud-Clientbibliotheken für Python programmatisch aufgerufen, um einen Cluster zu erstellen und einen Job an den Cluster zu senden.

In den folgenden Abschnitten wird die Verwendung des Anleitungscodes im GitHub-Repository GoogleCloudPlatform/python-docs-samples/dataproc erläutert.

Schritt-für-Schritt-Anleitung für Cloud Shell ausführen

Klicken Sie zum Ausführen der Anleitung auf Open in Google Cloud Shell (In Google Cloud Shell öffnen).

Dieses Projekt in Cloud Shell öffnen

Beispielcode für Python

Standardanmeldedaten für Anwendungen

Die Schritt-für-Schritt-Anleitung für Cloud Shell bietet eine Authentifizierung anhand der Anmeldedaten Ihres Google Cloud-Projekts. Wenn Sie Code lokal ausführen, sollten Sie zum Authentifizieren Ihres Codes die Dienstkonto-Anmeldedaten verwenden.

Cluster- und Jobclients

Zwei Google Cloud Dataproc API-Clients werden zum Ausführen des Anleitungscodes benötigt: ein ClusterControllerClient, um clusters-gRPC APIs aufzurufen und ein JobControllerClient, um jobs-gRPC-APIs aufzurufen. Wenn der Cluster zum Erstellen und Ausführen von Jobs in der globalen Dataproc-Region enthalten ist, verwendet der Code den standardmäßigen gRPC-Endpunkt. Wenn die Clusterregion nicht global ist, wird ein regionaler gRPC-Endpunkt verwendet.

if global_region:
    region = 'global'
    # Use the default gRPC global endpoints.
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
    dataproc_job_client = dataproc_v1.JobControllerClient()
else:
    region = get_region_from_zone(zone)
    # Use a regional gRPC endpoint. See:
    # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
    client_transport = (
        cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(region)))
    job_transport = (
        job_controller_grpc_transport.JobControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(region)))
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
        client_transport)
    dataproc_job_client = dataproc_v1.JobControllerClient(job_transport)

Dataproc-Cluster auflisten

Sie können Cluster innerhalb eines Projekts auflisten, indem Sie die ListClusters API aufrufen. Die Ausgabe gibt ein JSON-Objekt zurück, das die Cluster auflistet. Sie können die JSON-Antwort durchsehen, um Clusterdetails zu drucken.

def list_clusters_with_details(dataproc, project, region):
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(project, region):
        print(('{} - {}'.format(cluster.cluster_name,
                                cluster.status.State.Name(
                                    cluster.status.state))))

Dataproc-Cluster erstellen

Sie können einen neuen Dataproc-Cluster mit der CreateCluster API erstellen.

Beim Erstellen eines Clusters müssen Sie die folgenden Werte angeben:

  1. Das Projekt, in dem der Cluster erstellt wird.
  2. Der Name des Clusters
  3. Die zu verwendende Region. Wenn Sie die Region global angeben (im Anleitungscode wird das Flag --global_region zum Auswählen der globalen Region verwendet), müssen Sie auch eine Zone angeben (siehe zone_uri). Wenn Sie eine nicht globale Region angeben und das Feld zone_uri frei lassen, wählt die automatische Zonenplatzierung von Dataproc eine Zone für Ihren Cluster aus.
def create_cluster(dataproc, project, zone, region, cluster_name):
    """Create the cluster."""
    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
    cluster_data = {
        'project_id': project,
        'cluster_name': cluster_name,
        'config': {
            'gce_cluster_config': {
                'zone_uri': zone_uri
            },
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-1'
            },
            'worker_config': {
                'num_instances': 2,
                'machine_type_uri': 'n1-standard-1'
            }
        }
    }

    cluster = dataproc.create_cluster(project, region, cluster_data)
    cluster.add_done_callback(callback)
    global waiting_callback
    waiting_callback = True

Sie können die Standard-Clusterkonfigurationseinstellungen auch überschreiben. Sie können beispielsweise die Anzahl der Worker angeben (Standard = 2), ob VMs auf Abruf verwendet werden sollen (Standard = 0) und Netzwerkeinstellungen festlegen (Standard = default network). Weitere Informationen finden Sie unter CreateClusterRequest.

Job an Dataproc-Cluster senden

Mit der SubmitJob API können Sie einen Job an einen vorhandenen Cluster senden. Wenn Sie einen Job senden, wird er asynchron ausgeführt.

Zum Senden eines Jobs müssen Sie folgende Angaben machen:

  1. Den Namen des Clusters, an den der Job gesendet wird
  2. Die zu verwendende Region
  3. Die Art des gesendeten Jobs, z. B. Hadoop, Spark, PySpark
  4. Jobdetails für die Art des gesendeten Jobs (siehe SubmitJobRequest für weitere Informationen)

Mit dem folgenden Code wird ein PySpark-Job an einen Cluster gesendet.

def submit_pyspark_job(dataproc, project, region, cluster_name, bucket_name,
                       filename):
    """Submit the Pyspark job to the cluster (assumes `filename` was uploaded
    to `bucket_name."""
    job_details = {
        'placement': {
            'cluster_name': cluster_name
        },
        'pyspark_job': {
            'main_python_file_uri': 'gs://{}/{}'.format(bucket_name, filename)
        }
    }

    result = dataproc.submit_job(
        project_id=project, region=region, job=job_details)
    job_id = result.reference.job_id
    print('Submitted job ID {}.'.format(job_id))
    return job_id

Standardmäßig wird mit dem Anleitungscode der folgende kleine PySpark-Job ausgeführt.

import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)

Da der Job asynchron ausgeführt wird, kann die Ausgabe erst nach Abschluss des Jobs angezeigt werden. Mit GetJob können Sie den JobStatus abrufen, während der Job ausgeführt wird, bzw. Jobdetails abrufen, nachdem der Job abgeschlossen ist.

Jobstatus und -details abrufen

Führen Sie GetJobRequest mit den folgenden erforderlichen Informationen aus:

  1. Dem Projekt des Clusters, an den der Job gesendet wurde
  2. Der Clusterregion
  3. Der Job-ID (UUID)

Mit dem folgenden Code werden der Status eines Jobs geprüft und Jobdetails geliefert, wenn der Job abgeschlossen ist.

def wait_for_job(dataproc, project, region, job_id):
    """Wait for job to complete or error out."""
    print('Waiting for job to finish...')
    while True:
        job = dataproc.get_job(project, region, job_id)
        # Handle exceptions
        if job.status.State.Name(job.status.state) == 'ERROR':
            raise Exception(job.status.details)
        elif job.status.State.Name(job.status.state) == 'DONE':
            print('Job finished.')
            return job

Dataproc-Cluster löschen

Rufen Sie zum Löschen eines Clusters die DeleteCluster API auf.

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print('Tearing down cluster.')
    result = dataproc.delete_cluster(
        project_id=project, region=region, cluster_name=cluster)
    return result