Dataproc best practices guide
Dataproc is a fully managed service for hosting open source distributed processing platforms such as Apache Spark, Presto, Apache Flink and Apache Hadoop on Google Cloud. Unlike on-premises clusters, Dataproc gives organizations the flexibility to provision and configure clusters of varying size on demand. In addition, Dataproc has powerful features that help your organization lower costs, increase performance and streamline operational management of workloads running in the cloud. This post provides an overview of key best practices for Storage, Compute and Operations when adopting Dataproc for running Hadoop or Spark-based workloads.
Auto scaling enables clusters to scale up or down based on YARN memory metrics. Determining the correct auto scaling policy for a cluster may require careful monitoring and tuning over a period of time. It is also possible that an auto scaling policy which works well for one type of workload might not work that well for others. To handle this you can create multiple clusters with different auto scaling policies tuned for specific types of workloads.
Refer to the detailed documentation on configuring the different levers available to set up the auto scaling policy based on workloads. The configurations let you adjust how aggressively a cluster is upscaled or downscaled. For example, it makes sense to use a more aggressive upscale configuration for clusters running business critical applications and jobs, while clusters running low priority jobs can use a less aggressive one.
In general, below are some points to consider:
- Dataproc auto scaling depends on YARN metrics - Allocated, Available and Pending memory
- Scaling down can be less straightforward than scaling up and can result in task reprocessing or job failures. This is because map tasks store intermediate shuffle data on the local disk. When nodes are decommissioned, shuffle data can be lost for running jobs. Some common symptoms for this are:
  - MapReduce jobs: “Failed to fetch”
  - Spark: “FetchFailedException”, “Failed to connect to
- Preferably scale only the secondary workers (the ones that do not run HDFS DataNodes). This eliminates the need to move HDFS data off the nodes being deleted. As a result, the likelihood of the cluster becoming unhealthy due to corrupt HDFS blocks or NameNode race conditions is greatly reduced. Review the following section to determine whether the secondary workers should be configured as preemptible or non-preemptible. By default, secondary workers are preemptible.
- The graceful decommission timeout should ideally be set longer than the longest running job on the cluster. Note that the cluster will not scale up or down during the graceful decommission period and the cooldown period. This can result in smaller jobs being slowed down due to lack of resources. For sensitive long running workloads, consider scheduling them on separate ephemeral clusters. Conversely, keep short running jobs on a single auto scaling cluster.
- Use Enhanced Flexibility Mode (EFM) to enable the use of more aggressive auto scaling policies. With EFM, shuffle data is stored on primary worker nodes, so secondary workers can be scaled up and down aggressively.
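The contrast between an aggressive policy for business critical jobs and a gentler one for low priority jobs can be sketched as two policy bodies. This is a minimal illustration, not a recommended configuration: the field names follow the Dataproc autoscaling policy REST resource as we understand it, while the policy IDs, factors, timeouts and worker counts are assumptions chosen for the example.

```python
import json


def autoscaling_policy(policy_id, scale_up, scale_down,
                       graceful_timeout_s, cooldown_s=120,
                       primary_workers=2, max_secondary=50):
    """Build an autoscaling policy body as a plain dict.

    All numeric values passed in are illustrative assumptions.
    """
    return {
        "id": policy_id,
        # min == max keeps the primary (HDFS) group fixed, so only
        # secondary workers are added or removed.
        "workerConfig": {"minInstances": primary_workers,
                         "maxInstances": primary_workers},
        "secondaryWorkerConfig": {"minInstances": 0,
                                  "maxInstances": max_secondary},
        "basicAlgorithm": {
            "cooldownPeriod": f"{cooldown_s}s",
            "yarnConfig": {
                # Fraction of pending YARN memory to add capacity for.
                "scaleUpFactor": scale_up,
                # Fraction of available YARN memory to remove.
                "scaleDownFactor": scale_down,
                # Ideally longer than the longest running job.
                "gracefulDecommissionTimeout": f"{graceful_timeout_s}s",
            },
        },
    }


# Hypothetical policies: aggressive upscaling for critical jobs,
# slower upscaling for low priority jobs.
critical = autoscaling_policy("critical-jobs", 1.0, 0.5, graceful_timeout_s=3600)
low_priority = autoscaling_policy("low-priority-jobs", 0.25, 1.0, graceful_timeout_s=600)

print(json.dumps(critical, indent=2))
```

Keeping each policy as data makes it straightforward to maintain one policy per workload type, as suggested above, and to review changes to the factors over time.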
Preemptible VMs (PVMs)
PVMs are highly affordable, short-lived compute instances suitable for batch jobs and fault-tolerant workloads. Their price is significantly lower than that of normal VMs, but they can be taken away from clusters at any time with little warning. Using a certain percentage of PVMs in clusters running fault tolerant workloads can reduce costs. However, remember that using a high percentage of PVMs, or using them for jobs which are not fault tolerant, can result in failed jobs or other related issues. In general, the recommendations are to:
- Use PVMs only for secondary workers as they do not run HDFS
- Keep PVMs to less than 30% of the maximum number of secondary workers.
- Use PVMs only for fault tolerant jobs, and test rigorously in lower environments before promoting to production.
- Increase application (MapReduce, Spark, etc.) fault tolerance by increasing the maximum attempts of the application master and of tasks/executors as required. Additionally, consider setting the dataproc:am.primary_only property to true to ensure that the application master is started on non-preemptible workers only.
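As a rough sketch of the last two recommendations, the helper below computes the largest PVM count that stays under the 30% guideline and collects fault tolerance settings as cluster properties. The property names are standard YARN, Spark and MapReduce settings written with Dataproc's `prefix:key` convention; the attempt counts are assumptions for illustration, not tuned values.

```python
def max_preemptible_workers(max_secondary_workers, cap_percent=30):
    """Largest PVM count strictly below cap_percent of the secondary group.

    Integer arithmetic avoids floating point rounding at the boundary.
    """
    return max((max_secondary_workers * cap_percent - 1) // 100, 0)


def fault_tolerance_properties(am_attempts=4, task_attempts=8):
    """Cluster properties that raise retry limits (values are assumptions)."""
    return {
        # Keep application masters off preemptible workers.
        "dataproc:am.primary_only": "true",
        # Application master retries.
        "yarn:yarn.resourcemanager.am.max-attempts": str(am_attempts),
        "spark:spark.yarn.maxAppAttempts": str(am_attempts),
        # Task level retries for Spark and MapReduce.
        "spark:spark.task.maxFailures": str(task_attempts),
        "mapred:mapreduce.map.maxattempts": str(task_attempts),
        "mapred:mapreduce.reduce.maxattempts": str(task_attempts),
    }


pvm_limit = max_preemptible_workers(20)
properties = fault_tolerance_properties()

print(pvm_limit)
print(",".join(f"{key}={value}" for key, value in properties.items()))
```

The joined string has the comma separated `key=value` shape that is typically passed as cluster properties at creation time.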
Unlike traditional, on-premises Hadoop, Dataproc is based on separation of compute and storage, so the two can be scaled independently. Here is a summary of the storage options available with Dataproc:
Google Cloud Storage (GCS) is the preferred storage option for all persistent storage needs. GCS is a Hadoop Compatible File System (HCFS), enabling Hadoop and Spark jobs to read from and write to it with minimal changes. Further, data stored on GCS can be accessed by other Dataproc clusters and products (such as BigQuery).
HDFS storage on Dataproc is built on top of persistent disks (PDs) attached to worker nodes. This means data stored on HDFS is transient (unless it is copied to GCS or other persistent storage) with relatively higher storage costs. Hence it is recommended to minimize the use of HDFS storage. However there might be valid scenarios where you need to maintain a small HDFS footprint, specifically for performance reasons. In such cases, you can provision Dataproc clusters with limited HDFS storage, offloading all persistent storage needs to GCS. Below are some considerations while deciding on size and nature of storage disks to be attached to worker nodes:
- In general, read and write throughput for standard PDs increases with the size of the attached disk.
- Zonal disks have higher read/write throughput than regional ones.
- PDs come in a few different types to accommodate different performance and cost considerations.
- To supplement the boot disk, you can attach local Solid-State Drives (SSD) to master, primary worker, and secondary worker nodes in your cluster.
- Local SSDs can provide faster read and write times than persistent disk. The 375GB size of each local SSD is fixed, but you can attach multiple local SSDs to increase SSD storage.
In order to balance performance of HDFS with the flexibility and durability of GCS, you can design your workloads such that the source and final datasets are stored on GCS and intermediate datasets are stored on HDFS.
Further, to reduce read/write latency to GCS files, consider adopting the following measures:
- Ensure that the GCS bucket is in the same region as the cluster.
- Consider enabling auto.purge (setting the table property auto.purge to true) for Hive managed tables on GCS. This skips the copy to Trash when data is overwritten or deleted.
- Consider using Spark 3 or later (available starting from Dataproc 2.0) when using Spark SQL. For instance, INSERT OVERWRITE has a known issue in Spark 2.x.
- In general, the more files on GCS, the greater the time to read/write/move/delete the data on GCS. Hence consider tuning the Spark SQL/RDD/DataFrame parameters such as spark.sql.shuffle.partitions, spark.default.parallelism or the Hadoop parameter mapreduce.job.reduces to reduce the possibility of many files with varying/small sizes being written to GCS.
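One way to reason about the partition settings mentioned in the last point is to derive them from the expected output volume and a target file size. The sketch below is an assumption-laden heuristic, not an official formula: the 128 MiB target and the function name are hypothetical, and the right value depends on the workload.

```python
import math


def target_partitions(total_bytes, target_file_bytes=128 * 1024**2):
    """Number of partitions so each output file is near the target size."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


def spark_partition_conf(total_bytes):
    """Spark settings that bound the number of files written per stage."""
    partitions = str(target_partitions(total_bytes))
    return {
        "spark.sql.shuffle.partitions": partitions,  # DataFrame / Spark SQL
        "spark.default.parallelism": partitions,     # RDD operations
    }


# Example: roughly 10 GiB of shuffle output.
conf = spark_partition_conf(10 * 1024**3)
print(conf)
```

For MapReduce jobs, the same count could be applied to mapreduce.job.reduces.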
By default, the worker node PDs hold shuffle data. A common cause of a YARN node being marked “UNHEALTHY” is that the node has run out of disk space. This can be verified by checking the YARN NodeManager logs for the cluster and checking the disk space on the unhealthy node. To resolve it, either increase the disk size or run fewer jobs concurrently.
Enhanced Flexibility Mode (EFM)
Removal of worker nodes due to downscaling or preemption (see the sections above) often results in the loss of shuffle (intermediate) data stored locally on the node. To minimize job delays in such scenarios, it is highly recommended to enable Enhanced Flexibility Mode on the cluster. EFM has two modes:
Primary-worker shuffle - Recommended for Spark jobs, this enables mappers to write data to primary workers. The reduce phase, which typically runs on fewer nodes than the map phase, reads the data from the primary workers.
HCFS (Hadoop Compatible File System) shuffle - Mappers write data to an HCFS implementation (HDFS by default). As with primary worker mode, only primary workers participate in HDFS and HCFS implementations (if HCFS shuffle uses the Cloud Storage Connector, data is stored off-cluster). This mode can benefit jobs processing small to medium amounts of data. However, it is not recommended for jobs processing large volumes of data, as it may introduce higher latency for shuffle data and so increase job execution time.
EFM is highly recommended for clusters that use preemptible VMs, and for improving the stability of auto scaling with the secondary worker group. With EFM enabled, ensure that the primary workers are sized correctly in terms of their ratio to secondary workers, CPU and disk size. For example, if the ratio of secondary workers to primary workers is very high, cluster stability can suffer because the shuffle becomes bottlenecked on the primary workers. To further improve shuffle performance, ensure that the shuffle parameters are increased from their default values.
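A small sketch of how the two EFM modes and the sizing concern might be captured in a cluster definition. The property keys reflect our understanding of the Dataproc EFM cluster properties; the ratio threshold of 10 is purely an assumption for illustration, since the text above gives no specific number.

```python
def efm_properties(engine="spark"):
    """Cluster property enabling EFM for the given engine."""
    modes = {
        # Primary-worker shuffle, recommended above for Spark jobs.
        "spark": ("dataproc:efm.spark.shuffle", "primary-worker"),
        # HCFS shuffle for MapReduce jobs.
        "mapreduce": ("dataproc:efm.mapreduce.shuffle", "hcfs"),
    }
    key, value = modes[engine]
    return {key: value}


def worker_ratio_ok(primary_workers, secondary_workers, max_ratio=10):
    """Flag layouts where primaries may bottleneck the shuffle.

    max_ratio is a hypothetical threshold, not a documented limit.
    """
    if primary_workers <= 0:
        raise ValueError("at least one primary worker is required")
    return secondary_workers <= primary_workers * max_ratio


print(efm_properties("spark"))
print(worker_ratio_ok(primary_workers=4, secondary_workers=100))
```

A check like this could run before cluster creation to catch pools where secondary workers heavily outnumber primaries.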
Labels are key-value pairs that can be attached to Dataproc clusters and jobs. Labels are added when the cluster is created or at job submission time. Listed below are some possible applications for labels:
Billing - It is possible to track and consolidate costs associated with Dataproc clusters for the purpose of attribution to users, teams or departments. This can be achieved by filtering billing data by labels on clusters, jobs or other resources (e.g. team:marketing, team:analytics).
Searching and Categorizing - It is easy to tag, filter or search various resources (jobs, clusters, environments, components, etc.) based on their labels.
Automation - Dynamically submit job/workflows to Dataproc cluster pools based on cluster or job labels. For instance, one can submit high priority jobs to a cluster with aggressive auto scaling or jobs tagged with ML or Data Science labels can be run on clusters with TPUs.
Monitoring - Labels are also very useful for monitoring. For example, using Advanced Filter in Cloud Logging, one can filter events by specific labels.
Upgrades - You can perform rolling upgrades of clusters using labels. This is specifically useful if you want to maintain specific versions of Dataproc based on workloads or team.
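To make the searching and automation uses concrete, here is a minimal sketch that builds a label filter expression and selects matching clusters from an in-memory inventory. The cluster names and labels are hypothetical, and the filter string follows the `labels.key = value` form used by gcloud list filters as we understand it.

```python
def label_filter(labels):
    """Build a filter expression such as 'labels.team = analytics'."""
    return " AND ".join(
        f"labels.{key} = {value}" for key, value in sorted(labels.items())
    )


def select_clusters(clusters, wanted):
    """Return names of clusters whose labels include every wanted pair."""
    return [
        cluster["name"]
        for cluster in clusters
        if all(cluster.get("labels", {}).get(k) == v for k, v in wanted.items())
    ]


# Hypothetical inventory, e.g. parsed from a cluster listing.
clusters = [
    {"name": "etl-a", "labels": {"team": "analytics", "priority": "high"}},
    {"name": "etl-b", "labels": {"team": "analytics", "priority": "low"}},
    {"name": "mkt-a", "labels": {"team": "marketing", "priority": "high"}},
]

wanted = {"team": "analytics", "priority": "high"}
print(label_filter(wanted))
print(select_clusters(clusters, wanted))
```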
Traditionally, organizations would have one or more Hadoop clusters consisting of disparate hardware (nodes, switches, etc.). This makes management of the cluster, scaling, and tuning of jobs a complex exercise. With Dataproc, you can instead build cluster pools using labels. Those pools can be assigned Dataproc Workflow Templates, which enables you to submit jobs to cluster pools using Workflow Templates. A single cluster pool can have one or more clusters assigned to it. When a Workflow Template is assigned to a cluster pool, it runs against any one of the Dataproc clusters within that pool.
Here are some possible ways of organizing cluster pools:
Job types - Jobs can be classified according to characteristics like priority (critical, high, low etc) or resource utilization (cpu or memory intensive, ML etc). For example, having different cluster pools to run Compute intensive, I/O intensive and ML related use cases separately may result in better performance as well as lower costs (as hardware and config are customized for workload type).
Users & Groups - Cluster pools are also useful if you want to configure clusters to run jobs from certain teams or users. For example, data scientists can submit Spark ML jobs to clusters with TPUs while normal ETL jobs run on a Dataproc cluster with standard CPUs and PDs.
Compliance & Data Governance - Labels along with cluster pools can simplify data governance and compliance needs as well. For example, jobs with specific security and compliance needs can be run in a more hardened environment than others. This can be achieved via cluster pools.
Cluster pools are also useful for long running clusters. It might be beneficial to create a pool of auto scaling clusters optimized to run specific kinds of workflows. You can also perform rolling upgrades of Dataproc clusters using cluster pools. Follow the steps below to upgrade your Dataproc cluster pools without any downtime to current workloads:
Spin up new cluster pools with the target versions, using specific labels (e.g. “dataproc-2.1”) and with auto scaling enabled.
Submit all new workflows/jobs to the new cluster pool. You can use labels to submit jobs to the cluster pool.
Let your previous cluster pools with older versions complete current workloads. Clean them up once all jobs finish.
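The rolling upgrade steps above can be sketched as data plus one small check. The `placement.clusterSelector.clusterLabels` structure follows the Workflow Template resource as we understand it; the label keys and values, cluster names and job counts are hypothetical. Label values here use a dash rather than a dot on the assumption that dots are not accepted in label values.

```python
def placement_for_pool(pool_labels):
    """Workflow template placement that targets clusters by label."""
    return {
        "placement": {
            "clusterSelector": {"clusterLabels": dict(pool_labels)}
        }
    }


def clusters_safe_to_delete(active_jobs_by_cluster):
    """Old-pool clusters with no running jobs can be cleaned up."""
    return sorted(
        name for name, active in active_jobs_by_cluster.items() if active == 0
    )


# Steps 1 and 2: new pool labelled with the target version; new work goes there.
new_pool = {"pool": "etl", "image": "dataproc-2-1"}
template_fragment = placement_for_pool(new_pool)

# Step 3: drain the old pool, deleting clusters as they become idle.
old_pool_jobs = {"etl-old-1": 0, "etl-old-2": 3, "etl-old-3": 0}

print(template_fragment)
print(clusters_safe_to_delete(old_pool_jobs))
```

Because job routing depends only on labels, switching traffic to the new pool is a template change rather than a change to each job.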
Cluster scheduled deletion
Trimming costs due to unused, idle resources is near the top of any organization’s IT priorities. To prevent such scenarios from arising, you can create a cluster with the Cluster Scheduled Deletion feature enabled. This configuration can be embedded in your Infrastructure as Code (IaC), for example Terraform scripts or Cloud Build configurations. Carefully review the details of the configured deletion conditions when enabling scheduled deletion to ensure they fit your organization’s priorities.
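As an illustration of embedding scheduled deletion in a cluster definition, the sketch below builds the lifecycle portion of a cluster config. The field names follow the cluster `lifecycleConfig` resource as we understand it; the 30 minute idle limit and 8 hour maximum age are assumptions chosen for the example.

```python
def lifecycle_config(max_idle_minutes=None, max_age_hours=None):
    """Scheduled deletion settings, with durations encoded in seconds."""
    config = {}
    if max_idle_minutes is not None:
        # Delete the cluster after it has been idle for this long.
        config["idleDeleteTtl"] = f"{max_idle_minutes * 60}s"
    if max_age_hours is not None:
        # Delete the cluster once it reaches this age, idle or not.
        config["autoDeleteTtl"] = f"{max_age_hours * 3600}s"
    return {"lifecycleConfig": config}


cluster_lifecycle = lifecycle_config(max_idle_minutes=30, max_age_hours=8)
print(cluster_lifecycle)
```

Note that a maximum age deletes the cluster even while jobs are running, which is one reason to review the deletion conditions carefully.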
Long running vs short lived (ephemeral) clusters
A common question from customers is when to use short lived (ephemeral) clusters versus long running ones. As the name indicates, ephemeral clusters are short lived. For the majority of Hadoop and Spark use cases, ephemeral clusters provide the flexibility to create workload sized, auto scaling clusters. Below are some key advantages that ephemeral clusters bring to the table:
- Workload specific cluster configuration: Ephemeral clusters enable users to customize cluster configurations according to individual workflows, eliminating the administrative burden of managing different hardware profiles and configurations.
- Workload specific hardware profiles: The VMs used to create these ephemeral clusters can also be customized according to individual use cases. Dataproc offers a wide variety of VMs (general purpose, memory optimized, compute optimized, etc.). With workflow sized clusters you can choose the best hardware (compute instance) for each workflow. For example, compute intensive use cases can benefit from more vCPUs (compute optimized machines [C2]), while I/O intensive ones can benefit from more memory and larger persistent disks (memory optimized machines). For more details refer to this detailed blog on the topic.
- Cost attribution: Since the lifetime of the cluster is limited to an individual workflow, cost attribution is easy and straightforward.
- Reduced operational overhead:
  - Since the cluster is short lived, there is no need for classic cluster maintenance. With CI/CD integration, you can deploy and clean up ephemeral clusters with minimal intervention.
  - There is no need to maintain separate infrastructure for development, testing, and production. Users can use the same cluster definitions to spin up as many different versions of a cluster as required and clean them up once done.
- Simplified security: Since a single cluster is used for a single use case or user, the corresponding security requirements are also simplified. From multi-tenancy to network firewall rules, large monolithic clusters need to cater to many different security requirements. Ephemeral clusters simplify those needs by letting you concentrate on one use case (or user) at a time.
For more details on ephemeral clusters, refer to the official documentation. You should use ephemeral clusters for the majority of use cases. Long running clusters may be required in certain instances, especially when you need to keep a cluster warm at all times. For example, it makes sense to spin up a long running cluster for use cases requiring constant analysis, such as batch or streaming jobs which run 24x7 (either periodically or as always-on realtime jobs). An hourly batch job which aggregates raw events and ingests them into BigQuery throughout the day might fit the bill. Another scenario where we often see customers using long running clusters is ad-hoc analytical queries, where analysts need to quickly run relatively short lived queries on Hive, Presto or Spark.
Although this second scenario may sound like a good fit for ephemeral clusters, creating an ephemeral cluster for a Hive query that runs for only a few minutes can add too much overhead, especially when the number of such users is large. As discussed earlier, this can be a good use case for auto scaling cluster pools. Remember that Dataproc now supports “start” and “stop” functionality for clusters. You can use this to turn off long running clusters when they are not in use.
Monitoring and logging
The default Dataproc view is a dashboard that gives a high-level overview of the health of the cluster based on Cloud Monitoring, a Google Cloud service which can monitor, alert on, filter, and aggregate metrics. It has different tabs for Monitoring, Jobs, VM Instances, Configuration, and Web Interfaces.
The Jobs tab shows recent jobs along with their type, start time, elapsed time, and status.
The VM Instances view shows the status of GCE instances that constitute the cluster. Clicking on a GCE VM instance name will reveal instance configuration. Each GCE VM node comes with a Cloud Monitoring agent, which is a universal metrics collecting solution across GCP.
Users can also access GCP metrics through the Monitoring API or through the Cloud Monitoring dashboard. These metrics can be used for monitoring, alerting, or finding saturated resources in the cluster. It is also possible to emit your own custom metrics to Cloud Monitoring (formerly Stackdriver) and create dashboards on top of those metrics. Remember that Cloud Monitoring has its own costs associated with metrics: some are free, while others incur additional costs.
Hadoop ecosystem UI
You can enable Hadoop ecosystem UIs such as the YARN, HDFS or Spark server UI. For more details, refer to the documentation on enabling Component Gateway.
- Set dataproc:job.history.to-gcs.enabled to true to minimize local disk consumption. It is enabled by default from images 1.5 onwards. This setting will persist MapReduce and Spark history files to the GCS bucket reducing the possibility of the nodes running out of disk and causing the cluster to go unhealthy.
- Use the parameter --driver-log-levels to control the level of logging into Cloud Logging.
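The two settings above can be assembled programmatically before a cluster is created or a job is submitted. This is a small sketch: the logger names and levels are hypothetical, and the `logger=LEVEL` comma separated format is our understanding of what --driver-log-levels expects.

```python
def driver_log_levels(levels):
    """Format logger levels, e.g. 'root=WARN,com.example=INFO'."""
    return ",".join(f"{logger}={level}" for logger, level in levels.items())


# Cluster property that persists job history files to GCS.
history_property = {"dataproc:job.history.to-gcs.enabled": "true"}

# Job submission arguments controlling what reaches Cloud Logging.
log_args = [
    "--driver-log-levels",
    driver_log_levels({"root": "WARN", "com.example": "INFO"}),
]

print(history_property)
print(log_args)
```

Quieter root logging with more verbose levels for your own packages keeps Cloud Logging volume, and therefore cost, under control.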
Diagnostics and debugging
- The Spark history server and YARN history server UIs are useful for viewing and debugging the corresponding applications.
- Use the diagnose utility to obtain a tarball which provides a snapshot of the cluster’s state at the time. Once the tarball is generated on GCS, it should be safe to delete the cluster. The tarball contains the configurations for the cluster, jstack output and logs for the Dataproc Agent, JMX metrics for NodeManager and ResourceManager, and other system logs. When contacting GCP Support, attach this tarball to the case to help engineers diagnose and troubleshoot cluster issues.
By now you should have a good understanding of some of the best practices for using the Dataproc service on GCP. In this post we covered recommendations around storage, performance, cluster pools and labels. We also answered some commonly asked questions, such as when to use ephemeral clusters versus long running clusters. We hope this helps you make the best use of Dataproc.