Cloud-Clientbibliotheken für Python verwenden

Diese Anleitung umfasst eine Schritt-für-Schritt-Anleitung für Cloud Shell. Darin werden Dataproc gRPC APIs mithilfe von Google Cloud-Clientbibliotheken für Python programmatisch aufgerufen, um einen Cluster zu erstellen und einen Job an den Cluster zu senden.

In den folgenden Abschnitten wird die Verwendung des Anleitungscodes im GitHub-Repository GoogleCloudPlatform/python-dataproc erläutert.

Schritt-für-Schritt-Anleitung für Cloud Shell ausführen

Klicken Sie zum Ausführen der Anleitung auf Open in Cloud Shell (In Google Cloud Shell öffnen).

In Cloud Shell öffnen

Den Code verstehen

Standardanmeldedaten für Anwendungen

Die Schritt-für-Schritt-Anleitung für Cloud Shell bietet eine Authentifizierung anhand der Anmeldedaten Ihres Google Cloud-Projekts. Wenn Sie Code lokal ausführen, sollten Sie zum Authentifizieren Ihres Codes die Dienstkonto-Anmeldedaten verwenden.

Dataproc-Cluster erstellen

Die folgenden Werte werden für das Erstellen des Clusters festgelegt:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird
  • Der Name des Clusters
  • Die Clusterkonfiguration, in der ein Master und zwei primäre Worker angegeben sind

Für die übrigen Clustereinstellungen werden die Standardkonfigurationseinstellungen verwendet. Sie können die Standard-Clusterkonfigurationseinstellungen überschreiben. Sie können beispielsweise sekundäre VMs hinzufügen (Standard = 0) oder ein nicht standardmäßiges VPC-Netzwerk für den Cluster angeben. Weitere Informationen finden Sie unter CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Job senden

Für die Aufgabe werden die folgenden Werte festgelegt:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird
  • Die Jobkonfiguration, in der der Clustername und der Cloud Storage-Pfad (URI) des PySpark-Jobs angegeben sind

Weitere Informationen finden Sie unter SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Cluster löschen

Die folgenden Werte werden festgelegt, um den Cluster zu löschen:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird
  • Der Name des Clusters

Weitere Informationen finden Sie unter DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")