When a Dataproc node is removed, Dataproc Enhanced Flexibility Mode (EFM) preserves stateful node data, such as MapReduce shuffle data, in HDFS.
- Enhanced Flexibility Mode is currently only supported in 1.4 image clusters.
- Apache Hadoop YARN jobs that do not support AppMaster relocation may fail more often in Enhanced Flexibility Mode (see When to wait for AppMasters to finish).
- Enhanced Flexibility Mode is not recommended when:
  - a cluster has primary workers only
  - primary worker autoscaling is on, unless EFM is configured to point to a different cluster's HDFS
Note: if jobs depend heavily on HDFS, autoscaling of HDFS is not recommended.
Using Enhanced Flexibility Mode
Create a cluster with Enhanced Flexibility Mode enabled:
gcloud dataproc clusters create cluster-name \
    --properties dataproc:alpha.state.shuffle.hcfs.enabled=true \
    --image-version 1.4 \
    --zone cluster-zone
Replace cluster-zone with a Compute Engine zone, for example us-central1-a.
Apache Hadoop MapReduce example
From the MapReduce examples jar on the Enhanced Flexibility Mode cluster, run a small teragen job to generate input data in Cloud Storage for a later terasort job.
gcloud dataproc jobs submit hadoop \
    --cluster cluster-name \
    --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    -- teragen 1000 Cloud-Storage-output-URI
Replace Cloud-Storage-output-URI with a Cloud Storage path, for example gs://terasort/input.
Run a terasort job on the data.
gcloud dataproc jobs submit hadoop \
    --cluster cluster-name \
    --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    -- terasort gs://terasort/input gs://terasort/output
Apache Spark example
Run a WordCount job on public Shakespeare text using the Spark examples jar on the Enhanced Flexibility Mode cluster.
gcloud dataproc jobs submit spark \
    --cluster cluster-name \
    --jars file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class org.apache.spark.examples.JavaWordCount \
    -- gs://apache-beam-samples/shakespeare/macbeth.txt
YARN graceful decommissioning on Enhanced Flexibility Mode clusters
YARN graceful decommissioning can be used to remove nodes quickly with minimal impact on running applications. For autoscaling clusters, the graceful decommission timeout can be set in an AutoscalingPolicy that is attached to the Enhanced Flexibility Mode cluster.
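As a rough sketch of how the timeout could be set in a policy, the script below writes a policy file and prints the gcloud command that would import it. The policy name efm-policy, the region, the instance counts, the scaling factors, and the 1h timeout are all illustrative assumptions, not recommended values. By default the script only prints the command instead of running it.

```shell
#!/bin/sh
# Sketch: an autoscaling policy that sets a graceful decommission timeout.
# Every name and number below is an illustrative assumption.
POLICY_NAME="efm-policy"      # hypothetical policy name
REGION="us-central1"          # assumed region
POLICY_FILE="${TMPDIR:-/tmp}/efm-policy.yaml"

# Primary workers are held at a fixed size; only secondary workers scale.
cat > "$POLICY_FILE" <<'EOF'
workerConfig:
  minInstances: 5
  maxInstances: 5
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 5
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h
EOF

# Dry run: prints the command. Set GCLOUD="gcloud" to actually execute it.
GCLOUD="echo gcloud"
$GCLOUD dataproc autoscaling-policies import "$POLICY_NAME" \
    --source "$POLICY_FILE" \
    --region "$REGION"
```

Once imported, a policy like this can be attached when the cluster is created, using the --autoscaling-policy flag of gcloud dataproc clusters create.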
Enhanced Flexibility Mode enhancements to graceful decommissioning
Since intermediate data is stored in a distributed file system, nodes can be removed from an Enhanced Flexibility Mode cluster as soon as all containers running on those nodes have finished. By comparison, nodes are not removed on standard Dataproc clusters until the application has finished.
Node removal does not wait for app masters running on a node to finish. When the app master container is killed, it is rescheduled on another node that is not being decommissioned. Job progress is not lost: the new app master quickly recovers state from the previous app master by reading job history.
Using Graceful Decommission on an Enhanced Flexibility Mode cluster
Create an Enhanced Flexibility Mode cluster with an equal number of primary and secondary workers.
gcloud dataproc clusters create cluster-name \
    --properties dataproc:alpha.state.shuffle.hcfs.enabled=true \
    --image-version 1.4 \
    --zone cluster-zone \
    --num-workers 5 \
    --num-secondary-workers 5
Replace cluster-zone with a Compute Engine zone, for example us-central1-a.
From the MapReduce examples jar on the cluster, run a MapReduce job that calculates the value of pi.
gcloud dataproc jobs submit hadoop \
    --cluster cluster-name \
    --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    -- pi 1000 10000000
While the job is running, scale down the cluster using graceful decommissioning.
gcloud dataproc clusters update cluster-name \
    --num-secondary-workers 0 \
    --graceful-decommission-timeout 1h
The nodes will be removed from the cluster quickly (before the job finishes) while minimizing loss of job progress. Temporary dips in job progress can occur due to:
- App master failover. If job progress drops to 0% and then immediately jumps up to the pre-drop value, a likely reason is that the app master was killed and a new app master recovered its state. This should not significantly affect the progress of the job since failover happens quickly.
- VM preemption. Since HDFS preserves only complete, not partial, map task outputs, temporary dips in job progress can occur when a VM is preempted while working on a map task.
To improve performance for large shuffles, you can increase the number of
serving threads for the namenode and datanode by increasing the values of
dfs.namenode.handler.count and dfs.datanode.handler.count when creating the
cluster. By default, the namenode and datanodes each have 10 serving threads.
To further improve namenode performance, you can attach local SSDs to the
master node by passing the
--num-master-local-ssds flag to the
gcloud dataproc clusters create command. You can also use the
--num-worker-local-ssds flag to
attach local SSDs to data nodes to improve HDFS performance.
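The tuning options above could be combined at cluster creation roughly as follows. This is a minimal sketch: the thread counts (20) and SSD counts (1) are illustrative assumptions rather than tuned recommendations. dfs.namenode.handler.count is the standard HDFS property for namenode serving threads, and the hdfs: prefix is the Dataproc cluster-property prefix for HDFS settings. The script prints the command instead of running it.

```shell
#!/bin/sh
# Sketch: raise HDFS serving threads and attach local SSDs at creation time.
# The values below are illustrative assumptions, not recommendations.
NAMENODE_HANDLERS=20
DATANODE_HANDLERS=20
LOCAL_SSDS=1

# Build the comma-separated cluster property list.
PROPS="dataproc:alpha.state.shuffle.hcfs.enabled=true"
PROPS="$PROPS,hdfs:dfs.namenode.handler.count=$NAMENODE_HANDLERS"
PROPS="$PROPS,hdfs:dfs.datanode.handler.count=$DATANODE_HANDLERS"

# Dry run: prints the command. Set GCLOUD="gcloud" to actually execute it.
GCLOUD="echo gcloud"
$GCLOUD dataproc clusters create cluster-name \
    --properties "$PROPS" \
    --image-version 1.4 \
    --zone cluster-zone \
    --num-master-local-ssds "$LOCAL_SSDS" \
    --num-worker-local-ssds "$LOCAL_SSDS"
```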
Partitioning and Parallelism
When submitting a MapReduce or Spark job, it is important to configure an appropriate level of partitioning. The numbers of input and output partitions for a given shuffle stage trade off different performance characteristics, so it is best to experiment to find values that work for your job shapes.
For MapReduce jobs, the number of output partitions is controlled by
mapreduce.job.reduces. For Spark SQL, the number of output partitions is
controlled by spark.sql.shuffle.partitions. For Spark jobs using the RDD API,
you can specify the number of partitions. MapReduce and Spark input
partitioning are implicitly set by the input data set.
The maximum number of attempts permitted for app masters, tasks, and stages can be configured by setting the following properties:
yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts
Since app masters and tasks will be more frequently killed in clusters that use many preemptible VMs or autoscaling without graceful decommissioning, the recommended practice is to increase the above values in those clusters.
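As a minimal sketch, the retry properties listed above could be raised together when the cluster is created. The value 10 is an illustrative assumption, not a recommended setting, and the same limit is applied to every property only to keep the example short. The script prints the command instead of running it.

```shell
#!/bin/sh
# Sketch: raise retry limits for app masters, tasks, and stages.
# MAX_ATTEMPTS=10 is an illustrative assumption.
MAX_ATTEMPTS=10

PROPS="dataproc:alpha.state.shuffle.hcfs.enabled=true"
for key in \
    yarn:yarn.resourcemanager.am.max-attempts \
    mapred:mapreduce.map.maxattempts \
    mapred:mapreduce.reduce.maxattempts \
    spark:spark.task.maxFailures \
    spark:spark.stage.maxConsecutiveAttempts
do
  # Append each property as key=value to the comma-separated list.
  PROPS="$PROPS,$key=$MAX_ATTEMPTS"
done

# Dry run: prints the command. Set GCLOUD="gcloud" to actually execute it.
GCLOUD="echo gcloud"
$GCLOUD dataproc clusters create cluster-name \
    --properties "$PROPS" \
    --image-version 1.4 \
    --zone cluster-zone
```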
Preemptible Worker Ratio
Since preemptible workers write their shuffle data to HDFS, it is important to
make sure that your cluster contains enough primary workers to accommodate your
job's shuffle data. For autoscaling clusters, you can control the target fraction
of preemptible workers in your cluster by assigning weights to workerConfig and
secondaryWorkerConfig in your AutoscalingPolicy.
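To make the weighting concrete, the sketch below assumes that the target share of secondary workers is approximately the secondary weight divided by the sum of both weights, and that the minInstances and maxInstances bounds still apply. The weights 3 and 1 and the instance bounds are illustrative assumptions. The script only prints the resulting policy fragment.

```shell
#!/bin/sh
# Sketch: estimate the target share of secondary (preemptible) workers
# from autoscaling weights. Weights and bounds are illustrative assumptions.
PRIMARY_WEIGHT=3
SECONDARY_WEIGHT=1

# Approximate share, as an integer percentage.
SECONDARY_PCT=$(( 100 * SECONDARY_WEIGHT / (PRIMARY_WEIGHT + SECONDARY_WEIGHT) ))
echo "Approximate target share of secondary workers: ${SECONDARY_PCT}%"

# Print the matching AutoscalingPolicy fragment.
cat <<EOF
workerConfig:
  minInstances: 2
  maxInstances: 20
  weight: $PRIMARY_WEIGHT
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 20
  weight: $SECONDARY_WEIGHT
EOF
```

With these assumed weights, roughly three primary workers are targeted for each secondary worker.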
Other Hadoop Compatible File Systems
By default, shuffle data is written to HDFS, but you can use any
Hadoop Compatible File System (HCFS).
For example, you may decide to write shuffle data to Cloud Storage or to
a different cluster's HDFS. To specify a file system, you can point
fs.defaultFS to the target file system when you submit
a job to your cluster.
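A sketch of what this could look like for a MapReduce job follows. It assumes that fs.defaultFS is passed with the --properties flag at submission, which is one plausible way to set it and not necessarily the only one. The bucket gs://my-shuffle-bucket is hypothetical. The script prints the command instead of running it.

```shell
#!/bin/sh
# Sketch: point shuffle data at another Hadoop Compatible File System.
# gs://my-shuffle-bucket is a hypothetical bucket name.
SHUFFLE_FS="gs://my-shuffle-bucket"
JOB_PROPS="fs.defaultFS=$SHUFFLE_FS"

# Dry run: prints the command. Set GCLOUD="gcloud" to actually execute it.
GCLOUD="echo gcloud"
$GCLOUD dataproc jobs submit hadoop \
    --cluster cluster-name \
    --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --properties "$JOB_PROPS" \
    -- terasort gs://terasort/input gs://terasort/output
```

To target a different cluster's HDFS instead, SHUFFLE_FS would be an hdfs:// URI for that cluster's namenode.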