在 Google Kubernetes Engine 上的 Dataproc 上运行 Spark 作业

准备工作

  1. 您必须已经创建一个在集群上启用了 Workload Identity 的标准(非 Autopilot)Google Kubernetes Engine (GKE) 可用区级区域级集群

创建 Dataproc on GKE 虚拟集群

系统创建一个 Dataproc on GKE 虚拟集群,作为 Dataproc 组件的部署平台。它是一种虚拟资源,与 Compute Engine 集群上的 Dataproc 不同,它不包含单独的 Dataproc 主虚拟机和工作器虚拟机。

  • 当您创建 Dataproc on GKE 虚拟集群时,Dataproc on GKE 会在 GKE 集群中创建节点池。

  • Dataproc on GKE 作业在这些节点池上作为 Pod 运行。节点池以及节点池上的 Pod 调度由 GKE 管理。

  • 创建多个虚拟集群。您可以在 GKE 集群上创建和运行多个虚拟集群,通过在各个虚拟集群中共享节点池来提高资源利用率。

    • 每个虚拟集群:
      • 具有不同的属性创建,包括 Spark Engine 版本和 Workload Identity
      • 会隔离在 GKE 集群单独的 GKE 命名空间内

控制台

  1. 在 Google Cloud 控制台中,转到 Dataproc 集群页面。

    转到集群

  2. 点击创建集群

  3. 创建 Dataproc 集群对话框中,点击 Cluster on GKE 行中的创建

  4. 设置集群面板中,执行以下操作:

    1. 集群名称字段中,输入集群的名称。
    2. 区域列表中,为 Dataproc on GKE 虚拟集群选择一个区域。此区域必须是现有 GKE 集群所在的区域(您在下一项中选择)。
    3. Kubernetes 集群字段中,点击浏览,选择现有 GKE 集群所在的区域。
    4. 可选:在 Cloud Storage 暂存存储桶字段中,您可以点击浏览以选择现有的 Cloud Storage 存储桶。Dataproc on GKE 会将工件暂存在存储桶中。忽略此字段,让 Dataproc on GKE 创建暂存存储桶。
  5. 在左侧面板中,点击配置节点池,然后在节点池面板中,点击添加池

    1. 如需重复使用现有的 Dataproc on GKE 节点池,请执行以下操作:
      1. 点击重复使用现有节点池
      2. 输入现有节点池的名称,然后选择其角色。必须至少有一个节点池具有 DEFAULT 角色。
      3. 点击完成
    2. 如需创建新的 Dataproc on GKE 节点池,请执行以下操作:
      1. 点击创建新节点池
      2. 输入以下节点池值:
    3. 点击添加池以添加更多节点池。所有节点池都必须具有该位置。您总共可以添加四个节点池。
  6. (可选)如果您设置了用于查看 Spark 作业历史记录的 Dataproc Persistent History Server (PHS),请在活跃和已删除的 Dataproc on GKE 集群上点击自定义集群。然后,在历史记录服务器集群字段中,浏览并选择您的 PHS 集群。PHS 集群必须与 Dataproc on GKE 虚拟集群位于同一区域中。

  7. 点击创建以创建 Dataproc 集群。您的 Dataproc on GKE 集群会显示在集群页面上的列表中。在集群可供使用之前,其状态为正在预配,之后状态会更改为正在运行

gcloud

