在 Google Kubernetes Engine 上的 Dataproc 上运行 Spark 作业

准备工作

  1. 您必须创建了一个标准(而非 Autopilot)Google Kubernetes Engine (GKE) 可用区级区域级集群,并在集群上启用了 Workload Identity

创建 Dataproc on GKE 虚拟集群

系统会创建一个 Dataproc on GKE 虚拟集群,作为 Dataproc 组件的部署平台。它是一种虚拟资源,与 Dataproc on Compute Engine 集群不同,不包含单独的 Dataproc 主虚拟机和工作器虚拟机。

  • 当您创建 Dataproc on GKE 虚拟集群时,Dataproc on GKE 会在 GKE 集群中创建节点池。

  • Dataproc on GKE 作业会作为 Pod 在这些节点池上运行。节点池以及节点池上的 Pod 调度由 GKE 管理。

  • 创建多个虚拟集群。您可以在 GKE 集群上创建和运行多个虚拟集群,通过在虚拟集群之间共享节点池来提高资源利用率。

    • 每个虚拟集群:
      • 使用单独的属性(包括 Spark 引擎版本和工作负载身份)创建
      • 在 GKE 集群上的单独 GKE 命名空间中进行隔离

控制台

  1. 在 Google Cloud 控制台中,前往 Dataproc 集群页面。

    转到集群

  2. 点击创建集群

  3. 创建 Dataproc 集群对话框中,点击 GKE 上的集群行中的创建

  4. 设置集群面板中:

    1. 集群名称字段中,为集群输入名称。
    2. 区域列表中,为 Dataproc on GKE 虚拟集群选择一个区域。此区域必须与现有 GKE 集群所在的区域相同(您将在下一部分中进行选择)。
    3. Kubernetes 集群字段中,点击浏览以选择现有 GKE 集群所在的区域。
    4. 可选:在 Cloud Storage 暂存桶字段中,您可以点击浏览以选择现有的 Cloud Storage 存储桶。Dataproc on GKE 将在存储桶中暂存工件。忽略此字段可让 Dataproc on GKE 创建一个暂存存储桶。
  5. 在左侧面板中,点击配置节点池,然后在节点池面板中,点击添加池

    1. 如需重复使用现有的 Dataproc on GKE 节点池,请执行以下操作:
      1. 点击重复使用现有节点池
      2. 输入现有节点池的名称,然后选择其角色。必须至少有一个节点池具有 DEFAULT 角色。
      3. 点击完成
    2. 如需创建新的 Dataproc on GKE 节点池,请执行以下操作:
      1. 点击创建新的节点池
      2. 输入以下节点池值:
        • 节点池名称
        • 角色:至少有一个节点池必须具有 DEFAULT 角色。
        • 位置:指定 GKE 上的 Dataproc 集群区域内的可用可用区域。
        • 节点池机器类型
        • CPU 平台
        • 可抢占性
        • Min:节点数下限。
        • Max:节点数上限。节点数上限必须大于 0。
    3. 点击添加池以添加更多节点池。所有节点池都必须具有位置信息。您最多可以添加 4 个节点池。
  6. (可选)如果您已设置 Dataproc Persistent History Server (PHS) 以用于查看 GKE 上活跃和已删除的 Dataproc 集群的 Spark 作业历史记录,请点击自定义集群。然后,在 History Server 集群字段中,浏览并选择您的 PHS 集群。PHS 集群必须与 Dataproc on GKE 虚拟集群位于同一区域。

  7. 点击创建以创建 Dataproc 集群。您的 Dataproc on GKE 集群会显示在集群页面上的列表中。在集群准备好投入使用之前,状态为正在预配,然后状态会更改为正在运行

gcloud

