在 Dataproc on Google Kubernetes Engine 上运行 Spark 作业

准备工作

  1. 您必须创建标准服务(而非 Autopilot) Google Kubernetes Engine (GKE) 区域级区域级集群 具有 Workload Identity 的 集群上启用的 IP 地址

创建 Dataproc on GKE 虚拟集群

我们将创建一个 Dataproc on GKE 虚拟集群 作为其部署平台 Dataproc 组件。它是一种虚拟资源,不同于 Compute Engine 上的 Dataproc 集群,不包含单独的 Dataproc 主虚拟机和工作器虚拟机

  • Dataproc on GKE 在 GKE 内创建节点池 Dataproc on GKE 虚拟集群。

  • Dataproc on GKE 作业作为 Pod 在这些节点池上运行。节点池以及节点池上的 Pod 调度由 GKE 管理。

  • 创建多个虚拟集群。您可以创建和运行多个 集群上,以改进 通过在虚拟集群之间共享节点池来提升资源利用率。

    • 每个虚拟集群: <ph type="x-smartling-placeholder">
        </ph>
      • 使用单独的属性创建 引擎版本和工作负载身份
      • 隔离在单独的 GKE 命名空间中 在 GKE 集群上

控制台

  1. 在 Google Cloud 控制台中,前往 Dataproc 集群页面。

    转到集群

  2. 点击创建集群

  3. 创建 Dataproc 集群对话框中,点击 GKE 上的集群行中的创建

  4. 设置集群面板中,执行以下操作:

    1. 集群名称字段中,输入集群的名称。
    2. 区域列表中, 区域 适用于 Dataproc on GKE 虚拟集群的此区域必须与现有 GKE 集群所在的区域(您将在下一部分中选择)相同。
    3. Kubernetes 集群字段中,点击浏览以选择 现有 GKE 集群所在的区域。
    4. 可选:在 Cloud Storage 暂存桶字段中,您可以点击浏览以选择现有的 Cloud Storage 存储桶。Dataproc on GKE 将在存储桶中暂存工件。忽略此字段可让 Dataproc on GKE 创建一个暂存存储桶。
  5. 在左侧面板中,点击配置节点池,然后在节点池面板中,点击添加池

    1. 如需重复使用现有的 Dataproc on GKE 节点池,请执行以下操作:
      1. 点击重复使用现有节点池
      2. 输入现有节点池的名称,然后选择其角色。必须至少有一个节点池具有 DEFAULT 角色。
      3. 点击完成
    2. 如需创建新的 Dataproc on GKE 节点池,请执行以下操作:
      1. 点击创建新节点池
      2. 输入以下节点池值: <ph type="x-smartling-placeholder">
          </ph>
        • 节点池名称
        • 角色:至少有一个节点池必须具有 DEFAULT 角色。
        • 位置:在 Dataproc on GKE 集群区域内指定一个可用区。
        • 节点池机器类型
        • CPU 平台
        • 可抢占性
        • Min:节点数下限。
        • Max:节点数上限。节点数上限必须大于 0。
    3. 点击添加池以添加更多节点池。所有节点池都必须具有位置信息。您最多可以添加四个节点池。
  6. (可选)如果您已设置 Dataproc Persistent History Server (PHS) 以用于查看 GKE 上活跃和已删除的 Dataproc 集群的 Spark 作业历史记录,请点击自定义集群。然后,在历史记录服务器集群中 字段,浏览并选择您的 PHS 集群。PHS 集群必须位于 与 Dataproc on GKE 虚拟集群位于同一区域。

  7. 点击创建以创建 Dataproc 集群。您的 Dataproc on GKE 集群会显示在集群页面上的列表中。 在集群准备好投入使用之前,状态为正在预配,然后状态会更改为正在运行

gcloud

