在 Dataproc on Google Kubernetes Engine 上运行 Spark 作业

准备工作

  1. 您必须创建标准服务(而非 Autopilot) Google Kubernetes Engine (GKE) 区域级区域级集群 具有 Workload Identity 集群上启用的 IP 地址

创建 Dataproc on GKE 虚拟集群

我们将创建一个 Dataproc on GKE 虚拟集群 作为其部署平台 Dataproc 组件。它是一种虚拟资源,不同于 Compute Engine 上的 Dataproc 集群,不包含单独的 Dataproc 主虚拟机和工作器虚拟机

  • Dataproc on GKE 在 GKE 内创建节点池 Dataproc on GKE 虚拟集群。

  • Dataproc on GKE 作业作为 Pod 在这些节点池上运行。节点池 节点池上 Pod 的调度由 GKE 管理。

  • 创建多个虚拟集群。您可以创建和运行多个 集群上,以改进 通过在虚拟集群之间共享节点池来提升资源利用率。

    • 每个虚拟集群: <ph type="x-smartling-placeholder">
        </ph>
      • 使用单独的属性创建 引擎版本和工作负载身份
      • 隔离在单独的 GKE 命名空间中 在 GKE 集群上

控制台

  1. 在 Google Cloud 控制台中,前往 Dataproc 集群页面。

    转到集群

  2. 点击创建集群

  3. 创建 Dataproc 集群对话框中,点击创建 GKE 上的集群行。

  4. 设置集群面板中,执行以下操作:

    1. 集群名称字段中,输入集群的名称。
    2. 区域列表中, 区域 适用于 Dataproc on GKE 虚拟集群的此区域必须相同 现有 GKE 集群所在的区域 (您将在下一项中选择)。
    3. Kubernetes 集群字段中,点击浏览以选择 现有 GKE 集群所在的区域。
    4. 可选:在 Cloud Storage 暂存存储桶字段中,您可以点击 浏览以选择现有的 Cloud Storage 存储桶。 Dataproc on GKE 将在存储桶中暂存工件。忽略此项 字段让 Dataproc on GKE 创建暂存存储桶。
  5. 在左侧面板中,点击配置节点池,然后在节点池中 面板中,点击添加池

    1. 要重复使用现有的 Dataproc on GKE 节点池,请执行以下操作: <ph type="x-smartling-placeholder">
        </ph>
      1. 点击重复使用现有节点池
      2. 输入现有节点池的名称 角色。 至少一个节点池必须具有 DEFAULT 角色。
      3. 点击完成
    2. 要创建新的 Dataproc on GKE 节点池,请执行以下操作: <ph type="x-smartling-placeholder">
        </ph>
      1. 点击创建新节点池
      2. 输入以下节点池值: <ph type="x-smartling-placeholder">
          </ph>
        • 节点池名称
        • 角色:必须至少有一个节点池具有 DEFAULT 角色。
        • 位置:在 Dataproc on GKE 集群区域内指定一个可用区。
        • 节点池机器类型
        • CPU 平台
        • 抢占
        • 最小:节点数下限。
        • Max:节点数上限。节点数上限必须大于 0。
    3. 点击添加池以添加更多节点池。全部 节点池必须具有此位置。您总共可以添加四个节点池。
  6. (可选)如果您已设置 Dataproc 永久性历史记录服务器 (PHS) 用于查看活跃和已删除的 Dataproc on GKE 上的 Spark 作业历史记录 请点击自定义集群。然后,在历史记录服务器集群中 字段,浏览并选择您的 PHS 集群。PHS 集群必须位于 与 Dataproc on GKE 虚拟集群位于同一区域。

  7. 点击创建以创建 Dataproc 集群。您的 Dataproc on GKE 集群会显示在集群页面上的列表中。 在集群可供使用之前,其状态为正在预配;并且 然后,状态将更改为正在运行

gcloud

