Cloud-Clientbibliotheken für Python verwenden

Diese Anleitung umfasst eine Schritt-für-Schritt-Anleitung für Cloud Shell. Darin werden Dataproc gRPC APIs mithilfe von Google Cloud-Clientbibliotheken für Python programmatisch aufgerufen, um einen Cluster zu erstellen und einen Job an den Cluster zu senden.

In den folgenden Abschnitten wird die Verwendung des Anleitungscodes im GitHub-Repository GoogleCloudPlatform/python-dataproc erläutert.

Schritt-für-Schritt-Anleitung für Cloud Shell ausführen

Klicken Sie zum Ausführen der Anleitung auf Open in Cloud Shell (In Google Cloud Shell öffnen).

In Cloud Shell öffnen

Den Code verstehen

Standardanmeldedaten für Anwendungen

Die Schritt-für-Schritt-Anleitung für Cloud Shell in diesem Tutorial bietet eine Authentifizierung anhand der Anmeldedaten Ihres Google Cloud -Projekts. Wenn Sie Code lokal ausführen, sollten Sie zum Authentifizieren Ihres Codes die Dienstkonto-Anmeldedaten verwenden.

Dataproc-Cluster erstellen

Die folgenden Werte werden festgelegt, um den Cluster zu erstellen:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird.
  • Der Name des Clusters
  • Die Clusterkonfiguration, die einen Master und zwei primäre Worker angibt

Für die verbleibenden Clustereinstellungen werden die Standardkonfigurationseinstellungen verwendet. Sie können die Standard-Clusterkonfigurationseinstellungen überschreiben. Sie können beispielsweise sekundäre VMs hinzufügen (Standard = 0) oder ein nicht standardmäßiges VPC-Netzwerk für den Cluster angeben. Weitere Informationen finden Sie unter CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Job senden

Die folgenden Werte werden festgelegt, um den Job zu senden:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird.
  • Die Jobkonfiguration, in der der Clusternamen und der Cloud Storage-Dateipfad (URI) des PySpark-Jobs angegeben werden

Weitere Informationen finden Sie unter SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Cluster löschen

Die folgenden Werte werden festgelegt, um das Cluster zu löschen:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird.
  • Der Name des Clusters

Weitere Informationen finden Sie unter DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")