Dataproc Enhanced Flexibility Mode

Dataproc Enhanced Flexibility Mode (EFM) manages shuffle data to minimize job progress delays caused by the removal of nodes from a running cluster. EFM offloads shuffle data in one of two user-selectable modes:

  1. Primary-worker shuffle. Mappers write data to primary workers. Workers pull from those remote nodes during the reduce phase. This mode is only available to, and is recommended for, Spark jobs.

  2. HCFS (Hadoop Compatible File System) shuffle. Mappers write data to an HCFS implementation (HDFS by default). As with primary worker mode, only primary workers participate in HDFS and HCFS implementations (if HCFS shuffle uses the Cloud Storage Connector, data is stored off-cluster). This mode can benefit jobs with small amounts of data, but due to scaling limitations, it is not recommended for larger jobs.

Because neither EFM mode stores intermediate shuffle data on secondary workers, EFM is well suited to clusters that use preemptible VMs or that autoscale only the secondary worker group.

Limitations:

  • Apache Hadoop YARN jobs that don't support AppMaster relocation can fail in Enhanced Flexibility Mode (see When to wait for AppMasters to finish).
  • Enhanced Flexibility Mode is not recommended:
    • on a cluster that has primary workers only
    • on streaming jobs since it can take up to 30 minutes after job completion to clean up intermediate shuffle data.
  • Enhanced Flexibility Mode is not supported:
    • when primary worker autoscaling is enabled. In most cases, primary workers will continue to store shuffle data that is not automatically migrated. Downscaling the primary worker group negates EFM benefits.
    • when Spark jobs run on a cluster with graceful decommissioning enabled. Graceful decommissioning and EFM can work at cross purposes since the YARN graceful decommission mechanism keeps the DECOMMISSIONING nodes until all involved applications complete.

Using Enhanced Flexibility Mode

Enhanced Flexibility Mode is configured per execution engine and must be configured during cluster creation.

  • The Spark EFM implementation is configured with the dataproc:efm.spark.shuffle cluster property. Valid property values:

    • primary-worker for primary worker shuffle (recommended)
    • hcfs for HCFS-based shuffle. This mode is deprecated and is available only on clusters running image version 1.5. Not recommended for new workflows.
  • The Hadoop MapReduce implementation is configured with the dataproc:efm.mapreduce.shuffle cluster property. Valid property values:

    • hcfs

Example: Create a cluster with primary-worker shuffle for Spark and HCFS shuffle for MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Apache Spark example

  1. Run a WordCount job against public Shakespeare text using the Spark examples jar on the EFM cluster.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Apache Hadoop MapReduce example

  1. Run a small teragen job to generate input data in Cloud Storage for a later terasort job using the mapreduce examples jar on the EFM cluster. The final argument is the Cloud Storage output URI (for example, gs://terasort/input).

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 gs://terasort/input
    

  2. Run a terasort job on the data.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Configuring local SSDs for primary worker shuffle

Primary-worker and HDFS shuffle implementations write intermediate shuffle data to VM-attached disks, and benefit from the additional throughput and IOPS offered by local SSDs. As a sizing guideline, aim for approximately one local SSD partition per 4 vCPUs when configuring primary worker machines.

To attach local SSDs, pass the --num-worker-local-ssds flag to the gcloud dataproc clusters create command.
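As a rough illustration of the one-SSD-per-4-vCPUs guideline, the following sketch derives a value for --num-worker-local-ssds from a vCPU count. The variable names are illustrative, and rounding up is an assumption made for this sketch rather than documented behavior.

```shell
# Assumption: vcpus is the vCPU count of the primary worker machine type
# (8 for the n1-highmem-8 used in the cluster creation example).
vcpus=8

# Roughly one local SSD per 4 vCPUs, rounded up (assumed rounding rule).
num_ssds=$(( (vcpus + 3) / 4 ))

# Print the flag to add to `gcloud dataproc clusters create`.
ssd_flag="--num-worker-local-ssds=${num_ssds}"
echo "${ssd_flag}"
```

For an 8-vCPU primary worker this yields two local SSDs, which matches the value used in the cluster creation example.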

Secondary workers generally don't need local SSDs because they don't write shuffle data locally. However, since local SSDs improve local disk performance, you may decide to add them to secondary workers (using the --num-secondary-worker-local-ssds flag) if you expect jobs to be I/O bound due to local disk use, for example when your job uses significant local disk for scratch space or when your partitions are too large to fit in memory and spill to disk.

Secondary worker ratio

Since secondary workers write their shuffle data to primary workers, your cluster must contain a sufficient number of primary workers with sufficient CPU, memory, and disk resources to accommodate your job's shuffle load. For autoscaling clusters, to prevent the primary group from scaling and causing unwanted behavior, set minInstances to the maxInstances value in the autoscaling policy for the primary worker group.
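One way to pin the primary worker group is to write an autoscaling policy file in which both primary bounds are equal. The sketch below generates such a file. The worker counts, cooldown period, scale factors, and file name are hypothetical values chosen for illustration, not recommendations.

```shell
# Hypothetical sizes; the primary group uses one value for both bounds.
primary_workers=25
max_secondary_workers=100
# 0s suits Spark EFM clusters, where graceful decommissioning is not
# supported; MapReduce EFM clusters may use a longer timeout.
graceful_timeout="0s"
policy_file="${TMPDIR:-/tmp}/efm-autoscaling-policy.yaml"

cat > "${policy_file}" <<EOF
workerConfig:
  minInstances: ${primary_workers}
  maxInstances: ${primary_workers}
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: ${max_secondary_workers}
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 1.0
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: ${graceful_timeout}
EOF

# Review the file, then import it, for example:
#   gcloud dataproc autoscaling-policies import policy-name \
#       --source="${policy_file}" --region=region
cat "${policy_file}"
```

Because minInstances equals maxInstances for workerConfig, only the secondary worker group can change size under this policy.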

If you have a high secondary-to-primary worker ratio (for example, 10:1), monitor the CPU utilization, network, and disk usage of primary workers to determine if they are overloaded. To do this:

  1. Go to the VM instances page in the Google Cloud console.

  2. Select the checkbox to the left of the primary worker.

  3. Click the MONITORING tab to view the primary worker's CPU Utilization, Disk IOPS, Network Bytes, and other metrics.

If primary workers are overloaded, consider scaling up primary workers manually.

Resizing the primary worker group

The primary worker group can be safely scaled up, but downscaling the primary worker group can negatively impact job progress. Operations that downscale the primary worker group should use graceful decommissioning, which is enabled by setting the --graceful-decommission-timeout flag.

Autoscaled clusters: Primary worker group scaling is disabled on EFM clusters with autoscaling policies. To resize the primary worker group on an autoscaled cluster:

  1. Disable autoscaling.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Scale the primary group.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Re-enable autoscaling.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Monitoring primary worker disk use

Primary workers must have sufficient disk space for the cluster's shuffle data. You can monitor this indirectly through the remaining HDFS capacity metric. As local disk fills up, space becomes unavailable for HDFS, and the remaining capacity decreases.

By default, when a primary worker's local disk use exceeds 90% of capacity, the node will be marked as UNHEALTHY in the YARN node UI. If you experience disk capacity issues, you can delete unused data from HDFS or scale up the primary worker pool.
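The 90% rule can be approximated with a small check run on a primary worker. This is a sketch under stated assumptions: disk_state and used_pct_of are hypothetical helper names, the mount point is a placeholder, and the check only mirrors the threshold described above. It does not query YARN.

```shell
# Classify a disk-usage percentage against a threshold (default 90).
# Usage strictly above the threshold is reported as UNHEALTHY.
disk_state() {
  local used_pct="$1"
  local threshold="${2:-90}"
  if [ "${used_pct}" -gt "${threshold}" ]; then
    echo "UNHEALTHY"
  else
    echo "OK"
  fi
}

# Read the used percentage of a mount point from POSIX-format df output.
used_pct_of() {
  df -P "$1" | awk 'NR == 2 { sub(/%/, "", $5); print $5 }'
}

# Placeholder: replace with the disk that holds shuffle data.
mount_point="/"
echo "${mount_point}: $(disk_state "$(used_pct_of "${mount_point}")")"
```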

Advanced configuration

Partitioning and parallelism

When submitting a MapReduce or Spark job, configure an appropriate level of partitioning. Deciding on the number of input and output partitions for a shuffle stage involves a trade-off among different performance characteristics. Experiment to find values that work for your job shapes.

Input partitions

MapReduce and Spark input partitioning is determined by the input data set. When reading files from Cloud Storage, each task processes approximately one "block size" worth of data.

  • For Spark SQL jobs, the maximum partition size is controlled by spark.sql.files.maxPartitionBytes. Consider increasing it to 1GB: spark.sql.files.maxPartitionBytes=1073741824.

  • For MapReduce jobs and Spark RDDs, partition size is typically controlled with fs.gs.block.size, which defaults to 128MB. Consider increasing it to 1GB. You can also set InputFormat-specific properties such as mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize.

    • For MapReduce jobs: --properties fs.gs.block.size=1073741824
    • For Spark RDDs: --properties spark.hadoop.fs.gs.block.size=1073741824
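To see why a larger block size matters, the sketch below estimates the number of input tasks for a given input size and block size. It assumes a single large splittable input of 1TB, which is a hypothetical figure, and input_tasks is an illustrative helper name. Real task counts also depend on how the input is divided into files.

```shell
# Estimate input task count as ceil(input_bytes / block_bytes).
input_tasks() {
  local input_bytes="$1"
  local block_bytes="$2"
  echo $(( (input_bytes + block_bytes - 1) / block_bytes ))
}

one_tb=$(( 1024 * 1024 * 1024 * 1024 ))   # hypothetical input size
default_block=$(( 128 * 1024 * 1024 ))    # 128MB default
large_block=$(( 1024 * 1024 * 1024 ))     # 1GB = 1073741824 bytes

echo "128MB blocks: $(input_tasks "${one_tb}" "${default_block}") tasks"
echo "1GB blocks:   $(input_tasks "${one_tb}" "${large_block}") tasks"
```

Under these assumptions, moving from 128MB to 1GB blocks reduces the estimate from 8192 tasks to 1024.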

Output partitions

The number of tasks in subsequent stages is controlled by several properties. On larger jobs that process more than 1TB, consider having at least 1GB per partition.

  • For MapReduce jobs, the number of output partitions is controlled by mapreduce.job.reduces.

  • For Spark SQL, the number of output partitions is controlled by spark.sql.shuffle.partitions.

  • For Spark jobs using the RDD API, you can specify the number of output partitions or set spark.default.parallelism.
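The 1GB-per-partition guideline can be turned into a partition count with simple arithmetic. The sketch below assumes a job that shuffles about 4TB, which is a hypothetical figure, and max_partitions is an illustrative helper name. Treat the result as a starting point for experiments.

```shell
# Largest partition count that still leaves at least min_partition_bytes
# (default 1GB) of shuffle data per partition.
max_partitions() {
  local shuffle_bytes="$1"
  local min_partition_bytes="${2:-1073741824}"
  local n=$(( shuffle_bytes / min_partition_bytes ))
  if [ "${n}" -lt 1 ]; then
    n=1
  fi
  echo "${n}"
}

# Assumption: the job shuffles about 4TB.
shuffle_bytes=$(( 4 * 1024 * 1024 * 1024 * 1024 ))
partitions=$(max_partitions "${shuffle_bytes}")

# Job-level property for a Spark SQL job. For MapReduce, set
# mapreduce.job.reduces to the same value instead.
echo "--properties=spark.sql.shuffle.partitions=${partitions}"
```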

Shuffle tuning for primary worker shuffle

The most significant property is spark.shuffle.io.serverThreads, which is set with --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. This is a cluster-level YARN property because the Spark shuffle server runs as part of the NodeManager. It defaults to twice (2x) the number of cores on the machine (for example, 16 threads on an n1-highmem-8). If "Shuffle Read Blocked Time" is larger than 1 second, and primary workers have not reached network, CPU, or disk limits, consider increasing the number of shuffle server threads.

On larger machine types, consider increasing spark.shuffle.io.numConnectionsPerPeer, which defaults to 1 (for example, set it to 5 connections per pair of hosts).
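The two tuning properties can be combined into one cluster-level --properties value. In the sketch below, doubling the default thread count is an assumption made only to have a concrete starting point, and the spark: prefix for numConnectionsPerPeer assumes the property is set in the cluster's Spark defaults.

```shell
cores=8                              # vCPUs on an n1-highmem-8
default_threads=$(( 2 * cores ))     # default described above: 16

# Assumption: doubling the default is only a first value to try.
tuned_threads=$(( 2 * default_threads ))

shuffle_props="yarn:spark.shuffle.io.serverThreads=${tuned_threads}"
shuffle_props="${shuffle_props},spark:spark.shuffle.io.numConnectionsPerPeer=5"

# Print the flag to add to `gcloud dataproc clusters create`.
echo "--properties=${shuffle_props}"
```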

Increasing retries

The maximum number of attempts permitted for app masters, tasks, and stages can be configured by setting the following properties:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Since app masters and tasks are more frequently terminated in clusters that use many preemptible VMs or autoscaling without graceful decommissioning, increasing the values of the above properties in those clusters can help (note that using EFM with Spark and graceful decommissioning is not supported).
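The properties listed above can be joined into a single comma-separated --properties value for cluster creation. The value of 10 attempts used below is a hypothetical example, not a documented recommendation.

```shell
max_attempts=10   # hypothetical value; tune for your cluster

# Join the retry properties into one comma-separated list.
retry_props=""
for key in \
    yarn:yarn.resourcemanager.am.max-attempts \
    mapred:mapreduce.map.maxattempts \
    mapred:mapreduce.reduce.maxattempts \
    spark:spark.task.maxFailures \
    spark:spark.stage.maxConsecutiveAttempts; do
  retry_props="${retry_props:+${retry_props},}${key}=${max_attempts}"
done

# Print the flag to add to `gcloud dataproc clusters create`.
echo "--properties=${retry_props}"
```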

Configuring HDFS for HCFS shuffle

To improve the performance of large shuffles, you can decrease lock contention in the NameNode by setting dfs.namenode.fslock.fair=false. Note that this risks starving individual requests, but may improve cluster-wide throughput. To further improve NameNode performance, you can attach local SSDs to the master node by setting --num-master-local-ssds. You can also add local SSDs to primary workers to improve DataNode performance by setting --num-worker-local-ssds.
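The settings in the paragraph above could be combined at cluster creation roughly as follows. The SSD counts are hypothetical, and the hdfs: prefix assumes that dfs.namenode.fslock.fair is applied through the cluster's HDFS configuration.

```shell
# Enable HCFS shuffle for MapReduce and relax NameNode lock fairness.
hcfs_props="dataproc:efm.mapreduce.shuffle=hcfs"
hcfs_props="${hcfs_props},hdfs:dfs.namenode.fslock.fair=false"

master_ssds=1    # hypothetical count for the master node
worker_ssds=2    # hypothetical count for each primary worker

# Print the flags to add to `gcloud dataproc clusters create`.
hcfs_flags="--properties=${hcfs_props}"
hcfs_flags="${hcfs_flags} --num-master-local-ssds=${master_ssds}"
hcfs_flags="${hcfs_flags} --num-worker-local-ssds=${worker_ssds}"
echo "${hcfs_flags}"
```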

Other Hadoop Compatible File Systems for HCFS shuffle

By default, EFM HCFS shuffle data is written to HDFS, but you can use any Hadoop Compatible File System (HCFS). For example, you may decide to write shuffle to Cloud Storage or to a different cluster's HDFS. To specify a file system, you can point fs.defaultFS to the target file system when you submit a job to your cluster.
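As a sketch of the job-level setting, the helper below builds the --properties flag for a target file system. The name default_fs_flag and the bucket are hypothetical, and accepting only gs:// and hdfs:// URIs is a simplification for this example rather than a limit of HCFS shuffle.

```shell
# Build the job-level flag that points fs.defaultFS at a target
# file system. Only gs:// and hdfs:// URIs are accepted in this sketch.
default_fs_flag() {
  case "$1" in
    gs://*|hdfs://*)
      echo "--properties=fs.defaultFS=$1"
      ;;
    *)
      echo "unsupported file system URI: $1" >&2
      return 1
      ;;
  esac
}

# Print the flag to add to `gcloud dataproc jobs submit hadoop`.
default_fs_flag "gs://my-shuffle-bucket"   # hypothetical bucket name
```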

YARN graceful decommissioning on EFM clusters

YARN graceful decommissioning can be used to remove nodes quickly with minimal impact on running applications. For autoscaling clusters, the graceful decommissioning timeout can be set in an AutoscalingPolicy that is attached to the EFM cluster.

MapReduce EFM enhancements to graceful decommissioning

  1. Since intermediate data is stored in a distributed file system, nodes can be removed from an EFM cluster as soon as all containers running on those nodes have finished. By comparison, nodes are not removed on standard Dataproc clusters until the application has finished.

  2. Node removal does not wait for app masters running on a node to finish. When the app master container is terminated, it is rescheduled on another node that is not being decommissioned. Job progress is not lost: the new app master quickly recovers state from the previous app master by reading job history.

Using graceful decommissioning on an EFM cluster with MapReduce

  1. Create an EFM cluster with an equal number of primary and secondary workers.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Run a mapreduce job that calculates the value of pi using the mapreduce examples jar on the cluster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. While the job is running, scale down the cluster using graceful decommissioning.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    The nodes are removed from the cluster quickly, before the job finishes, while minimizing loss of job progress. Temporary pauses in job progress can occur due to:

    • App master failover. If job progress drops to 0% and then immediately jumps up to the pre-drop value, the app master may have terminated and a new app master recovered its state. This should not significantly affect the progress of the job since failover happens quickly.
    • VM preemption. Since HDFS preserves only complete, not partial, map task outputs, temporary pauses in job progress can occur when a VM is preempted while working on a map task.

To speed up node removal, you can scale down the cluster without graceful decommissioning by omitting the --graceful-decommission-timeout flag in the previous gcloud command example. Job progress from completed map tasks will be preserved, but partially completed map task output will be lost (the map tasks will be rerun).