Use Spark SQL
The Spark SQL DataFrame API is a significant optimization of the RDD API. If you interact with code that uses RDDs, consider reading data as a DataFrame before passing an RDD in the code. In Java or Scala code, consider using the Spark SQL Dataset API as a superset of RDDs and DataFrames.
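A minimal sketch of that pattern, assuming PySpark and a SparkSession passed in as `spark`. The function name, the `fmt` default, and the column list are hypothetical. It reads the data through the DataFrame reader, prunes columns there, and only then hands an RDD to RDD-based code.

```python
def load_rdd_via_dataframe(spark, path, columns, fmt="parquet"):
    """Read `path` as a DataFrame, keep only `columns`, then expose an RDD.

    `spark` is assumed to be a pyspark.sql.SparkSession. `fmt` is any format
    the DataFrame reader supports (the parquet default is an assumption).
    """
    df = spark.read.format(fmt).load(path)  # read through the DataFrame API
    pruned = df.select(*columns)            # prune columns on the DataFrame
    return pruned.rdd                       # RDD of Row objects for RDD-based code
```

Keeping the read and the column selection on the DataFrame side lets Spark SQL plan that part of the job before the data reaches the RDD code.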
Use Apache Spark 3
Dataproc 2.0 installs Spark 3, which includes the following features and performance improvements:
- GPU support
- Ability to read binary files
- Performance improvements
- Dynamic Partition Pruning
- Adaptive query execution, which optimizes Spark jobs in real time
Use Dynamic Allocation
Apache Spark includes a Dynamic Allocation feature that scales the number of Spark executors on workers within a cluster. This feature allows a job to use the full Dataproc cluster even when the cluster scales up. This feature is enabled by default on Dataproc (spark.dynamicAllocation.enabled is set to true). See Spark Dynamic Allocation for more information.
Use Dataproc Autoscaling
Dataproc Autoscaling dynamically adds and removes Dataproc workers from a cluster to help ensure that Spark jobs have the resources needed to complete quickly.
It is a best practice to configure the autoscaling policy to only scale secondary workers.
Use Dataproc Enhanced Flexibility Mode
Clusters with preemptible VMs or an autoscaling policy may receive FetchFailed exceptions when workers are preempted or removed before they finish serving shuffle data to reducers. This exception can cause task retries and longer job completion times.
Recommendation: Use Dataproc Enhanced Flexibility Mode, which does not store intermediate shuffle data on secondary workers, so that secondary workers can be safely preempted or scaled down.
Configure partitioning and shuffling
Spark stores data in temporary partitions on the cluster. If your application groups or joins DataFrames, it shuffles the data into new partitions according to the grouping and low-level configuration.
Data partitioning significantly impacts application performance: too few partitions limit job parallelism and cluster resource utilization; too many partitions slow down the job due to additional partition processing and shuffling.
The following properties govern the number and size of your partitions:
spark.sql.files.maxPartitionBytes: the maximum size of partitions when you read in data from Cloud Storage. The default is 128 MB, which is sufficiently large for most applications that process less than 100 TB.
spark.sql.shuffle.partitions: the number of partitions after performing a shuffle. The default is 200, which is appropriate for clusters with fewer than 100 vCPUs total. Recommendation: Set this to 3x the number of vCPUs in your cluster.
spark.default.parallelism: the number of partitions returned after performing RDD transformations that require shuffles, such as parallelize. The default is the total number of vCPUs in your cluster. When using RDDs in Spark jobs, you can set this number to 3x your vCPUs.
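The 3x recommendation can be sketched as a small helper. The function names and the 160-vCPU cluster are hypothetical, and the sketch assumes the same multiplier is wanted for both the DataFrame and the RDD setting.

```python
def partition_properties(total_vcpus, factor=3):
    """Return Spark properties sized at `factor` x the cluster's total vCPUs."""
    if total_vcpus <= 0:
        raise ValueError("total_vcpus must be positive")
    partitions = str(factor * total_vcpus)
    return {
        "spark.sql.shuffle.partitions": partitions,  # DataFrame and SQL shuffles
        "spark.default.parallelism": partitions,     # RDD shuffles
    }


def to_properties_flag(props):
    """Format properties as a comma-separated key=value list."""
    return ",".join(f"{key}={value}" for key, value in props.items())


# Hypothetical cluster: 10 workers x 16 vCPUs = 160 vCPUs in total.
props = partition_properties(total_vcpus=160)
print(to_properties_flag(props))
```

The resulting string is in the key=value form that the --properties flag of gcloud dataproc jobs submit accepts. Treat the values as a starting point to tune, not as fixed settings.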
Limit the number of files
There is a performance loss when Spark reads a large number of small files. Store data in larger files, for example, files in the 256 MB–512 MB range. Similarly, limit the number of output files (to force a shuffle, see Avoid unnecessary shuffles).
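As a rough illustration of the file-size guidance, the number of output files for a dataset can be estimated from its total size. The helper name and the 384 MiB target (the midpoint of the suggested range) are assumptions.

```python
import math

MIB = 1024 * 1024


def target_file_count(total_bytes, target_file_bytes=384 * MIB):
    """Estimate how many output files keep each file near `target_file_bytes`."""
    if total_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be non-negative and the target positive")
    # Always write at least one file, even for an empty or tiny dataset.
    return max(1, math.ceil(total_bytes / target_file_bytes))


# Hypothetical 100 GiB dataset.
print(target_file_count(100 * 1024 * MIB))
```

An estimate like this could be used as the partition count when repartitioning before a write, keeping in mind that repartitioning itself triggers a shuffle.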
Configure adaptive query execution (Spark 3)
Adaptive query execution (enabled by default in Dataproc image version 2.0) provides Spark job performance improvements, including:
- Coalescing partitions after shuffles
- Converting sort-merge joins to broadcast joins
- Optimizations for skew joins.
Although the default configuration settings are sound for most use cases, setting spark.sql.adaptive.advisoryPartitionSizeInBytes to the value of spark.sql.files.maxPartitionBytes (default 128 MB) can be beneficial.
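A sketch of how the 128 MB figure could be expressed as job properties, assuming the goal is to align the adaptive advisory partition size (spark.sql.adaptive.advisoryPartitionSizeInBytes, a Spark 3 setting) with the read partition size. The function name is hypothetical, and values are given in plain bytes to avoid relying on size-suffix parsing.

```python
MIB = 1024 * 1024


def aqe_properties(partition_mb=128):
    """Return adaptive query execution properties aligned to one partition size."""
    size_bytes = str(partition_mb * MIB)
    return {
        "spark.sql.adaptive.enabled": "true",
        # Advisory size AQE aims for when coalescing shuffle partitions.
        "spark.sql.adaptive.advisoryPartitionSizeInBytes": size_bytes,
        # Maximum partition size when reading files.
        "spark.sql.files.maxPartitionBytes": size_bytes,
    }


print(aqe_properties())
```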
Avoid unnecessary shuffles
Spark allows users to manually trigger a shuffle to re-balance their data with the repartition function. Shuffles are expensive, so reshuffling data should be used cautiously. Setting the partition configurations appropriately should be sufficient to allow Spark to automatically partition your data.
Exception: When writing column-partitioned data to Cloud Storage, repartitioning on a specific column avoids writing many small files to achieve faster write times.
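The exception can be sketched in PySpark as follows. The function name is hypothetical, `df` is assumed to be a pyspark.sql.DataFrame, and Parquet is chosen as the output format only for illustration.

```python
def write_partitioned_by_column(df, column, path):
    """Repartition on `column`, then write one output directory per column value.

    `df` is assumed to be a pyspark.sql.DataFrame.
    """
    (
        df.repartition(column)        # shuffle so rows with the same value are co-located
        .write.partitionBy(column)    # lay out the output as column=value directories
        .parquet(path)
    )
```

Without the repartition step, every task may hold rows for many values of the column and write a small file into each directory. With it, the rows for a given value sit in a single partition, so each directory receives far fewer files.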
Store data in Parquet or Avro
Spark SQL defaults to reading and writing data in Snappy-compressed Parquet files. Parquet is an efficient columnar file format that enables Spark to read only the data it needs to execute an application. This is an important advantage when working with large datasets. Other columnar formats, such as Apache ORC, also perform well.
For non-columnar data, Apache Avro provides an efficient binary-row file format. Although typically slower than Parquet, Avro's performance is better than text-based formats, such as CSV or JSON.
Optimize disk size
Persistent disk throughput scales with disk size, which can affect Spark job performance since jobs write metadata and shuffle data to disk. When using standard persistent disks, disk size should be at least 1 terabyte per worker (see Performance by persistent disk size).
To monitor worker disk throughput in the Google Cloud console:
- Click the cluster name on the Clusters page.
- Click the VM INSTANCES tab.
- Click on any worker name.
- Click the MONITORING tab, then scroll down to Disk Throughput to view worker throughput.
Ephemeral Dataproc clusters, which do not benefit from persistent storage, can use local SSDs. Local SSDs are physically attached to the cluster and provide higher throughput than persistent disks (see the Performance table). Local SSDs are available at a fixed size of 375 gigabytes, but you can add multiple SSDs to increase performance.
Local SSDs do not persist data after a cluster is shut down. If persistent storage is desired, you can use SSD persistent disks, which provide higher throughput for their size than standard persistent disks. SSD persistent disks are also a good choice if partition size will be smaller than 8 KB (however, avoid small partitions).
Attach GPUs to your cluster
Spark 3 supports GPUs. Use GPUs with the RAPIDS initialization action to speed up Spark jobs using the RAPIDS SQL Accelerator. Use the GPU driver initialization action to configure a cluster with GPUs.
Common job failures and fixes
Out of Memory
Examples:
- "Lost executor"
- "java.lang.OutOfMemoryError: GC overhead limit exceeded"
- "Container killed by YARN for exceeding memory limits"
Possible fixes:
- If using PySpark, raise spark.executor.memoryOverhead and lower spark.executor.memory.
- Use high memory machine types.
- Use smaller partitions.
Shuffle Fetch Failures
- "FetchFailedException" (Spark error)
- "Failed to connect to..." (Spark error)
- "Failed to fetch" (MapReduce error)
Typically caused by premature removal of workers that still have shuffle data to serve.
Possible causes and fixes:
- Preemptible worker VMs were reclaimed or non-preemptible worker VMs were removed by the autoscaler. Solution: Use Enhanced Flexibility Mode to make secondary workers safely preemptible or scalable.
- Executor or mapper crashed due to OutOfMemory error. Solution: increase the memory of executor or mapper.
- The Spark shuffle service may be overloaded. Solution: decrease the number of job partitions.
YARN nodes are UNHEALTHY
Examples (from YARN logs):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below configured utilization percentage/no more usable space [ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
Often related to insufficient disk space for shuffle data. Diagnose by viewing log files:
- Open your project's Clusters page in the Google Cloud console, then click on the cluster's name.
- Click VIEW LOGS.
- Filter logs by
- Search for "UNHEALTHY".
Fix: If the log reports "UNHEALTHY" status, recreate your cluster with larger disk space, which will increase the throughput cap.
Job fails due to insufficient driver memory
When running jobs in cluster mode, the job fails if the memory size of the master node is significantly larger than the worker node memory size.
Example from driver logs:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
Fix: Use the same machine type for master and worker nodes.
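The numbers in the log message above can be reproduced with a short calculation. This sketch assumes Spark's default driver memory overhead of 10% of driver memory with a 384 MB floor, which matches the "32768+3276" in the example. The function names are hypothetical.

```python
def required_am_memory_mb(driver_memory_mb, overhead_factor=0.10, min_overhead_mb=384):
    """Driver memory plus overhead requested from YARN in cluster mode.

    Assumes the default overhead rule: max(10% of driver memory, 384 MB).
    """
    overhead = max(int(driver_memory_mb * overhead_factor), min_overhead_mb)
    return driver_memory_mb + overhead


def fits_yarn_limit(driver_memory_mb, yarn_max_allocation_mb):
    """True if the driver container fits under YARN's maximum allocation."""
    return required_am_memory_mb(driver_memory_mb) <= yarn_max_allocation_mb


print(required_am_memory_mb(32768))   # 32768 + 3276 = 36044
print(fits_yarn_limit(32768, 12288))  # False: the case shown in the log
print(fits_yarn_limit(8192, 12288))   # True: 8192 + 819 = 9011
```

A check like this shows why a driver sized for a large master node cannot be scheduled on smaller workers: the request is compared against the workers' YARN allocation limit, not against the master's memory.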