Cloud-Clientbibliotheken für Python verwenden

Diese Anleitung umfasst eine Schritt-für-Schritt-Anleitung für Cloud Shell. Darin werden Dataproc gRPC APIs mithilfe von Google Cloud-Clientbibliotheken für Python programmatisch aufgerufen, um einen Cluster zu erstellen und einen Job an den Cluster zu senden.

In den folgenden Abschnitten wird die Verwendung des Anleitungscodes im GitHub-Repository GoogleCloudPlatform/python-dataproc erläutert.

Schritt-für-Schritt-Anleitung für Cloud Shell ausführen

Klicken Sie zum Ausführen der Anleitung auf Open in Cloud Shell (In Google Cloud Shell öffnen).

In Cloud Shell öffnen

Den Code verstehen

Standardanmeldedaten für Anwendungen

Die Schritt-für-Schritt-Anleitung für Cloud Shell in dieser Anleitung ermöglicht die Authentifizierung mit den Anmeldedaten Ihres Google Cloud-Projekts. Wenn Sie Code lokal ausführen, sollten Sie zum Authentifizieren Ihres Codes die Dienstkonto-Anmeldedaten verwenden.

Dataproc-Cluster erstellen

Die folgenden Werte werden zum Erstellen des Clusters festgelegt:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird
  • Den Namen des Clusters.
  • Die Clusterkonfiguration, die einen Master und zwei primäre Worker angibt

Für die verbleibenden Clustereinstellungen werden die Standardkonfigurationseinstellungen verwendet. Sie können die Standardkonfigurationseinstellungen für Cluster überschreiben. Sie können beispielsweise sekundäre VMs hinzufügen (Standard = 0) oder ein nicht standardmäßiges VPC-Netzwerk für den Cluster angeben. Weitere Informationen finden Sie unter CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Job senden

Die folgenden Werte werden zum Senden des Jobs festgelegt:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird
  • Die Jobkonfiguration, die den Clusternamen und den Cloud Storage-Dateipfad (URI) des PySpark-Jobs angibt

Weitere Informationen finden Sie unter SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Cluster löschen

Die folgenden Werte sind zum Löschen des Clusters festgelegt:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird
  • Den Namen des Clusters.

Weitere Informationen finden Sie unter DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")