设置环境变量,然后在本地或在 Cloud Shell 中运行 gcloud dataproc clusters gke create 命令,以创建 Dataproc on GKE 集群。

  1. 设置环境变量:

    DP_CLUSTER=Dataproc on GKE  cluster-name \
      REGION=region \
      GKE_CLUSTER=GKE cluster-name \
      BUCKET=Cloud Storage bucket-name \
      DP_POOLNAME=node pool-name
      PHS_CLUSTER=Dataproc PHS server name
    
    备注:

    • DP_CLUSTER:设置 Dataproc 虚拟集群名称,该名称必须以小写字母开头,后面最多可跟 54 个小写字母、数字或连字符。且不能以连字符结尾。
    • REGIONregion 必须与 GKE 集群所在的区域相同。
    • GKE_CLUSTER:现有 GKE 集群的名称。
    • BUCKET:(可选)您可以指定 Dataproc 将用于暂存工件的 Cloud Storage 存储桶的名称。如果您没有指定存储桶,GKE 上的 Dataproc 将创建一个暂存存储桶。
    • DP_POOLNAME:要在 GKE 集群上创建的节点池的名称。
    • PHS_CLUSTER:(可选)Dataproc PHS 服务器,用于查看活跃和已删除的 Dataproc on GKE 集群上的 Spark 作业历史记录。PHS 集群必须与 GKE 虚拟集群上的 Dataproc 位于同一区域。
  2. 运行以下命令:

    gcloud dataproc clusters gke create ${DP_CLUSTER} \
        --region=${REGION} \
        --gke-cluster=${GKE_CLUSTER} \
        --spark-engine-version=latest \
        --staging-bucket=${BUCKET} \
        --pools="name=${DP_POOLNAME},roles=default" \
        --setup-workload-identity \
        --history-server-cluster=${PHS_CLUSTER}
    
    备注:

    • --spark-engine-version:Dataproc 集群上使用的 Spark 映像版本。您可以使用标识符(例如 33.1latest),也可以指定完整的次要版本(例如 3.1-dataproc-5)。
    • --staging-bucket:删除此标志,以使 Dataproc on GKE 创建暂存存储桶。
    • --pools:此标志用于指定 Dataproc 将创建或用于执行工作负载的新节点池或现有节点池。列出 GKE 节点池中的 Dataproc 设置,以英文逗号分隔,例如:
      --pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
      
      您必须指定节点池 namerole。其他节点池设置是可选的。您可以使用多个 --pools 标志指定多个节点池。必须至少有一个节点池具有 default 角色。所有节点池都必须具有相同的位置。
    • --setup-workload-identity:此标志会启用 Workload Identity 绑定。这些绑定允许 Kubernetes 服务帐号 (KSA) 充当虚拟集群的默认 Dataproc 虚拟机服务帐号(数据平面身份)

REST

virtualClusterConfig 作为 Dataproc API cluster.create 请求的一部分完成。

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT:Google Cloud 项目 ID
  • REGION:Dataproc 虚拟集群区域(与现有 GKE 集群区域相同的区域)
  • DP_CLUSTER:Dataproc 集群名称
  • GKE_CLUSTER:GKE 集群名称
  • NODE_POOL:节点池名称
  • PHS_CLUSTER永久性历史记录服务器 (PHS) 集群名称
  • BUCKET:(可选)暂存存储桶名称。将此字段留空,让 Dataproc on GKE 创建暂存存储桶。

HTTP 方法和网址:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters

请求 JSON 正文:

{
  "clusterName":"DP_CLUSTER",
  "projectId":"PROJECT",
  "virtualClusterConfig":{
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    },
    "kubernetesClusterConfig":{
      "gkeClusterConfig":{
        "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"latest"
        }
      }
    },
    "stagingBucket":"BUCKET"
  }
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "projectId":"PROJECT",
  "clusterName":"DP_CLUSTER",
  "status":{
    "state":"RUNNING",
    "stateStartTime":"2022-04-01T19:16:39.865716Z"
  },
  "clusterUuid":"98060b77-...",
  "statusHistory":[
    {
      "state":"CREATING",
      "stateStartTime":"2022-04-01T19:14:27.340544Z"
    }
  ],
  "labels":{
    "goog-dataproc-cluster-name":"DP_CLUSTER",
    "goog-dataproc-cluster-uuid":"98060b77-...",
    "goog-dataproc-location":"REGION",
    "goog-dataproc-environment":"prod"
  },
  "virtualClusterConfig":{
    "stagingBucket":"BUCKET",
    "kubernetesClusterConfig":{
      "kubernetesNamespace":"dp-cluster",
      "gkeClusterConfig":{
"gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"3.1-..."
        },
        "properties":{
          "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...",
          "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history",
          "spark:spark.eventLog.enabled":"true"
        }
      }
    },
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    }
  }

提交 Spark 作业

在 Dataproc on GKE 虚拟集群运行后,使用 Google Cloud 控制台、gcloud CLI 或 Dataproc jobs.submit API(使用直接 HTTP 请求或 Cloud 客户端库提交 Spark 作业

gcloud CLI Spark 作业示例

gcloud dataproc jobs submit spark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    --class=org.apache.spark.examples.SparkPi \
    --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \
    -- 1000

gcloud CLI PySpark 作业示例

gcloud dataproc jobs submit pyspark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/python/pi.py \
    -- 10

gcloud CLI SparkR 作业示例

gcloud dataproc jobs submit spark-r \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/r/dataframe.R

清理