Run a Spark job on Dataproc on Google Kubernetes Engine

Before you begin

  1. You must have created a standard (not Autopilot) Google Kubernetes Engine (GKE) zonal or regional cluster with Workload Identity enabled.
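
If you still need to create the GKE cluster, the following is a minimal sketch using the gcloud CLI; the cluster name, region, and project ID are placeholder values that you replace with your own:

    # Creates a regional Standard GKE cluster with Workload Identity enabled
    # (placeholder names; replace with your own values).
    gcloud container clusters create GKE_CLUSTER_NAME \
        --region=REGION \
        --workload-pool=PROJECT_ID.svc.id.goog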

Create a Dataproc on GKE virtual cluster

A Dataproc on GKE virtual cluster is created as the deployment platform for Dataproc components. It's a virtual resource, and unlike a Dataproc on Compute Engine cluster, does not include separate Dataproc master and worker VMs.

  • Dataproc on GKE creates node pools within a GKE cluster when you create a Dataproc on GKE virtual cluster.

  • Dataproc on GKE jobs are run as pods on these node pools. The node pools and scheduling of pods on the node pools are managed by GKE.

  • Create multiple virtual clusters. You can create and run multiple virtual clusters on the same GKE cluster to improve resource utilization by sharing node pools across the virtual clusters.

    • Each virtual cluster:
      • is created with separate properties, including Spark engine version and workload identity
      • is isolated within a separate GKE namespace on the GKE cluster

Console

  1. In the Google Cloud console, go to the Dataproc Clusters page.

    Go to Clusters

  2. Click Create cluster.

  3. In the Create Dataproc cluster dialog, click Create in the Cluster on GKE row.

  4. In the Set up cluster panel:

    1. In the Cluster Name field, enter a name for the cluster.
    2. In the Region list, select a region for the Dataproc on GKE virtual cluster. This region must be the same region where your existing GKE cluster is located (which you select in the next item).
    3. In the Kubernetes Cluster field, click Browse to select your existing GKE cluster.
    4. Optional: In the Cloud Storage staging bucket field, you can click Browse to select an existing Cloud Storage bucket. Dataproc on GKE stages artifacts in the bucket. Leave this field empty to have Dataproc on GKE create a staging bucket.
  5. In the left panel, click Configure Node pools, then in the Node pools panel, click Add a pool.

    1. To reuse an existing Dataproc on GKE node pool:
      1. Click Reuse existing node pool.
      2. Input the name of the existing node pool and select its Role. At least one node pool must have the DEFAULT role.
      3. Click Done.
    2. To create a new Dataproc on GKE node pool:
      1. Click Create a new node pool.
      2. Input the following node pool values:
        • Node pool name
        • Role: At least one node pool must have the DEFAULT role.
        • Location: Specify a zone within the Dataproc on GKE cluster region.
        • Node pool machine type
        • CPU platform
        • Preemptibility
        • Min: Minimum node count.
        • Max: Maximum node count. The maximum node count must be greater than 0.
    3. Click Add a pool to add more node pools. All node pools must have the same location. You can add a total of four node pools.
  6. (Optional) If you have set up a Dataproc Persistent History Server (PHS) to view Spark job history on active and deleted Dataproc on GKE clusters, click Customize cluster. Then, in the History server cluster field, browse for and choose your PHS cluster. The PHS cluster must be located in the same region as the Dataproc on GKE virtual cluster.

  7. Click Create to create the Dataproc cluster. Your Dataproc on GKE cluster appears in a list on the Clusters page. Its status is Provisioning until the cluster is ready to use, and then the status changes to Running.
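
You can also check the cluster status from the command line; a minimal example, assuming you replace the region placeholder with the region you selected above:

    # Lists Dataproc clusters (including Dataproc on GKE virtual clusters) and their status.
    gcloud dataproc clusters list --region=REGION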

gcloud

Set environment variables, then run the gcloud dataproc clusters gke create command locally or in Cloud Shell to create a Dataproc on GKE cluster.

  1. Set environment variables:

    DP_CLUSTER=Dataproc-on-GKE-cluster-name
    REGION=region
    GKE_CLUSTER=GKE-cluster-name
    BUCKET=Cloud-Storage-bucket-name
    DP_POOLNAME=node-pool-name
    PHS_CLUSTER=Dataproc-PHS-server-name
    
    Notes:

    • DP_CLUSTER: Set the Dataproc virtual cluster name, which must start with a lowercase letter, followed by up to 54 lowercase letters, numbers, or hyphens, and it cannot end with a hyphen.
    • REGION: The region must be the same as the region where the GKE cluster is located.
    • GKE_CLUSTER: The name of your existing GKE cluster.
    • BUCKET: (Optional) You can specify the name of a Cloud Storage bucket, which Dataproc will use to stage artifacts. If you do not specify a bucket, Dataproc on GKE will create a staging bucket.
    • DP_POOLNAME: The name of a node pool to create on the GKE cluster.
    • PHS_CLUSTER: (Optional) Dataproc PHS Server to use to view Spark job history on active and deleted Dataproc on GKE clusters. The PHS cluster must be located in the same region as the Dataproc on GKE virtual cluster.
  2. Run the command:

    gcloud dataproc clusters gke create ${DP_CLUSTER} \
        --region=${REGION} \
        --gke-cluster=${GKE_CLUSTER} \
        --spark-engine-version=latest \
        --staging-bucket=${BUCKET} \
        --pools="name=${DP_POOLNAME},roles=default" \
        --setup-workload-identity \
        --history-server-cluster=${PHS_CLUSTER}
    
    Notes:

    • --spark-engine-version: The Spark image version used on the Dataproc cluster. You can use an identifier, such as 3, 3.1, or latest, or you can specify the full subminor version, such as 3.1-dataproc-5.
    • --staging-bucket: Omit this flag to have Dataproc on GKE create a staging bucket.
    • --pools: This flag specifies a new or existing node pool that Dataproc creates or uses to run the workload. List the Dataproc on GKE node pool settings, separated by commas, for example:
      --pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
      
      You must specify the node pool name and role. Other node pool settings are optional. You can use multiple --pools flags to specify multiple node pools. At least one node pool must have the default role. All node pools must have the same location.
    • --setup-workload-identity: This flag enables Workload Identity bindings. These bindings allow the Kubernetes service accounts (KSAs) to act as the default Dataproc VM Service Account (Data Plane identity) of the virtual cluster.
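
After the create command returns, you can optionally verify that the virtual cluster reached the RUNNING state; a minimal check that reuses the environment variables set earlier:

    # Describes the virtual cluster; the output includes its status.state field.
    gcloud dataproc clusters describe ${DP_CLUSTER} \
        --region=${REGION}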

REST

Complete a virtualClusterConfig as part of a Dataproc API cluster.create request.

Before using any of the request data, make the following replacements:

  • PROJECT: Google Cloud project ID
  • REGION: Dataproc virtual cluster region (same region as the existing GKE cluster region)
  • DP_CLUSTER: Dataproc cluster name
  • GKE_CLUSTER: GKE cluster name
  • NODE_POOL: Node pool name
  • PHS_CLUSTER: Persistent History Server (PHS) cluster name
  • BUCKET: (Optional) Staging bucket name. Leave this empty to have Dataproc on GKE create a staging bucket.

HTTP method and URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT/regions/REGION/clusters

Request JSON body:

{
  "clusterName":"DP_CLUSTER",
  "projectId":"PROJECT",
  "virtualClusterConfig":{
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    },
    "kubernetesClusterConfig":{
      "gkeClusterConfig":{
        "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"latest"
        }
      }
    },
    "stagingBucket":"BUCKET"
  }
}

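To send the request, you can use curl or any other HTTP client. The following sketch assumes that you saved the request JSON body above to a local file named request.json and that you are authenticated with the gcloud CLI:

# POSTs the cluster create request; replace PROJECT and REGION as described above.
curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json; charset=utf-8" \
    -d @request.json \
    "https://dataproc.googleapis.com/v1/projects/PROJECT/regions/REGION/clusters"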

You should receive a JSON response similar to the following:

{
  "projectId":"PROJECT",
  "clusterName":"DP_CLUSTER",
  "status":{
    "state":"RUNNING",
    "stateStartTime":"2022-04-01T19:16:39.865716Z"
  },
  "clusterUuid":"98060b77-...",
  "statusHistory":[
    {
      "state":"CREATING",
      "stateStartTime":"2022-04-01T19:14:27.340544Z"
    }
  ],
  "labels":{
    "goog-dataproc-cluster-name":"DP_CLUSTER",
    "goog-dataproc-cluster-uuid":"98060b77-...",
    "goog-dataproc-location":"REGION",
    "goog-dataproc-environment":"prod"
  },
  "virtualClusterConfig":{
    "stagingBucket":"BUCKET",
    "kubernetesClusterConfig":{
      "kubernetesNamespace":"dp-cluster",
      "gkeClusterConfig":{
"gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"3.1-..."
        },
        "properties":{
          "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...",
          "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history",
          "spark:spark.eventLog.enabled":"true"
        }
      }
    },
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    }
  }
}

Submit a Spark job

After your Dataproc on GKE virtual cluster is running, submit a Spark job using the Google Cloud console, gcloud CLI, or the Dataproc jobs.submit API (by using direct HTTP requests or the Cloud Client Libraries).

gcloud CLI Spark job example:

gcloud dataproc jobs submit spark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    --class=org.apache.spark.examples.SparkPi \
    --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \
    -- 1000

gcloud CLI PySpark job example:

gcloud dataproc jobs submit pyspark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/python/pi.py \
    -- 10

gcloud CLI SparkR job example:

gcloud dataproc jobs submit spark-r \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/r/dataframe.R
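
To check the status of jobs you have submitted, you can list them for the virtual cluster; a minimal example that reuses the environment variables set earlier:

# Lists jobs submitted to the virtual cluster, including their status.
gcloud dataproc jobs list \
    --region=${REGION} \
    --cluster=${DP_CLUSTER}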

Clean up