设置环境变量,然后运行 gcloud dataproc clusters gke create 命令来创建 Dataproc on GKE 集群。

  1. 设置环境变量:

    DP_CLUSTER=Dataproc on GKE  cluster-name \
      REGION=region \
      GKE_CLUSTER=GKE cluster-name \
      BUCKET=Cloud Storage bucket-name \
      DP_POOLNAME=node pool-name
      PHS_CLUSTER=Dataproc PHS server name
    
    注意:

    • DP_CLUSTER:设置 Dataproc 虚拟集群名称,该名称必须以 一个小写字母,后跟最多 54 个小写字母、数字或 连字符。并且不能以连字符结尾。
    • REGIONregion 必须与 GKE 集群所在的区域相同。
    • GKE_CLUSTER:现有 GKE 集群的名称。
    • BUCKET:(可选)您可以指定 Cloud Storage 存储桶、 Dataproc 将用于暂存工件。如果您不指定存储桶 Dataproc on GKE 将创建一个暂存存储桶。
    • DP_POOLNAME节点池的名称 如何在 GKE 集群上创建
    • PHS_CLUSTER:(可选)Dataproc PHS 服务器 用于查看活跃和已删除的 Dataproc on GKE 上的 Spark 作业历史记录 集群。PHS 集群必须与 Dataproc on GKE 虚拟集群。
  2. 运行以下命令:

    gcloud dataproc clusters gke create ${DP_CLUSTER} \
        --region=${REGION} \
        --gke-cluster=${GKE_CLUSTER} \
        --spark-engine-version=latest \
        --staging-bucket=${BUCKET} \
        --pools="name=${DP_POOLNAME},roles=default" \
        --setup-workload-identity \
        --history-server-cluster=${PHS_CLUSTER}
    
    注意:

    • --spark-engine-versionSpark 映像版本 部署在 Dataproc 集群上的资源。您可以 使用标识符,例如 33.1latest。 也可以指定完整的次要版本,例如 3.1-dataproc-5
    • --staging-bucket:删除此标志以在 GKE 上启用 Dataproc 创建暂存存储桶
    • --pools:此标志用于指定新节点池或现有节点池 由 Dataproc 创建或使用的资源来执行工作负载。列表 Dataproc on GKE 节点池设置、 以英文逗号分隔,例如:
      --pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
      
      您必须指定节点池 namerole。其他节点池 设置是可选的。您可以使用多个 --pools 标志 指定多个节点池必须至少有一个节点池 具有 default 角色。所有节点池都必须 位置。
    • --setup-workload-identity:此标志用于启用 Workload Identity 绑定。这些绑定允许 Kubernetes 服务账号 (KSA) 作为默认 Dataproc 虚拟机服务账号(数据平面身份) 虚拟集群的权限。

REST

完成 virtualClusterConfig 作为 Dataproc API 的一部分 cluster.create 请求。

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT:Google Cloud 项目 ID
  • REGION:Dataproc 虚拟集群区域(与现有 GKE 集群区域相同)
  • DP_CLUSTER:Dataproc 集群名称
  • GKE_CLUSTER:GKE 集群名称
  • NODE_POOL:节点池名称
  • PHS_CLUSTERPersistent History Server (PHS) 集群名称
  • BUCKET:(可选)暂存存储桶的名称。将此字段留空可让 Dataproc on GKE 创建暂存存储桶。

HTTP 方法和网址:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters

请求 JSON 正文:

{
  "clusterName":"DP_CLUSTER",
  "projectId":"PROJECT",
  "virtualClusterConfig":{
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    },
    "kubernetesClusterConfig":{
      "gkeClusterConfig":{
        "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"latest"
        }
      }
    },
    "stagingBucket":"BUCKET"
  }
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "projectId":"PROJECT",
  "clusterName":"DP_CLUSTER",
  "status":{
    "state":"RUNNING",
    "stateStartTime":"2022-04-01T19:16:39.865716Z"
  },
  "clusterUuid":"98060b77-...",
  "statusHistory":[
    {
      "state":"CREATING",
      "stateStartTime":"2022-04-01T19:14:27.340544Z"
    }
  ],
  "labels":{
    "goog-dataproc-cluster-name":"DP_CLUSTER",
    "goog-dataproc-cluster-uuid":"98060b77-...",
    "goog-dataproc-location":"REGION",
    "goog-dataproc-environment":"prod"
  },
  "virtualClusterConfig":{
    "stagingBucket":"BUCKET",
    "kubernetesClusterConfig":{
      "kubernetesNamespace":"dp-cluster",
      "gkeClusterConfig":{
"gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"3.1-..."
        },
        "properties":{
          "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...",
          "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history",
          "spark:spark.eventLog.enabled":"true"
        }
      }
    },
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    }
  }

提交 Spark 作业

Dataproc on GKE 虚拟集群运行后, 提交 Spark 作业 使用 Google Cloud 控制台 gcloud CLI 或 Dataproc jobs.submit API(通过使用直接 HTTP 请求或 Cloud 客户端库)。

gcloud CLI Spark 作业示例

gcloud dataproc jobs submit spark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    --class=org.apache.spark.examples.SparkPi \
    --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \
    -- 1000

gcloud CLI PySpark 作业示例

gcloud dataproc jobs submit pyspark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/python/pi.py \
    -- 10

gcloud CLI SparkR 作业示例

gcloud dataproc jobs submit spark-r \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/r/dataframe.R

清理