Dataproc Enhanced Flexibility Mode (EFM) manages shuffle data to minimize job progress delays caused by the removal of nodes from a running cluster. EFM offloads shuffle data in one of two user-selectable modes:
- Primary-worker shuffle. Mappers write data to primary workers, and reducers pull the data from those remote nodes during the reduce phase. This mode is available only to Spark jobs, and is recommended for them.
- HCFS (Hadoop Compatible File System) shuffle. Mappers write data to an HCFS implementation (HDFS by default). As with primary-worker mode, only primary workers participate in HDFS and HCFS implementations (if HCFS shuffle uses the Cloud Storage connector, data is stored off-cluster). This mode can benefit jobs with small amounts of data, but because of scaling limitations it is not recommended for larger jobs.
- Apache Hadoop YARN jobs that do not support AppMaster relocation can fail in Enhanced Flexibility Mode (see When to wait for AppMasters to finish).
- Enhanced Flexibility Mode is not recommended:
  - on a cluster that has primary workers only.
- Enhanced Flexibility Mode is not supported:
  - when primary worker autoscaling is enabled. In most cases, primary workers continue to store shuffle data that is not automatically migrated, so downscaling the primary worker group negates EFM benefits.
  - when Spark jobs run on a cluster with graceful decommissioning enabled. Graceful decommissioning and EFM can work at cross purposes because the YARN graceful decommission mechanism keeps DECOMMISSIONING nodes until all involved applications complete.
Using Enhanced Flexibility Mode
Enhanced Flexibility Mode is configured per execution engine and must be set during cluster creation.
The Spark EFM implementation is configured with the dataproc:efm.spark.shuffle cluster property. Valid property values:
- primary-worker for primary worker shuffle (recommended)
- hcfs for HCFS-based shuffle. This mode is deprecated and is available only on clusters running image version 1.5. Not recommended for new workflows.
The Hadoop MapReduce implementation is configured with the dataproc:efm.mapreduce.shuffle cluster property. Valid property values:
Example: Create a cluster with primary-worker shuffle for Spark and HCFS shuffle for MapReduce:
gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25
Apache Spark example
- Run a WordCount job against public Shakespeare text using the Spark examples jar on the EFM cluster.
gcloud dataproc jobs submit spark \
    --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- gs://apache-beam-samples/shakespeare/macbeth.txt
Apache Hadoop MapReduce example
Run a small teragen job, using the MapReduce examples jar on the EFM cluster, to generate input data in Cloud Storage for a later terasort job. Replace cloud-storage-output-uri with a Cloud Storage URI you can write to (for example, gs://terasort/input).
gcloud dataproc jobs submit hadoop \
    --cluster=cluster-name \
    --region=region \
    --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    -- teragen 1000 cloud-storage-output-uri
Run a terasort job on the data.
gcloud dataproc jobs submit hadoop \
    --cluster=cluster-name \
    --region=region \
    --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    -- terasort gs://terasort/input gs://terasort/output
Configuring Local SSDs for primary worker shuffle
Primary-worker and HDFS shuffle implementations write intermediate shuffle data to VM-attached disks, and benefit from the additional throughput and IOPS offered by local SSDs. To facilitate resource allocation, aim for approximately 1 local SSD partition per 4 vCPUs when configuring primary worker machines.
To attach local SSDs, pass the --num-worker-local-ssds flag to the gcloud dataproc clusters create command.
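The 1-per-4 guideline is simple arithmetic, but rounding matters for machine types whose vCPU count is not a multiple of 4. Below is a minimal bash sketch, assuming you round up; the function name is hypothetical. Compute Engine only allows certain local SSD counts per machine series, so check the result against the allowed values before passing it to --num-worker-local-ssds.

```shell
#!/usr/bin/env bash
# Sketch: convert the "about 1 local SSD per 4 vCPUs" guideline into a count
# for --num-worker-local-ssds. Rounding up is an assumption, not a rule.
set -euo pipefail

# Usage: local_ssds_for_vcpus VCPUS  -> prints the suggested local SSD count
local_ssds_for_vcpus() {
  local vcpus="$1"
  # Integer ceiling of vcpus / 4.
  echo $(( (vcpus + 3) / 4 ))
}

# n1-highmem-8 has 8 vCPUs, consistent with --num-worker-local-ssds=2
# in the cluster creation example.
local_ssds_for_vcpus 8
```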
Secondary worker ratio
Since secondary workers write their shuffle data to primary workers, your cluster must contain enough primary workers, with enough CPU, memory, and disk resources, to accommodate your job's shuffle load. For autoscaling clusters, to prevent the primary group from scaling and causing unwanted behavior, set minInstances to the maxInstances value in the autoscaling policy for the primary worker group.
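A minimal sketch of an autoscaling policy that pins the primary worker group, written as a bash heredoc so that the file can then be imported with gcloud dataproc autoscaling-policies import. The file name, the policy ID efm-policy, the instance counts, and the algorithm values are illustrative assumptions; the part that matters is that workerConfig.minInstances equals workerConfig.maxInstances, so only the secondary worker group scales. gracefulDecommissionTimeout is set to 0s on the assumption that the cluster runs Spark EFM jobs, for which graceful decommissioning is not supported.

```shell
#!/usr/bin/env bash
# Sketch: write an autoscaling policy that pins the primary worker group
# (minInstances == maxInstances) so that only secondary workers scale.
# File name, policy ID, counts, and algorithm values are illustrative.
set -euo pipefail

POLICY_FILE="efm-autoscaling-policy.yaml"   # hypothetical file name
PRIMARY_WORKERS=10                          # illustrative fixed primary count

cat > "${POLICY_FILE}" <<EOF
workerConfig:
  minInstances: ${PRIMARY_WORKERS}
  maxInstances: ${PRIMARY_WORKERS}
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    # 0s: no YARN graceful decommissioning (assumes a Spark EFM cluster).
    gracefulDecommissionTimeout: 0s
EOF

echo "Wrote ${POLICY_FILE}. To register it as policy efm-policy, run:"
echo "  gcloud dataproc autoscaling-policies import efm-policy --source=${POLICY_FILE} --region=region"
```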
If you have a high secondary-to-primary worker ratio (for example, 10:1), monitor the CPU utilization, network, and disk usage of primary workers to determine whether they are overloaded. To do this:
Go to the VM instances page in the Google Cloud console.
Select the check box to the left of a primary worker.
Click the MONITORING tab to view the primary worker's CPU Utilization, Disk IOPS, Network Bytes, and other metrics.
If primary workers are overloaded, consider scaling up primary workers manually.
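Before opening the console metrics, a quick ratio check tells you whether a cluster falls in the high-ratio range mentioned above. Below is a minimal bash sketch; the 10:1 cutoff is taken from the example in the text and is illustrative rather than a documented limit, and the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: report the secondary-to-primary worker ratio and flag ratios at or
# above 10:1 (the "high" example on this page) as worth a closer look.
# The cutoff is illustrative; CPU, disk, and network metrics decide.
set -euo pipefail

# Usage: worker_ratio PRIMARY SECONDARY -> prints "<ratio>:1 <status>"
worker_ratio() {
  local primary="$1" secondary="$2"
  local ratio=$(( secondary / primary ))   # integer ratio, rounded down
  if (( ratio >= 10 )); then
    echo "${ratio}:1 check-primary-load"
  else
    echo "${ratio}:1 ok"
  fi
}

# The cluster creation example uses 25 primary and 25 secondary workers.
worker_ratio 25 25
```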
Resizing the primary worker group
The primary worker group can be safely scaled up, but downscaling the primary
worker group can negatively impact job progress. Operations that downscale the
primary worker group should use
graceful decommissioning, which is enabled by setting the
Autoscaled clusters: Primary worker group scaling is disabled on EFM clusters with autoscaling policies. To resize the primary worker group on an autoscaled cluster:
Disable autoscaling.
gcloud dataproc clusters update cluster-name \
    --region=region \
    --disable-autoscaling
Scale the primary group.
gcloud dataproc clusters update cluster-name \
    --region=region \
    --num-workers=num-primary-workers \
    --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Re-enable autoscaling.
gcloud dataproc clusters update cluster-name \
    --region=region \
    --autoscaling-policy=autoscaling-policy
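The resize procedure above can be wrapped in one script so that autoscaling is re-attached even if the resize step fails. Below is a minimal bash sketch under these assumptions: CLUSTER, REGION, and POLICY are placeholders read from the environment, the function name is hypothetical, and DRY_RUN=1 (the default) only prints the gcloud commands instead of running them.

```shell
#!/usr/bin/env bash
# Sketch: resize the primary worker group of an autoscaled EFM cluster by
# (1) disabling autoscaling, (2) resizing, (3) re-attaching the policy.
# CLUSTER, REGION, and POLICY are placeholders. With DRY_RUN=1 (the default)
# the gcloud commands are printed instead of executed.
set -euo pipefail

CLUSTER="${CLUSTER:-cluster-name}"
REGION="${REGION:-region}"
POLICY="${POLICY:-autoscaling-policy}"
DRY_RUN="${DRY_RUN:-1}"

# Print the command in dry-run mode, otherwise execute it.
run() {
  if [[ "${DRY_RUN}" == "1" ]]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage: resize_primary_workers NUM_PRIMARY [GRACEFUL_TIMEOUT]
# Pass GRACEFUL_TIMEOUT (for example, 1h) only when downscaling.
# The body runs in a subshell, so its variables stay scoped and its EXIT
# trap fires when the function returns, including on failure.
resize_primary_workers() (
  num_primary="$1"
  timeout="${2:-}"
  scale_args=(--region="${REGION}" --num-workers="${num_primary}")
  if [[ -n "${timeout}" ]]; then
    scale_args+=(--graceful-decommission-timeout="${timeout}")
  fi

  # Step 3: re-attach the policy on exit so the cluster is not left
  # without autoscaling if step 2 fails.
  trap 'run gcloud dataproc clusters update "${CLUSTER}" --region="${REGION}" --autoscaling-policy="${POLICY}"' EXIT

  # Step 1: disable autoscaling.
  run gcloud dataproc clusters update "${CLUSTER}" --region="${REGION}" --disable-autoscaling
  # Step 2: scale the primary group.
  run gcloud dataproc clusters update "${CLUSTER}" "${scale_args[@]}"
)
```

For example, after setting CLUSTER, REGION, POLICY, and DRY_RUN=0, calling resize_primary_workers 20 scales up, and resize_primary_workers 10 1h scales down with a one-hour graceful decommission timeout.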
Monitoring primary worker disk use
Primary workers must have sufficient disk space for the cluster's shuffle data.
You can monitor this indirectly through the
remaining HDFS capacity metric.
As local disk fills up, space becomes unavailable for HDFS, and
the remaining capacity decreases.
By default, when a primary worker's local disk use exceeds 90% of capacity, the node will be marked as UNHEALTHY in the YARN node UI. If you experience disk capacity issues, you can delete unused data from HDFS or scale up the primary worker pool.
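To check a primary worker's local disks directly rather than through the HDFS metric, you can compare df output with the 90% level. Below is a minimal bash sketch meant to be run on a worker node; the function names are hypothetical, and the threshold mirrors the default described above instead of reading the node's actual YARN configuration.

```shell
#!/usr/bin/env bash
# Sketch: compare local disk usage on a worker node with the 90% level at
# which, by default, YARN marks the node UNHEALTHY. The threshold is
# hard-coded here, not read from the node's YARN configuration.
set -euo pipefail

THRESHOLD="${THRESHOLD:-90}"   # percent

# Print the Use% value (without the % sign) of the file system holding $1.
usage_percent() {
  df -P "$1" | awk 'NR == 2 { sub(/%/, "", $5); print $5 }'
}

# Succeed if the percentage in $1 exceeds THRESHOLD.
exceeds_threshold() {
  [ "$1" -gt "${THRESHOLD}" ]
}

# Print OK or WARN for each mount point given as an argument.
report() {
  local mount pct
  for mount in "$@"; do
    pct="$(usage_percent "${mount}")"
    if exceeds_threshold "${pct}"; then
      echo "WARN ${mount} ${pct}%"
    else
      echo "OK ${mount} ${pct}%"
    fi
  done
}

# Add local SSD mount points as extra arguments on a real worker.
report /
```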
Note that intermediate shuffle data is generally not cleaned up until the end of a job. When using primary worker shuffle with Spark, this can take up to 30 minutes after a job's completion.
Partitioning and parallelism
When submitting a MapReduce or Spark job, configure an appropriate level of partitioning. Deciding on the number of input and output partitions for a shuffle stage involves a trade-off among different performance characteristics, so experiment to find values that work for your job shapes.
MapReduce and Spark input partitioning are determined by the input data set. When reading files from Cloud Storage, each task processes approximately one "block size" worth of data.
For Spark SQL jobs, the maximum partition size is controlled by
spark.sql.files.maxPartitionBytes. Consider increasing it to 1GB:
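Raising spark.sql.files.maxPartitionBytes only caps the split size: Spark also shrinks splits so that small inputs are spread across all cores. Below is a minimal bash sketch of that rule as it appears in recent Spark releases' file-scan planning; treat the formula as an approximation and check it against your Spark version. The function name is hypothetical, the 4 MiB open cost is the assumed default of spark.sql.files.openCostInBytes, and parallelism stands for the job's default parallelism (roughly its total executor cores).

```shell
#!/usr/bin/env bash
# Sketch of how Spark SQL sizes file-scan splits:
#   bytesPerCore  = (totalBytes + numFiles * openCost) / parallelism
#   maxSplitBytes = min(maxPartitionBytes, max(openCost, bytesPerCore))
# openCost defaults here to 4 MiB (assumed spark.sql.files.openCostInBytes).
set -euo pipefail

# Usage: max_split_bytes TOTAL_BYTES NUM_FILES MAX_PARTITION_BYTES PARALLELISM [OPEN_COST]
max_split_bytes() {
  local total="$1" files="$2" max_part="$3" parallelism="$4"
  local open_cost="${5:-4194304}"
  local padded=$(( total + files * open_cost ))
  local per_core=$(( padded / parallelism ))
  local split=$(( per_core > open_cost ? per_core : open_cost ))
  echo $(( split < max_part ? split : max_part ))
}

GiB=$(( 1024 ** 3 ))
# 1 TiB in 1000 files on 200 cores, maxPartitionBytes = 1 GiB:
# the 1 GiB cap wins. Prints 1073741824.
max_split_bytes $(( 1024 * GiB )) 1000 "${GiB}" 200
# 10 GiB in 100 files on the same cores: bytesPerCore (about 53 MiB) wins,
# so raising maxPartitionBytes would not change it. Prints 55784243.
max_split_bytes $(( 10 * GiB )) 100 "${GiB}" 200
```

The second case shows why the 1GB suggestion mainly matters for large inputs: on small inputs Spark already spreads the data across the available cores.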
For MapReduce jobs and Spark RDDs, partition size is typically controlled with
fs.gs.block.size, which defaults to 128MB. Consider increasing it to 1GB. You can also set
InputFormat-specific properties such as
- For MapReduce jobs:
- For Spark RDDs:
- For MapReduce jobs:
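For MapReduce input, fs.gs.block.size matters because Hadoop's FileInputFormat (the org.apache.hadoop.mapreduce API) derives its split size from the block size each file reports. Below is a minimal bash sketch of that rule; the function names are hypothetical, and the optional minimum and maximum arguments stand for the InputFormat's split-size bounds. Spark's textFile goes through the older mapred API, which also factors in a goal size, so treat the numbers as an approximation there.

```shell
#!/usr/bin/env bash
# Sketch of Hadoop FileInputFormat split sizing:
#   splitSize = max(minSize, min(maxSize, blockSize))
# A file is cut into splitSize chunks; the last chunk may be up to 1.1x
# splitSize (Hadoop's SPLIT_SLOP) instead of becoming a tiny extra split.
set -euo pipefail

# Usage: split_size BLOCK_SIZE [MIN_SIZE] [MAX_SIZE]
split_size() {
  local block="$1" min="${2:-1}" max="${3:-9223372036854775807}"
  local s=$(( block < max ? block : max ))
  echo $(( s > min ? s : min ))
}

# Usage: num_splits FILE_BYTES SPLIT_BYTES
num_splits() {
  local remaining="$1" split="$2" count=0
  # Integer form of remaining / split > 1.1
  while (( remaining * 10 > split * 11 )); do
    remaining=$(( remaining - split ))
    count=$(( count + 1 ))
  done
  if (( remaining > 0 )); then
    count=$(( count + 1 ))
  fi
  echo "${count}"
}

MiB=$(( 1024 ** 2 ))
GiB=$(( 1024 ** 3 ))
# A 10 GiB file at a 128 MiB block size (prints 80), then at 1 GiB (prints 10).
num_splits $(( 10 * GiB )) "$(split_size $(( 128 * MiB )))"
num_splits $(( 10 * GiB )) "$(split_size "${GiB}")"
```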
The number of tasks in subsequent stages is controlled by several properties. On larger jobs that process more than 1TB, consider having at least 1GB per partition.
For MapReduce jobs, the number of output partitions is controlled by
For Spark SQL, the number of output partitions is controlled by
For Spark jobs using the RDD API, you can specify the number of output partitions or set
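The "at least 1GB per partition" guidance turns into a maximum output partition count for a given amount of shuffle data. Below is a minimal bash sketch; the function name is hypothetical, and the shuffle size is an input you would estimate yourself, for example from the shuffle write totals of a previous run.

```shell
#!/usr/bin/env bash
# Sketch: pick an output partition count so that each shuffle partition holds
# at least TARGET bytes (1 GiB by default, per the guidance for jobs over 1 TB).
set -euo pipefail

# Usage: output_partitions SHUFFLE_BYTES [TARGET_BYTES]
output_partitions() {
  local bytes="$1" target="${2:-1073741824}"
  local n=$(( bytes / target ))   # round down so partitions stay >= TARGET
  echo $(( n > 0 ? n : 1 ))
}

# 5 TiB of shuffle data: prints 5120 (partitions of about 1 GiB each).
output_partitions $(( 5 * 1024 ** 4 ))
```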
Shuffle tuning for primary worker shuffle
The most significant property is
Note that this is a cluster-level YARN property because the Spark shuffle server runs as part of the NodeManager. It defaults to twice (2x) the number of cores on the machine (for example, 16 threads on an n1-highmem-8). If "Shuffle Read Blocked Time" is larger than 1 second, and primary workers have not reached network, CPU, or disk limits, consider increasing the number of shuffle server threads.
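The default thread count is easy to compute for a given node. Below is a minimal bash sketch, assuming it runs on a primary worker where nproc reports the node's vCPU count; the function name is hypothetical, and any value above the default is something you would tune against Shuffle Read Blocked Time rather than derive from a formula.

```shell
#!/usr/bin/env bash
# Sketch: compute the default Spark shuffle server thread count described
# above (twice the number of cores) for the node this script runs on.
set -euo pipefail

# Usage: default_shuffle_threads CORES
default_shuffle_threads() {
  echo $(( 2 * $1 ))
}

# On an n1-highmem-8 primary worker, nproc reports 8, so this shows 16.
echo "default shuffle server threads: $(default_shuffle_threads "$(nproc)")"
```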
On larger machine types, consider increasing
which defaults to 1. (For example, set it to 5 connections per pair of hosts).
The maximum number of attempts permitted for app masters, tasks, and stages can be configured by setting the following properties:
- yarn:yarn.resourcemanager.am.max-attempts
- mapred:mapreduce.map.maxattempts
- mapred:mapreduce.reduce.maxattempts
- spark:spark.task.maxFailures
- spark:spark.stage.maxConsecutiveAttempts
Since app masters and tasks are more frequently terminated in clusters that use many preemptible VMs or autoscaling without graceful decommissioning, increasing the values of the above properties in those clusters can help (note that using EFM with Spark and graceful decommissioning is not supported).
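Because these are cluster properties (note the yarn:, mapred:, and spark: prefixes), they can be set together at cluster creation in one comma-separated --properties value. Below is a minimal bash sketch that builds and prints such a command; the value 10 for every limit is an illustrative assumption, not a recommendation, and cluster-name and region are placeholders.

```shell
#!/usr/bin/env bash
# Sketch: build one --properties value that raises the retry limits listed
# above, for clusters with many preemptible VMs. The limits are illustrative.
set -euo pipefail

props=(
  "yarn:yarn.resourcemanager.am.max-attempts=10"
  "mapred:mapreduce.map.maxattempts=10"
  "mapred:mapreduce.reduce.maxattempts=10"
  "spark:spark.task.maxFailures=10"
  "spark:spark.stage.maxConsecutiveAttempts=10"
)

# Join with commas, the separator gcloud uses between --properties entries.
joined="$(IFS=,; echo "${props[*]}")"

# Print (not run) the cluster creation command.
echo gcloud dataproc clusters create cluster-name \
  --region=region \
  --properties="${joined}"
```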
Configuring HDFS for HCFS shuffle
To improve the performance of large shuffles, you can decrease lock contention in
the NameNode by setting
dfs.namenode.fslock.fair=false. Note that this risks
starving individual requests, but may improve cluster-wide throughput.
To further improve NameNode performance, you can attach local SSDs to the
master node by setting
--num-master-local-ssds. You can also add local SSDs to
primary workers to improve DataNode performance by setting
Other Hadoop Compatible File Systems for HCFS shuffle
By default, EFM HCFS shuffle data is written to HDFS, but you can use any
Hadoop Compatible File System (HCFS).
For example, you may decide to write shuffle to Cloud Storage or to
a different cluster's HDFS. To specify a file system, you can point
fs.defaultFS to the target file system when you submit a job to your cluster.
YARN graceful decommissioning on EFM clusters
YARN graceful decommissioning can be used to remove nodes quickly with minimal impact on running applications. For autoscaling clusters, the graceful decommissioning timeout can be set in an AutoscalingPolicy that is attached to the EFM cluster.
MapReduce EFM enhancements to graceful decommissioning
Since intermediate data is stored in a distributed file system, nodes can be removed from an EFM cluster as soon as all containers running on those nodes have finished. By comparison, nodes are not removed on standard Dataproc clusters until the application has finished.
Node removal does not wait for app masters running on a node to finish. When the app master container is terminated, it is rescheduled on another node that is not being decommissioned. Job progress is not lost: the new app master quickly recovers state from the previous app master by reading job history.
Using graceful decommissioning on an EFM cluster with MapReduce
Create an EFM cluster with an equal number of primary and secondary workers.
gcloud dataproc clusters create cluster-name \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --region=region \
    --num-workers=5 \
    --num-secondary-workers=5
Run a MapReduce job that estimates the value of pi, using the MapReduce examples jar on the cluster.
gcloud dataproc jobs submit hadoop \
    --cluster=cluster-name \
    --region=region \
    --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    -- pi 1000 10000000
While the job is running, scale down the cluster using graceful decommissioning.
gcloud dataproc clusters update cluster-name \
    --region=region \
    --num-secondary-workers=0 \
    --graceful-decommission-timeout=1h
The nodes will be removed from the cluster quickly, before the job finishes, while minimizing loss of job progress. Temporary pauses in job progress can occur due to:
- App master failover. If job progress drops to 0% and then immediately jumps up to the pre-drop value, the app master may have terminated and a new app master recovered its state. This should not significantly affect the progress of the job since failover happens quickly.
- VM preemption. Since HDFS preserves only complete, not partial, map task outputs, temporary pauses in job progress can occur when a VM is preempted while working on a map task.