Spark job tuning tips

Use Spark SQL

The Spark SQL DataFrame API is a significant optimization of the RDD API. If you interact with code that uses RDDs, consider reading data as a DataFrame before passing an RDD in the code. In Java or Scala code, consider using the Spark SQL Dataset API as a superset of RDDs and DataFrames.

Use Apache Spark 3

Dataproc 2.0 installs Spark 3, which includes the following features and performance improvements:

  • GPU support
  • Ability to read binary files
  • Performance improvements
  • Dynamic Partition Pruning
  • Adaptive query execution, which optimizes Spark jobs in real time

Use Dynamic Allocation

Apache Spark includes a Dynamic Allocation feature that scales the number of Spark executors on workers within a cluster. This feature allows a job to use the full Dataproc cluster even when the cluster scales up. This feature is enabled by default on Dataproc (spark.dynamicAllocation.enabled is set to true). See Spark Dynamic Allocation for more information.
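As a rough illustration of how dynamic allocation sizes a job, the sketch below estimates the executor count Spark aims for. Each executor runs spark.executor.cores / spark.task.cpus tasks at once, so enough executors are requested to run all outstanding tasks, bounded by spark.dynamicAllocation.minExecutors and maxExecutors. This is a simplified model: it ignores spark.dynamicAllocation.executorAllocationRatio, initial executors, and the gradual ramp-up controlled by the scheduler backlog timeouts. The function name and parameters are hypothetical.

```python
import math
from typing import Optional


def target_executors(running_or_pending_tasks: int,
                     executor_cores: int,
                     task_cpus: int = 1,
                     min_executors: int = 0,
                     max_executors: Optional[int] = None) -> int:
    """Simplified estimate of the executor count dynamic allocation aims for."""
    # Concurrent task slots per executor (spark.executor.cores / spark.task.cpus).
    tasks_per_executor = executor_cores // task_cpus
    if tasks_per_executor < 1:
        raise ValueError("executor_cores must be at least task_cpus")
    # Enough executors to run every outstanding task at once...
    needed = math.ceil(running_or_pending_tasks / tasks_per_executor)
    # ...clamped to the configured minExecutors / maxExecutors bounds.
    needed = max(needed, min_executors)
    if max_executors is not None:
        needed = min(needed, max_executors)
    return needed
```

For example, a stage with 100 pending tasks on 4-core executors targets 25 executors. Capping maxExecutors at 10 holds it to 10, even if the cluster has room for more.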

Use Dataproc Autoscaling

Dataproc Autoscaling dynamically adds and removes Dataproc workers from a cluster to help ensure that Spark jobs have the resources needed to complete quickly.

It is a best practice to configure the autoscaling policy to only scale secondary workers.
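One way to express "scale only secondary workers" is to pin the primary worker group: give workerConfig equal minInstances and maxInstances, so the autoscaler can change only secondaryWorkerConfig. The sketch below mirrors those autoscaling-policy fields as a Python dict and checks for that shape. The instance counts and the helper scales_only_secondary are illustrative assumptions. A real policy also carries algorithm settings (basicAlgorithm), which are omitted here, and field defaults are not modeled.

```python
# Illustrative mirror of an autoscaling policy's worker sections (values are examples).
policy = {
    "workerConfig": {"minInstances": 2, "maxInstances": 2},           # primary workers pinned
    "secondaryWorkerConfig": {"minInstances": 0, "maxInstances": 50},  # free to scale
}


def scales_only_secondary(policy: dict) -> bool:
    """True if primary workers are fixed and secondary workers have room to scale."""
    primary = policy.get("workerConfig", {})
    secondary = policy.get("secondaryWorkerConfig", {})
    # Both primary bounds must be given explicitly and be equal.
    primary_fixed = (primary.get("minInstances") is not None
                     and primary.get("minInstances") == primary.get("maxInstances"))
    secondary_can_scale = (secondary.get("maxInstances", 0)
                           > secondary.get("minInstances", 0))
    return primary_fixed and secondary_can_scale
```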

Use Dataproc Enhanced Flexibility Mode

Clusters with preemptible VMs or an autoscaling policy may receive FetchFailed exceptions when workers are preempted or removed before they finish serving shuffle data to reducers. This exception can cause task retries and longer job completion times.

Recommendation: Use Dataproc Enhanced Flexibility Mode, which does not store intermediate shuffle data on secondary workers, so that secondary workers can be safely preempted or scaled down.

Configure partitioning and shuffling

Spark stores data in temporary partitions on the cluster. If your application groups or joins DataFrames, it shuffles the data into new partitions according to the grouping and low-level configuration.

Data partitioning significantly affects application performance: too few partitions limit job parallelism and cluster resource utilization; too many partitions slow down the job because of the extra work of processing and shuffling each partition.

Configuring partitions

The following properties govern the number and size of your partitions:

  • spark.sql.files.maxPartitionBytes: the maximum size of partitions when you read data from Cloud Storage. The default is 128 MB, which is sufficiently large for most applications that process less than 100 TB.

  • spark.sql.shuffle.partitions: the number of partitions after performing a shuffle. The default is 200, which is appropriate for clusters with fewer than 100 vCPUs in total. Recommendation: Set this to 3x the number of vCPUs in your cluster.

  • spark.default.parallelism: the default number of partitions in RDDs returned by shuffle transformations, such as join and reduceByKey, and by parallelize. The default is the total number of vCPUs in your cluster. When using RDDs in Spark jobs, you can set this number to 3x your vCPUs.
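The sizing rules above can be turned into numbers with a small helper. This is a minimal sketch with hypothetical function names. Taking max(200, 3 × vCPUs) is one reading of the tip (keep the default on small clusters, 3x vCPUs otherwise). The read-partition figure is only a rough lower bound, because Spark's actual split size also depends on spark.sql.files.openCostInBytes and the default parallelism.

```python
import math

MB = 1024 * 1024
DEFAULT_SHUFFLE_PARTITIONS = 200        # Spark default for spark.sql.shuffle.partitions
DEFAULT_MAX_PARTITION_BYTES = 128 * MB  # Spark default for spark.sql.files.maxPartitionBytes


def recommended_shuffle_partitions(total_vcpus: int) -> int:
    """About 3x the cluster's vCPUs, keeping the default of 200 for small clusters."""
    return max(DEFAULT_SHUFFLE_PARTITIONS, 3 * total_vcpus)


def approx_input_partitions(total_input_bytes: int,
                            max_partition_bytes: int = DEFAULT_MAX_PARTITION_BYTES) -> int:
    """Rough lower bound on read partitions for a given input size."""
    return max(1, math.ceil(total_input_bytes / max_partition_bytes))


def partition_properties(total_vcpus: int, uses_rdds: bool = False) -> dict:
    """Spark properties (string values) implementing the 3x-vCPU recommendations."""
    props = {"spark.sql.shuffle.partitions": str(recommended_shuffle_partitions(total_vcpus))}
    if uses_rdds:
        props["spark.default.parallelism"] = str(3 * total_vcpus)
    return props
```

For a 400-vCPU cluster this gives 1,200 shuffle partitions. A 10,240 MB input read with the 128 MB default produces at least about 80 read partitions. Set these properties when you submit the job (for example, through the job's properties). spark.default.parallelism in particular must be set before the SparkContext starts.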

Limit the number of files

Spark loses performance when it reads a large number of small files. Store data in files larger than 100 MB. Similarly, limit the number of output files (doing so may require forcing a shuffle; see Avoid unnecessary shuffles).
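To keep output files above the size floor, you can derive the number of output partitions from an estimate of the output size. A minimal sketch: the 128 MB floor and the helper name are assumptions, and the output-size estimate must come from your own data. Floor division keeps the average file at or above the floor, assuming evenly sized partitions. You could pass the result to df.coalesce(n), which merges partitions without a full shuffle, or to df.repartition(n), which shuffles but evens out partition sizes.

```python
MB = 1024 * 1024


def output_file_count(estimated_output_bytes: int, min_file_bytes: int = 128 * MB) -> int:
    """Number of output partitions (one file each) so files average >= min_file_bytes."""
    if min_file_bytes <= 0:
        raise ValueError("min_file_bytes must be positive")
    # Floor division rounds the file count down, so the average file size rounds up.
    # At least one file is always written.
    return max(1, estimated_output_bytes // min_file_bytes)
```

For example, 1,000 MB of output yields 7 files of roughly 143 MB each, rather than 8 files that would fall below the floor.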

Configure adaptive query execution (Spark 3)

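Adaptive query execution (enabled by default in Dataproc image version 2.0) provides Spark job performance improvements, including:

  • Coalescing post-shuffle partitions
  • Converting sort-merge joins to broadcast joins
  • Optimizing skew joins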

Although the default configuration settings are sound for most use cases, setting spark.sql.adaptive.advisoryPartitionSizeInBytes to the value of spark.sql.files.maxPartitionBytes (default 128 MB) can be beneficial.
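The tuning suggested above amounts to a few Spark properties. The sketch below builds them and formats them as the comma-separated KEY=VALUE string that gcloud dataproc jobs submit spark accepts in its --properties flag. The helper names are hypothetical, and the "128m" value assumes you keep the 128 MB read-partition default.

```python
def aqe_properties(partition_size: str = "128m") -> dict:
    """Align AQE's advisory shuffle partition size with the read partition size."""
    return {
        # Already the default on Dataproc 2.0 images; set explicitly for clarity.
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.files.maxPartitionBytes": partition_size,
        "spark.sql.adaptive.advisoryPartitionSizeInBytes": partition_size,
    }


def to_properties_flag(props: dict) -> str:
    """Format properties as KEY=VALUE pairs joined by commas (sorted for stable output)."""
    return ",".join(f"{key}={value}" for key, value in sorted(props.items()))
```

Values that themselves contain commas would need gcloud's alternate-delimiter escaping, which this sketch does not handle.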

Avoid unnecessary shuffles

Spark allows users to manually trigger a shuffle to rebalance their data with the repartition function. Shuffles are expensive, so repartition sparingly. Setting the partition properties appropriately is usually enough for Spark to partition your data automatically.

Exception: When writing column-partitioned data to Cloud Storage, repartitioning on the partition column avoids writing many small files, which speeds up the write.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
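To see why the repartition helps, the sketch below models a column-partitioned write without Spark. It assumes each write task produces one file for every distinct partition-column value it holds. The model, the helper names, and the sample dates are illustrative assumptions, and each row is represented only by its column value.

```python
def files_written(tasks):
    """Files a partitionBy write would create: one per (task, column value) pair."""
    return len({(task_id, value)
                for task_id, rows in enumerate(tasks)
                for value in rows})


def repartition_by_column(tasks, num_tasks):
    """Route each row to a task by hashing its column value, like repartition("col_name")."""
    shuffled = [[] for _ in range(num_tasks)]
    for rows in tasks:
        for value in rows:
            shuffled[hash(value) % num_tasks].append(value)
    return shuffled


# Four tasks, each holding rows for the same three dates.
dates = ["2024-01-01", "2024-01-02", "2024-01-03"]
tasks = [dates * 10 for _ in range(4)]

print(files_written(tasks))                            # 12: every task writes every date
print(files_written(repartition_by_column(tasks, 4)))  # 3: one file per date
```

The trade-off is that all rows for a value land in one task, so a heavily skewed column can produce one very large task and one very large file.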

Store data in Parquet or Avro

Spark SQL defaults to reading and writing data in Snappy-compressed Parquet files. Parquet is an efficient columnar file format that lets Spark read only the data it needs to execute an application. This is an important advantage when working with large datasets. Other columnar formats, such as Apache ORC, also perform well.

For non-columnar data, Apache Avro provides an efficient row-based binary file format. Although typically slower than Parquet, Avro performs better than text-based formats such as CSV or JSON.

Optimize disk size

Persistent disk throughput scales with disk size, which can affect Spark job performance because jobs write metadata and shuffle data to disk. When using standard persistent disks, disk size should be at least 1 terabyte per worker (see Performance by persistent disk size).

To monitor worker disk throughput in the Google Cloud Console:

  1. Click the cluster name on the Clusters page.
  2. Click the VM INSTANCES tab.
  3. Click on any worker name.
  4. Click the MONITORING tab, then scroll down to Disk Throughput to view worker throughput.

Disk considerations

Ephemeral Dataproc clusters, which do not benefit from persistent storage, can use local SSDs. Local SSDs are physically attached to the cluster and provide higher throughput than persistent disks (see the Performance table). Local SSDs are available at a fixed size of 375 gigabytes, but you can add multiple SSDs to increase performance.
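Because each local SSD has a fixed capacity of 375 GB, the number of SSDs per worker is a round-up division of the scratch space you need. This is a minimal sketch: scratch_gb_per_worker is your own estimate of shuffle and spill space, and the helper name is hypothetical. Compute Engine may allow only certain local SSD counts for a given machine type, so round the result up to a supported count. On Dataproc, the count is set with --num-worker-local-ssds when you create the cluster.

```python
import math

LOCAL_SSD_GB = 375  # fixed size of one local SSD


def local_ssds_needed(scratch_gb_per_worker: float) -> int:
    """Smallest number of 375 GB local SSDs covering the scratch space (at least one)."""
    return max(1, math.ceil(scratch_gb_per_worker / LOCAL_SSD_GB))
```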

Local SSDs do not persist data after a cluster is shut down. If you need persistent storage, you can use SSD persistent disks, which provide higher throughput for their size than standard persistent disks. SSD persistent disks are also a good choice if partition size will be smaller than 8 KB (however, avoid small partitions).

Attach GPUs to your cluster

Spark 3 supports GPUs. Use GPUs with the RAPIDS initialization action to speed up Spark jobs with the RAPIDS SQL Accelerator. You can also use the GPU driver initialization action to configure a cluster with GPUs.

Common job failures and fixes

Out of Memory

Examples:

  • "Lost executor"
  • "java.lang.OutOfMemoryError: GC overhead limit exceeded"
  • "Container killed by YARN for exceeding memory limits"

Possible fixes:

Shuffle Fetch Failures

Examples:

  • "FetchFailedException" (Spark error)
  • "Failed to connect to..." (Spark error)
  • "Failed to fetch" (MapReduce error)

Typically caused by the premature removal of workers that still have shuffle data to serve.

Possible causes and fixes:

  • Preemptible worker VMs were reclaimed or non-preemptible worker VMs were removed by the autoscaler. Solution: Use Enhanced Flexibility Mode to make secondary workers safely preemptible or scalable.
  • Executor or mapper crashed due to an OutOfMemory error. Solution: Increase executor or mapper memory.
  • The Spark shuffle service may be overloaded. Solution: decrease the number of job partitions.

YARN nodes are UNHEALTHY

Examples (from YARN logs):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Often related to insufficient disk space for shuffle data. Diagnose by viewing log files:

  • Open your project's Clusters page in the Cloud Console, then click on the cluster's name.
  • Click VIEW LOGS.
  • Filter logs by hadoop-yarn-nodemanager.
  • Search for "UNHEALTHY".

Fix: If the log reports "UNHEALTHY" status, recreate your cluster with larger disk space, which will increase the throughput cap.
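If you export NodeManager logs rather than searching them in the console, a short script can pull out the affected directory and threshold from "UNHEALTHY" reports like the example above. This is a minimal sketch: the helper name is hypothetical, and the regular expression assumes the message layout shown in that example. The 90.0% figure corresponds to YARN's yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage setting (default 90.0).

```python
import re

# From "UNHEALTHY" to the nearest "[ <dir> : used space above threshold of <pct>% ]".
UNHEALTHY_RE = re.compile(
    r"UNHEALTHY.*?\[\s*(?P<dir>\S+)\s*:\s*used space above threshold of (?P<pct>[\d.]+)%\s*\]",
    re.DOTALL,
)


def find_unhealthy(log_text: str):
    """Return (local_dir, threshold_percent) for each disk-space UNHEALTHY report."""
    return [(m.group("dir"), float(m.group("pct"))) for m in UNHEALTHY_RE.finditer(log_text)]
```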

For more information