Cloud Dataproc Enhanced Flexibility Mode

When a Cloud Dataproc node is removed, Cloud Dataproc Enhanced Flexibility Mode preserves stateful node data, such as MapReduce shuffle data, in HDFS.

Since HDFS only runs on primary workers, this feature is particularly suited to clusters that use preemptible VMs or only autoscale the preemptible worker group.

Limitations:

Using Enhanced Flexibility Mode

Create a cluster with Enhanced Flexibility Mode enabled, replacing cluster-name and cluster-zone (for example, us-central1-a) with your own values:

gcloud dataproc clusters create cluster-name \
    --properties dataproc:alpha.state.shuffle.hcfs.enabled=true \
    --image-version 1.4 \
    --zone cluster-zone

Apache Hadoop MapReduce example

  1. Using the MapReduce examples jar on the Enhanced Flexibility Mode cluster, run a small teragen job to generate input data in Cloud Storage for a later terasort job. Replace gs://terasort/input with your own Cloud Storage output URI.

    gcloud dataproc jobs submit hadoop \
        --cluster cluster-name \
        --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 gs://terasort/input
    

  2. Run a terasort job on the data.

    gcloud dataproc jobs submit hadoop \
        --cluster cluster-name \
        --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Apache Spark example

  1. Run a WordCount job on public Shakespeare text using the Spark examples jar on the Enhanced Flexibility Mode cluster.

    gcloud dataproc jobs submit spark \
        --cluster cluster-name \
        --jars file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

YARN graceful decommissioning on Enhanced Flexibility Mode clusters

YARN graceful decommissioning can be used to remove nodes quickly with minimal impact on running applications. For autoscaling clusters, the graceful decommission timeout can be set in an AutoscalingPolicy that is attached to the Enhanced Flexibility Mode cluster.

Enhanced Flexibility Mode enhancements to graceful decommissioning

  1. Since intermediate data is stored in a distributed file system, nodes can be removed from an Enhanced Flexibility Mode cluster as soon as all containers running on those nodes have finished. By comparison, nodes are not removed on standard Cloud Dataproc clusters until the application has finished.

  2. Node removal does not wait for app masters running on a node to finish. When the app master container is killed, it is rescheduled on another node that is not being decommissioned. Job progress is not lost: the new app master quickly recovers state from the previous app master by reading job history.

Using Graceful Decommission on an Enhanced Flexibility Mode cluster

Example:

  1. Create an Enhanced Flexibility Mode cluster with an equal number of primary and secondary workers, replacing cluster-zone with your zone (for example, us-central1-a).

    gcloud dataproc clusters create cluster-name \
        --properties dataproc:alpha.state.shuffle.hcfs.enabled=true \
        --image-version 1.4 \
        --zone cluster-zone \
        --num-workers 5 \
        --num-preemptible-workers 5
    

  2. Using the MapReduce examples jar on the cluster, run a MapReduce job that calculates the value of pi.

    gcloud dataproc jobs submit hadoop \
        --cluster cluster-name \
        --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. While the job is running, scale down the cluster using graceful decommissioning.

    gcloud dataproc clusters update cluster-name \
        --num-preemptible-workers 0 \
        --graceful-decommission-timeout 1h
    

The nodes will be removed from the cluster quickly (before the job finishes) while minimizing loss of job progress. Temporary dips in job progress can occur due to:

  • App master failover. If job progress drops to 0% and then immediately jumps up to the pre-drop value, a likely reason is that the app master was killed and a new app master recovered its state. This should not significantly affect the progress of the job since failover happens quickly.
  • VM preemption. Since HDFS preserves only complete, not partial, map task outputs, temporary dips in job progress can occur when a VM is preempted while working on a map task.

Advanced Configuration

Configuring HDFS

To improve performance for large shuffles, you can increase the number of serving threads for the namenode and datanode by increasing the values of dfs.namenode.handler.count and dfs.datanode.handler.count when creating the cluster. By default, the namenode and datanodes each have 10 serving threads. To further improve namenode performance, you can attach local SSDs to the master node by passing the --num-master-local-ssds flag to the clusters create command. You can also use the --num-worker-local-ssds flag to attach local SSDs to data nodes to improve HDFS performance.
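As a rough sketch of the settings described above, the following script assembles a cluster-creation command that raises both handler counts and attaches local SSDs. The thread count of 40 and the SSD count of 1 are illustrative assumptions, not recommended values, and the hdfs: prefix is assumed to be the cluster property prefix for HDFS settings. The command is printed rather than executed so it can be reviewed first.

```shell
# Illustrative values only; tune them for your own shuffle sizes.
NAMENODE_THREADS=40
DATANODE_THREADS=40
LOCAL_SSDS=1

# Build the comma-separated cluster properties string.
PROPERTIES="dataproc:alpha.state.shuffle.hcfs.enabled=true"
PROPERTIES="${PROPERTIES},hdfs:dfs.namenode.handler.count=${NAMENODE_THREADS}"
PROPERTIES="${PROPERTIES},hdfs:dfs.datanode.handler.count=${DATANODE_THREADS}"

# Print the command for review; remove the leading "echo" to run it.
echo gcloud dataproc clusters create cluster-name \
    --properties "${PROPERTIES}" \
    --image-version 1.4 \
    --zone cluster-zone \
    --num-master-local-ssds "${LOCAL_SSDS}" \
    --num-worker-local-ssds "${LOCAL_SSDS}"
```

Keeping the properties in a single variable makes it easier to adjust one value at a time while experimenting.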

Partitioning and Parallelism

When submitting a MapReduce or Spark job, it is important to configure an appropriate level of partitioning. The number of input and output partitions for a given shuffle stage trade off different performance characteristics. It is best to experiment with values that work for your job shapes.

For MapReduce jobs, the number of output partitions is controlled by mapreduce.job.reduces. For Spark SQL, the number of output partitions is controlled by spark.sql.shuffle.partitions. For Spark jobs using the RDD API, you can specify the number of partitions directly. MapReduce and Spark input partitioning are implicitly set by the input data set.
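A minimal sketch of setting the MapReduce output partition count at submit time, reusing the terasort job from the earlier example. The value of 200 reducers is a hypothetical starting point, and passing the property through the --properties flag of the submit command is one possible way to set it. The command is printed rather than executed.

```shell
# Hypothetical reducer count; experiment to find what suits your job shape.
REDUCES=200
JOB_PROPERTIES="mapreduce.job.reduces=${REDUCES}"

# Print the command for review; remove the leading "echo" to run it.
echo gcloud dataproc jobs submit hadoop \
    --cluster cluster-name \
    --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --properties "${JOB_PROPERTIES}" \
    -- terasort gs://terasort/input gs://terasort/output
```

For a Spark SQL job, the same approach would presumably apply with spark.sql.shuffle.partitions in place of mapreduce.job.reduces.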

Increasing Retries

The maximum number of attempts permitted for app masters, tasks, and stages can be configured by setting the following properties:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Since app masters and tasks will be more frequently killed in clusters that use many preemptible VMs or autoscaling without graceful decommissioning, the recommended practice is to increase the above values in those clusters.
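The properties listed above could be raised together when the cluster is created, as in the sketch below. The limit of 10 attempts is an assumption chosen for illustration; the right values depend on how often your nodes are preempted. The command is printed rather than executed.

```shell
# Assumed attempt limit for illustration; no specific value is prescribed.
MAX_ATTEMPTS=10

# Build the comma-separated cluster properties string.
PROPERTIES="dataproc:alpha.state.shuffle.hcfs.enabled=true"
PROPERTIES="${PROPERTIES},yarn:yarn.resourcemanager.am.max-attempts=${MAX_ATTEMPTS}"
PROPERTIES="${PROPERTIES},mapred:mapreduce.map.maxattempts=${MAX_ATTEMPTS}"
PROPERTIES="${PROPERTIES},mapred:mapreduce.reduce.maxattempts=${MAX_ATTEMPTS}"
PROPERTIES="${PROPERTIES},spark:spark.task.maxFailures=${MAX_ATTEMPTS}"
PROPERTIES="${PROPERTIES},spark:spark.stage.maxConsecutiveAttempts=${MAX_ATTEMPTS}"

# Print the command for review; remove the leading "echo" to run it.
echo gcloud dataproc clusters create cluster-name \
    --properties "${PROPERTIES}" \
    --image-version 1.4 \
    --zone cluster-zone
```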

Preemptible Worker Ratio

Since preemptible workers write their shuffle data to HDFS, it is important to make sure that your cluster contains enough primary workers to accommodate your job's shuffle data. For autoscaling clusters, you can control the target fraction of preemptible workers in your cluster by assigning weights in the workerConfig and secondaryWorkerConfig in your AutoscalingPolicy.
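To get a feel for how weights translate into a worker mix, the sketch below computes the approximate share of preemptible workers implied by a pair of weights. It assumes the target fraction is proportional to the weights, and the weights 3 and 1 are hypothetical.

```shell
# Hypothetical weights for workerConfig and secondaryWorkerConfig.
PRIMARY_WEIGHT=3
SECONDARY_WEIGHT=1

# Assumption: each group's target share is its weight divided by the total weight.
TOTAL_WEIGHT=$((PRIMARY_WEIGHT + SECONDARY_WEIGHT))
SECONDARY_PERCENT=$((100 * SECONDARY_WEIGHT / TOTAL_WEIGHT))

echo "Approximate preemptible share: ${SECONDARY_PERCENT}% of workers"
```

A lower preemptible share leaves more primary workers, and therefore more HDFS capacity, per unit of shuffle data.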

Other Hadoop Compatible File Systems

By default, shuffle data is written to HDFS, but you can use any Hadoop Compatible File System (HCFS). For example, you may decide to write shuffle data to Cloud Storage or to a different cluster's HDFS. To specify a file system, you can point fs.defaultFS to the target file system when you submit a job to your cluster.
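As a hedged sketch, the job below points fs.defaultFS at a Cloud Storage bucket when it is submitted. The bucket name gs://example-shuffle-bucket is hypothetical, and passing the property through the --properties flag of the submit command is an assumption about how to set it. The command is printed rather than executed.

```shell
# Hypothetical target file system for shuffle data.
SHUFFLE_FS="gs://example-shuffle-bucket"
JOB_PROPERTIES="fs.defaultFS=${SHUFFLE_FS}"

# Print the command for review; remove the leading "echo" to run it.
echo gcloud dataproc jobs submit hadoop \
    --cluster cluster-name \
    --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --properties "${JOB_PROPERTIES}" \
    -- terasort gs://terasort/input gs://terasort/output
```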
