使用 Python 版 Cloud 客户端库

本教程提供了一项 Cloud Shell 演示,该演示使用 Python 版 Google Cloud 客户端库以编程方式调用 Dataproc gRPC API 来创建集群并将作业提交到该集群。

以下部分介绍 GitHub GoogleCloudPlatform/python-dataproc 代码库中包含的演示代码操作。

运行 Cloud Shell 演示

点击在 Google Cloud Shell 中打开 (Open in Google Cloud Shell) 以运行演示。

在 Cloud Shell 中打开此项目

了解 Python 示例代码

应用默认凭据

本教程中的 Cloud Shell 演示使用 Google Cloud 项目凭据提供身份验证。在本地运行代码时,推荐的做法是使用服务帐号凭据对代码进行身份验证。

创建 Dataproc 集群

您可以使用 CreateCluster API 创建新的 Dataproc 集群。

创建集群时,必须指定以下值:

  1. 将在其中创建集群的项目
  2. 集群的名称
  3. 要使用的区域。如果您指定了 global 区域(教程代码使用 --global_region 标志来选择全球区域),您还必须指定地区(请参阅 zone_uri)。如果您指定了非全球区域并将 zone_uri 字段留空,Dataproc 自动选择地区功能会为您的集群选择一个地区。
from google.cloud import dataproc_v1 as dataproc

def create_cluster(project_id, region, cluster_name):
    """This sample walks a user through creating a Cloud Dataproc cluster
       using the Python client library.

       Args:
           project_id (string): Project to use for creating resources.
           region (string): Region where the resources should live.
           cluster_name (string): Name to use for creating a cluster.
    """

    # Create a client with the endpoint set to the desired cluster region.
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    # Output a success message.
    print(f"Cluster created successfully: {result.cluster_name}")

您还可以替换默认集群配置设置。例如,您可以指定工作器数量(默认值 = 2)、是否使用抢占式虚拟机(默认值 = 0)以及网络设置(默认值 = default network)。如需了解详情,请参阅 CreateClusterRequest

列出 Dataproc 集群

您可以通过调用 ListClusters API 来列出项目中的集群。输出结果会返回列出集群的 JSON 对象。 您可以遍历 JSON 响应以输出集群详细信息。

def list_clusters_with_details(dataproc, project, region):
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(
        request={"project_id": project, "region": region}
    ):
        print(
            (
                "{} - {}".format(
                    cluster.cluster_name,
                    cluster.status.State.Name(cluster.status.state),
                )
            )
        )

将作业提交到 Dataproc 集群

您可以使用 SubmitJob API 将作业提交到现有集群。提交作业时,它将异步运行。

为了能够提交作业,您必须指定以下信息:

  1. 作业将提交到的集群的名称
  2. 要使用的区域
  3. 正在提交的作业的类型(例如 HadoopSpark、'PySpark)
  4. 提交的作业类型的作业详情(如需了解详情,请参阅 SubmitJobRequest)。

以下代码将 Spark 作业提交到集群。

import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

def submit_job(project_id, region, cluster_name):
    # Create the job client.
    job_client = dataproc.JobControllerClient(client_options={
        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
    })

    # Create the job config. 'main_jar_file_uri' can also be a
    # Google Cloud Storage URL.
    job = {
        'placement': {
            'cluster_name': cluster_name
        },
        'spark_job': {
            'main_class': 'org.apache.spark.examples.SparkPi',
            'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
            'args': ['1000']
        }
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output gets saved to the Google Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_string()
    )

    print(f"Job finished successfully: {output}")

默认情况下,教程代码运行 Spark 随附的 SparkPi 示例作业。

删除 Dataproc 集群

调用 DeleteCluster API 以删除集群。

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print("Tearing down cluster.")
    result = dataproc.delete_cluster(
        request={"project_id": project, "region": region, "cluster_name": cluster}
    )
    return result