设置环境变量,然后在本地或在 Cloud Shell 中运行 gcloud dataproc clusters gke create 命令以创建 Dataproc on GKE 集群。

  1. 设置环境变量:

    DP_CLUSTER=Dataproc on GKE  cluster-name \
      REGION=region \
      GKE_CLUSTER=GKE cluster-name \
      BUCKET=Cloud Storage bucket-name \
      DP_POOLNAME=node pool-name
      PHS_CLUSTER=Dataproc PHS server name
    
    注意:

    • DP_CLUSTER:设置 Dataproc 虚拟集群名称,该名称必须以小写字母开头,后面最多可跟 54 个小写字母、数字或连字符。且不能以连字符结尾。
    • REGIONregion 必须与 GKE 集群所在的区域相同。
    • GKE_CLUSTER:现有 GKE 集群的名称。
    • BUCKET:(可选)您可以指定 Cloud Storage 存储桶的名称,Dataproc 将使用该名称暂存工件。如果您未指定存储桶,GKE 上的 Dataproc 将创建一个暂存存储桶。
    • DP_POOLNAME:要在 GKE 集群上创建的节点池的名称。
    • PHS_CLUSTER:(可选)Dataproc PHS 服务器,用于查看活跃和已删除的 Dataproc on GKE 集群上的 Spark 作业历史记录。PHS 集群必须与 Dataproc on GKE 虚拟集群位于同一区域。
  2. 运行以下命令:

    gcloud dataproc clusters gke create ${DP_CLUSTER} \
        --region=${REGION} \
        --gke-cluster=${GKE_CLUSTER} \
        --spark-engine-version=latest \
        --staging-bucket=${BUCKET} \
        --pools="name=${DP_POOLNAME},roles=default" \
        --setup-workload-identity \
        --history-server-cluster=${PHS_CLUSTER}
    
    注意:

    • --spark-engine-version:Dataproc 集群上使用的 Spark 映像版本。您可以使用标识符(例如 33.1latest),也可以指定完整的子次要版本(例如 3.1-dataproc-5)。
    • --staging-bucket:删除此标志可让 Dataproc on GKE 创建暂存存储桶。
    • --pools:此标志用于指定 Dataproc 将创建或用于执行工作负载的新节点池或现有节点池。列出 GKE 上的 Dataproc 节点池设置,以英文逗号分隔,例如:
      --pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
      
      您必须指定节点池 namerole。其他节点池设置为可选。您可以使用多个 --pools 标志来指定多个节点池。至少一个节点池必须具有 default 角色。所有节点池都必须位于同一位置。
    • --setup-workload-identity:此标志用于启用 Workload Identity 绑定。这些绑定允许 Kubernetes 服务账号 (KSA) 充当虚拟集群的默认 Dataproc 虚拟机服务账号(数据平面身份)

REST

在 Dataproc API cluster.create 请求中完成 virtualClusterConfig

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT:Google Cloud 项目 ID
  • REGION:Dataproc 虚拟集群区域(与现有 GKE 集群区域相同)
  • DP_CLUSTER:Dataproc 集群名称
  • GKE_CLUSTER:GKE 集群名称
  • NODE_POOL:节点池名称
  • PHS_CLUSTER持久性历史记录服务器 (PHS) 集群名称
  • BUCKET:(可选)暂存桶名称。留空此字段可让 Dataproc on GKE 创建一个预演存储桶。

HTTP 方法和网址:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters

请求 JSON 正文:

{
  "clusterName":"DP_CLUSTER",
  "projectId":"PROJECT",
  "virtualClusterConfig":{
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    },
    "kubernetesClusterConfig":{
      "gkeClusterConfig":{
        "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"latest"
        }
      }
    },
    "stagingBucket":"BUCKET"
  }
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "projectId":"PROJECT",
  "clusterName":"DP_CLUSTER",
  "status":{
    "state":"RUNNING",
    "stateStartTime":"2022-04-01T19:16:39.865716Z"
  },
  "clusterUuid":"98060b77-...",
  "statusHistory":[
    {
      "state":"CREATING",
      "stateStartTime":"2022-04-01T19:14:27.340544Z"
    }
  ],
  "labels":{
    "goog-dataproc-cluster-name":"DP_CLUSTER",
    "goog-dataproc-cluster-uuid":"98060b77-...",
    "goog-dataproc-location":"REGION",
    "goog-dataproc-environment":"prod"
  },
  "virtualClusterConfig":{
    "stagingBucket":"BUCKET",
    "kubernetesClusterConfig":{
      "kubernetesNamespace":"dp-cluster",
      "gkeClusterConfig":{
"gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"3.1-..."
        },
        "properties":{
          "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...",
          "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history",
          "spark:spark.eventLog.enabled":"true"
        }
      }
    },
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    }
  }

提交 Spark 作业

在 Dataproc on GKE 虚拟集群运行后,使用 Google Cloud 控制台、gcloud CLI 或 Dataproc jobs.submit API(使用直接 HTTP 请求或 Cloud 客户端库提交 Spark 作业

gcloud CLI Spark 作业示例

gcloud dataproc jobs submit spark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    --class=org.apache.spark.examples.SparkPi \
    --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \
    -- 1000

gcloud CLI PySpark 作业示例

gcloud dataproc jobs submit pyspark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/python/pi.py \
    -- 10

gcloud CLI SparkR 作业示例

gcloud dataproc jobs submit spark-r \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/r/dataframe.R

清理