设置环境变量,然后在本地或在 Cloud Shell 中运行 gcloud dataproc clusters gke create 命令以创建 Dataproc on GKE 集群。

  1. 设置环境变量:

    DP_CLUSTER=Dataproc on GKE  cluster-name \
      REGION=region \
      GKE_CLUSTER=GKE cluster-name \
      BUCKET=Cloud Storage bucket-name \
      DP_POOLNAME=node pool-name
      PHS_CLUSTER=Dataproc PHS server name
    
    注意:

    • DP_CLUSTER:设置 Dataproc 虚拟集群名称,该名称必须以 一个小写字母,后跟最多 54 个小写字母、数字或 连字符。并且不能以连字符结尾。
    • REGIONregion 必须与 GKE 集群所在的区域相同。
    • GKE_CLUSTER:现有 GKE 集群的名称。
    • BUCKET:(可选)您可以指定 Cloud Storage 存储桶、 Dataproc 将用于暂存工件。如果您未指定存储桶,GKE 上的 Dataproc 将创建一个暂存存储桶。
    • DP_POOLNAME节点池的名称 如何在 GKE 集群上创建
    • PHS_CLUSTER:(可选)Dataproc PHS 服务器,用于查看活跃和已删除的 Dataproc on GKE 集群上的 Spark 作业历史记录。PHS 集群必须与 Dataproc on GKE 虚拟集群。
  2. 运行以下命令:

    gcloud dataproc clusters gke create ${DP_CLUSTER} \
        --region=${REGION} \
        --gke-cluster=${GKE_CLUSTER} \
        --spark-engine-version=latest \
        --staging-bucket=${BUCKET} \
        --pools="name=${DP_POOLNAME},roles=default" \
        --setup-workload-identity \
        --history-server-cluster=${PHS_CLUSTER}
    
    注意:

    • --spark-engine-versionSpark 映像版本 部署在 Dataproc 集群上的资源。您可以 使用标识符,例如 33.1latest。 也可以指定完整的次要版本,例如 3.1-dataproc-5
    • --staging-bucket:删除此标志以在 GKE 上启用 Dataproc 创建暂存存储桶
    • --pools:此标志用于指定新节点池或现有节点池 由 Dataproc 创建或使用的资源来执行工作负载。列表 Dataproc on GKE 节点池设置、 以英文逗号分隔,例如:
      --pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
      
      您必须指定节点池 namerole。其他节点池 设置都是选填的您可以使用多个 --pools 标志 指定多个节点池至少一个节点池必须具有 default 角色。所有节点池都必须位于同一位置。
    • --setup-workload-identity:此标志用于启用 Workload Identity 绑定。这些绑定允许 Kubernetes 服务账号 (KSA) 充当虚拟集群的默认 Dataproc 虚拟机服务账号(数据平面身份)

REST

完成 virtualClusterConfig 作为 Dataproc API 的一部分 cluster.create 请求。

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT:Google Cloud 项目 ID
  • REGION:Dataproc 虚拟集群区域(与现有 GKE 集群区域相同)
  • DP_CLUSTER:Dataproc 集群名称
  • GKE_CLUSTER:GKE 集群名称
  • NODE_POOL:节点池名称
  • PHS_CLUSTERPersistent History Server (PHS) 集群名称
  • BUCKET:(可选)暂存桶名称。留空此字段可让 Dataproc on GKE 创建一个预演存储桶。

HTTP 方法和网址:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters

请求 JSON 正文:

{
  "clusterName":"DP_CLUSTER",
  "projectId":"PROJECT",
  "virtualClusterConfig":{
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    },
    "kubernetesClusterConfig":{
      "gkeClusterConfig":{
        "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"latest"
        }
      }
    },
    "stagingBucket":"BUCKET"
  }
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "projectId":"PROJECT",
  "clusterName":"DP_CLUSTER",
  "status":{
    "state":"RUNNING",
    "stateStartTime":"2022-04-01T19:16:39.865716Z"
  },
  "clusterUuid":"98060b77-...",
  "statusHistory":[
    {
      "state":"CREATING",
      "stateStartTime":"2022-04-01T19:14:27.340544Z"
    }
  ],
  "labels":{
    "goog-dataproc-cluster-name":"DP_CLUSTER",
    "goog-dataproc-cluster-uuid":"98060b77-...",
    "goog-dataproc-location":"REGION",
    "goog-dataproc-environment":"prod"
  },
  "virtualClusterConfig":{
    "stagingBucket":"BUCKET",
    "kubernetesClusterConfig":{
      "kubernetesNamespace":"dp-cluster",
      "gkeClusterConfig":{
"gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"3.1-..."
        },
        "properties":{
          "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...",
          "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history",
          "spark:spark.eventLog.enabled":"true"
        }
      }
    },
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    }
  }

提交 Spark 作业

在 Dataproc on GKE 虚拟集群运行后,使用 Google Cloud 控制台、gcloud CLI 或 Dataproc jobs.submit API(使用直接 HTTP 请求或 Cloud 客户端库提交 Spark 作业

gcloud CLI Spark 作业示例

gcloud dataproc jobs submit spark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    --class=org.apache.spark.examples.SparkPi \
    --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \
    -- 1000

gcloud CLI PySpark 作业示例

gcloud dataproc jobs submit pyspark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/python/pi.py \
    -- 10

gcloud CLI SparkR 作业示例

gcloud dataproc jobs submit spark-r \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/r/dataframe.R

清理