This document describes how to move Apache Spark jobs to Dataproc. The document is intended for big-data engineers and architects. It covers topics such as considerations for migration, preparation, job migration, and management.
Overview
When you want to move your Apache Spark workloads from an on-premises environment to Google Cloud, we recommend using Dataproc to run Apache Spark/Apache Hadoop clusters. Dataproc is a fully managed, fully supported service offered by Google Cloud. It allows you to separate storage and compute, which helps you to manage your costs and be more flexible in scaling your workloads.
If a managed Hadoop environment doesn't fit your needs, you can also use a different setup, such as running Spark on Google Kubernetes Engine (GKE), or renting virtual machines on Compute Engine and setting up a Hadoop or Spark cluster yourself. However, take into account that options other than using Dataproc are self-managed and have only community support.
Planning your migration
There are many differences between running Spark jobs on-premises and running Spark jobs on Dataproc or Hadoop clusters on Compute Engine. It's important to look closely at your workload and prepare for migration. In this section, we outline considerations to take into account, and preparations to take before you migrate Spark jobs.
Identify job types and plan clusters
There are three types of Spark workloads, as described in this section.
Regularly scheduled batch jobs
Regularly scheduled batch jobs include use cases like daily or hourly ETLs, or pipelines for training machine learning models with Spark ML. For these cases, we recommend that you create a cluster for each batch workload and then delete the cluster after your job is finished. You have the flexibility to configure your cluster, because you can adjust the configuration for each workload separately. Dataproc clusters are billed in one-second block increments after the first minute, so this approach is also cost-effective: you pay only while each cluster exists. You can also label your clusters to make it easier to attribute costs to each workload. For more information, see the Dataproc pricing page.
You can implement batch jobs with workflow templates or by following these steps:
Create a cluster and wait until the cluster is created. (You can monitor whether the cluster has been created by using an API call or a gcloud command.) If you run your job on a dedicated Dataproc cluster, it might help to turn off dynamic allocation and the external shuffle service. The following gcloud command shows Spark configuration properties that are provided when you create the Dataproc cluster:

gcloud dataproc clusters create ... \
    --properties 'spark:spark.dynamicAllocation.enabled=false,spark:spark.shuffle.service.enabled=false,spark:spark.executor.instances=10000'
Submit your job to the cluster. (You can monitor the status of your job by using an API call or a gcloud command.) For example:

jobId=$(gcloud --quiet dataproc jobs submit pyspark \
    --async \
    --format='value(reference.jobId)' \
    --cluster $clusterName \
    --region global \
    gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py)

gcloud dataproc jobs describe $jobId \
    --region=global \
    --format='value(status.state)'
After the job has finished, delete the cluster by using an API call or a gcloud command.
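For example, a minimal sketch of the delete step; the cluster name and region are placeholders for your own values:

# Delete the per-workload cluster without an interactive confirmation prompt.
gcloud dataproc clusters delete my-batch-cluster \
    --region=us-central1 \
    --quiet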
Streaming jobs
For streaming jobs, you need to create a long-running Dataproc cluster and configure the cluster to run in high-availability mode. We don't recommend using preemptible VMs for this case.
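As a sketch, you can request high-availability mode by creating the cluster with three masters; the cluster name, region, and worker count below are placeholders:

# Three master nodes enable high-availability mode for the long-running cluster.
gcloud dataproc clusters create my-streaming-cluster \
    --region=us-central1 \
    --num-masters=3 \
    --num-workers=4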
Ad hoc or interactive workloads submitted by users
Examples of ad hoc workloads can include users who write queries or execute analytical jobs during the day.
For these cases, you must decide whether you need the cluster to run in high-availability mode, whether you'd like to use preemptible VMs, and how you'll manage access to the cluster. You can schedule cluster creation and termination (for example, if you never need the cluster during the night or on weekends), and you can scale the cluster up and down on a schedule.
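One way to avoid paying for idle ad hoc clusters is scheduled deletion. The following sketch assumes the --max-idle flag available in current gcloud releases; the cluster name, region, and timeout are illustrative:

# Delete the cluster automatically after two hours with no submitted jobs.
gcloud dataproc clusters create my-adhoc-cluster \
    --region=us-central1 \
    --max-idle=2h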
Identify data sources and dependencies
Each job has its own dependencies (for example, the data sources it needs), and other teams at your company might be dependent on the outcome of your jobs. Therefore, you must identify all dependencies and then create a migration plan that includes procedures for the following:
Step-by-step migration of all of your data sources to Google Cloud. In the beginning, it's helpful to mirror the data source in Google Cloud so that you have it in two places.
Job-by-job migration of your Spark workloads to Google Cloud as soon as corresponding data sources have been migrated. As with data, at some point you might have two workloads that are running in parallel both in your old environment and in Google Cloud.
Migration of other workloads that depend on the output of your Spark workloads. Or you might just replicate the output back to the initial environment.
Shutdown of Spark jobs in the old environment after all dependent teams have confirmed that they no longer require the jobs.
Choose storage options
There are two storage options to use with your Dataproc clusters: you can store all of the data in Cloud Storage, or you can use local disks or persistent disks with the cluster workers. The correct choice depends on the character of your jobs.
Comparing Cloud Storage and HDFS
Each node of a Dataproc cluster has the Cloud Storage connector installed on it. By default, the connector is installed under /usr/lib/hadoop/lib. The connector implements the Hadoop FileSystem interface and makes Cloud Storage compatible with HDFS. Because Cloud Storage is a binary large object (BLOB) storage system, the connector emulates directories based on object names. You can access your data by using the gs:// prefix instead of the hdfs:// prefix.
The Cloud Storage connector typically doesn't require any customization. However, if you need to make changes, you can follow the instructions for configuring the connector. A full list of configuration keys is also available.
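If you do need to override connector settings, one approach is to pass them as core-site properties when you create the cluster. This is only a sketch; the property key shown (fs.gs.block.size) is illustrative, so check the connector's configuration reference for the keys and values that apply to your connector version:

# Example only: override a Cloud Storage connector setting through core-site.xml properties.
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --properties='core:fs.gs.block.size=134217728'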
Cloud Storage is a good option if:
- Your data in ORC, Parquet, Avro, or any other format will be used by different clusters or jobs, and you need data persistence if the cluster terminates.
- You need high throughput and your data is stored in files larger than 128 MB.
- You need cross-zone durability for your data.
- You need data to be highly available—for example, you want to eliminate HDFS NameNode as a single point of failure.
Local HDFS storage is a good option if:
- Your jobs require a lot of metadata operations—for example, you have thousands of partitions and directories, and each file size is relatively small.
- You modify the HDFS data frequently or you rename directories. (Cloud Storage objects are immutable, so renaming a directory is an expensive operation because it consists of copying all objects to a new key and deleting them afterwards.)
- You heavily use the append operation on HDFS files.
- You have workloads that involve heavy I/O. For example, you have a lot of partitioned writes, such as the following:
  spark.read().write.partitionBy(...).parquet("gs://")
- You have I/O workloads that are especially sensitive to latency. For example, you require single-digit millisecond latency per storage operation.
In general, we recommend using Cloud Storage as the initial and final source of data in a big-data pipeline. For example, if a workflow contains five Spark jobs in series, the first job retrieves the initial data from Cloud Storage and then writes shuffle data and intermediate job output to HDFS. The final Spark job writes its results to Cloud Storage.
Adjust storage size
Using Dataproc with Cloud Storage allows you to reduce disk requirements and save costs by storing your data there instead of on HDFS. When you keep your data in Cloud Storage and don't store it on the local HDFS, you can use smaller disks for your cluster. By making your cluster truly on-demand, you're also able to separate storage and compute, as noted earlier, which helps you reduce costs significantly.
Even if you store all of your data in Cloud Storage, your Dataproc cluster needs HDFS for certain operations such as storing control and recovery files, or aggregating logs. It also needs non-HDFS local disk space for shuffling. You can reduce the disk size per worker if you are not heavily using the local HDFS.
Here are some options to adjust the size of the local HDFS:
- Decrease the total size of the local HDFS by decreasing the size of primary persistent disks for the master and workers. The primary persistent disk also contains the boot volume and system libraries, so allocate at least 100 GB.
- Increase the total size of the local HDFS by increasing the size of primary persistent disk for workers. Consider this option carefully—it's rare to have workloads that get better performance by using HDFS with standard persistent disks in comparison to using Cloud Storage or local HDFS with SSD.
- Attach up to eight SSDs (375 GB each) to each worker and use these disks for the HDFS. This is a good option if you need to use the HDFS for I/O-intensive workloads and you need single-digit millisecond latency. Make sure that you use a machine type that has enough CPUs and memory on the worker to support these disks.
- Use persistent-disk SSDs (PD-SSDs) for your master or workers as a primary disk.
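As a rough sketch of the sizing options above, the following command creates a cluster with modest boot disks and two local SSDs per worker; the cluster name, region, disk sizes, and disk counts are assumptions that you should adapt to your workload:

gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --master-boot-disk-size=100GB \
    --worker-boot-disk-size=100GB \
    --worker-boot-disk-type=pd-ssd \
    --num-worker-local-ssds=2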
Access Dataproc
Accessing Dataproc or Hadoop on Compute Engine is different than accessing an on-premises cluster. You need to determine security settings and network access options.
Networking
All VM instances of a Dataproc cluster require internal networking to each other, and they require open UDP, TCP, and ICMP ports. You can allow access to your Dataproc cluster from external IP addresses by using the default network configuration or by using a VPC network. Your Dataproc cluster will have networking access to all Google Cloud services (Cloud Storage buckets, APIs, and so on) regardless of which network option you use. To allow networking access to or from on-premises resources, choose a VPC network configuration and set up the appropriate firewall rules. For details, see the Dataproc Cluster Network Configuration guide and the Access YARN section below.
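As a hedged example, a firewall rule that allows the internal TCP, UDP, and ICMP traffic mentioned above might look like the following; the rule name, network name, and source range are placeholders for your own VPC settings:

gcloud compute firewall-rules create allow-dataproc-internal \
    --network=my-vpc \
    --allow=tcp,udp,icmp \
    --source-ranges=10.128.0.0/20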
Identity and access management
In addition to network access, your Dataproc cluster needs permissions to access resources. For example, to write data to a Cloud Storage bucket, your Dataproc cluster must have write access to the bucket. You establish access by using roles. Scan through your Spark code and find all the non–Dataproc resources that the code needs and grant the correct roles to the cluster's service account. Also, ensure that users who will create clusters, jobs, operations, and workflow templates have the right permissions.
For more details and best practices, see the IAM documentation.
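For example, granting the cluster's service account write access to Cloud Storage might look like the following sketch; the project ID, service account, and role are assumptions to adapt to your setup:

gcloud projects add-iam-policy-binding my-project \
    --member="serviceAccount:my-dataproc-sa@my-project.iam.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"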
Verify Spark and other library dependencies
Compare your version of Spark and the versions of other libraries to the official Dataproc version list and look for any libraries that are not already available. We recommend that you use Spark versions that are officially supported by Dataproc.
If you need to add libraries, you can do the following:
- Create a custom image of a Dataproc cluster.
- Create initialization scripts in Cloud Storage for your cluster. You can use initialization scripts to install additional dependencies, to copy binaries, and so on.
- Recompile your Java or Scala code and package all additional dependencies that are not part of the base distribution as a "fat jar" by using Gradle, Maven, sbt, or another tool.
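For example, an initialization action can be attached at cluster creation time. This is a minimal sketch; the bucket path and script name are hypothetical:

# install-deps.sh runs on each node while the cluster is being created.
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --initialization-actions=gs://my-bucket/init/install-deps.sh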
Adjust Dataproc cluster size
In any cluster configuration, whether on-premises or in the cloud, the cluster size is crucial for Spark job performance. A Spark job without enough resources will either be slow or will fail, especially if it does not have enough executor memory. For advice on what you need to consider when sizing any Hadoop cluster, see the sizing your cluster section of the Hadoop migration guide.
The following sections describe some options for how to size your cluster.
Getting the configuration of your current Spark jobs
Look at how your current Spark jobs are configured and make sure that the Dataproc cluster is large enough. If you move from a shared cluster to multiple Dataproc clusters (one for each batch workload), look at the YARN configuration for each application so that you understand how many executors you need, the number of CPUs per executor, and the total executor memory. If your on-premises cluster has YARN queues set up, see which jobs are sharing each queue's resources and identify bottlenecks. This migration is an opportunity to remove any resource restrictions you might have had in your on-premises cluster.
Choosing machine types and disk options
Choose the number and type of VMs to match the needs of your workload. If you have decided to use local HDFS for storage, make sure the VMs have the right disk type and size. Don't forget to include the resource needs of the driver programs in your calculation.
Each VM has a networking egress cap of 2 Gbps per vCPU. Writing to persistent disks or to persistent SSDs counts toward this cap, so a VM with a very low number of vCPUs might be throttled by the cap when it writes to these disks. This is likely to happen in the shuffle phase, when Spark writes shuffle data to disk and moves shuffle data over the network between executors. Persistent disks require at least 2 vCPUs to reach maximum write performance and persistent SSDs require 4 vCPUs. Note that these minimums don't take into account traffic such as communication between VMs. Also, the size of each disk affects its peak performance.
The configuration you choose will have an impact on the cost of your Dataproc cluster. Dataproc pricing is in addition to the Compute Engine per-instance price for each VM and other Google Cloud resources. For more information, and to use the Google Cloud pricing calculator to get an estimate of your costs, see the Dataproc pricing page.
Benchmarking performance and optimizing
When you've finished the job migration phase, but before you stop running Spark workloads in your on-premises cluster, benchmark your Spark jobs and consider any optimizations. Remember that you can resize your cluster if your configuration is not optimal.
Dataproc Serverless for Spark autoscaling
Use Dataproc Serverless to run Spark workloads without provisioning and managing your own cluster. Specify workload parameters, and then submit the workload to the Dataproc Serverless service. The service will run the workload on a managed compute infrastructure, autoscaling resources as needed. Dataproc Serverless charges apply only to the time when the workload is executing.
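A minimal sketch of a Dataproc Serverless batch submission, assuming a PySpark file that has been staged in Cloud Storage (the bucket and file names are placeholders):

gcloud dataproc batches submit pyspark gs://my-bucket/jobs/etl.py \
    --region=us-central1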
Performing the migration
This section discusses migrating the data, changing job code, and changing how jobs are run.
Migrate data
Before you run any Spark jobs in your Dataproc cluster, you need to migrate your data to Google Cloud. For more information, see the Data Migration Guide.
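As one hedged illustration, if your on-premises cluster has the Cloud Storage connector installed, you can copy HDFS data to a bucket with DistCp; the paths below are placeholders:

hadoop distcp hdfs:///data/events gs://my-bucket/data/events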
Migrate Spark code
After you've planned your migration to Dataproc and moved any required data sources, you can migrate the job code. If there are no differences in Spark versions between the two clusters, and if you want to store data on Cloud Storage instead of local HDFS, you only need to change the prefix of all of your HDFS file paths from hdfs:// to gs://.
If you are using different Spark versions, consult the Spark release notes, compare the two versions, and adapt your Spark code accordingly.
You can copy the jar files for your Spark applications either to the Cloud Storage bucket that is tied to your Dataproc cluster or to an HDFS folder. The next section explains the available options for running Spark jobs.
If you decide to use workflow templates, we recommend that you separately test each Spark job that you plan to add. Then you can run a final test run of the template to make sure the template's workflow is correct (no missing upstream jobs, outputs are stored in the right locations, and so on).
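The following sketch shows one way to assemble and run a workflow template with a managed (ephemeral) cluster; the template, cluster, bucket, and step names are hypothetical:

gcloud dataproc workflow-templates create my-etl-template --region=us-central1

# Use a managed cluster that is created for each run and deleted afterwards.
gcloud dataproc workflow-templates set-managed-cluster my-etl-template \
    --region=us-central1 \
    --cluster-name=my-etl-cluster \
    --num-workers=2

gcloud dataproc workflow-templates add-job pyspark gs://my-bucket/jobs/etl.py \
    --region=us-central1 \
    --workflow-template=my-etl-template \
    --step-id=etl-step

gcloud dataproc workflow-templates instantiate my-etl-template --region=us-central1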
Run jobs
You can run Spark jobs in the following ways:
- By using the following gcloud command:

  gcloud dataproc jobs submit [COMMAND]

  where [COMMAND] is spark, pyspark, or spark-sql. You can set Spark properties with the --properties option. For more information, see the documentation for this command.
- By using the same process you used before you migrated the job to Dataproc. The Dataproc cluster must be accessible from on-premises, and you need to use the same configuration.
- By using Cloud Composer. You can create an environment (a managed Apache Airflow server), define multiple Spark jobs as a DAG workflow, and then run the entire workflow.
For more details, see the Submit a Job guide.
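For example, a hedged sketch of submitting a Spark jar with explicit properties; the cluster, class, jar path, and job arguments are placeholders:

gcloud dataproc jobs submit spark \
    --cluster=my-cluster \
    --region=us-central1 \
    --class=com.example.WordCount \
    --jars=gs://my-bucket/jars/wordcount.jar \
    --properties=spark.executor.memory=4g,spark.executor.cores=2 \
    -- gs://my-bucket/input/ gs://my-bucket/output/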
Managing jobs after migration
After you move the Spark jobs to Google Cloud, it's important to manage these jobs by using the tools and mechanisms provided by Google Cloud. In this section, we discuss logging, monitoring, accessing clusters, scaling clusters, and optimizing jobs.
Use logging and performance monitoring
In Google Cloud, you can use Cloud Logging and Cloud Monitoring to view and customize logs, and to monitor jobs and resources.
The best way to find what error caused a Spark job failure is to look at the driver output and the logs generated by the Spark executors.
You can retrieve the driver program output by using the Google Cloud console or by using a gcloud command. The output is also stored in the Cloud Storage bucket of the Dataproc cluster. For more details, see the section on job driver output in the Dataproc documentation.
All other logs are located in different files inside the machines of the cluster. It's possible to see the logs for each container from the Spark app web UI (or from the History Server after the program ends) in the executors tab. You need to browse through each Spark container to view each log. If you write logs or print to stdout or stderr in your application code, the logs are saved in the redirection of stdout or stderr.
In a Dataproc cluster, YARN is configured to collect all these logs by default, and they're available in Cloud Logging. Cloud Logging provides a consolidated and concise view of all logs so that you don't need to spend time browsing among container logs to find errors.
The following figure shows the Cloud Logging page in the Google Cloud console. You can view all logs from your Dataproc cluster by selecting the cluster's name in the selector menu. Don't forget to expand the time duration in the time-range selector.
You can get logs from a Spark application by filtering by its ID. You can get the application ID from the driver output.
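If you prefer the command line, a similar filter can be applied with gcloud; the cluster name is a placeholder, and you can add the application ID to the filter to narrow the results further:

gcloud logging read \
    'resource.type="cloud_dataproc_cluster" AND resource.labels.cluster_name="my-cluster"' \
    --limit=50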
Create and use labels
To find logs faster, you can create and use your own
labels
for each cluster or for each Dataproc job. For example, you can
create a label with the key env
and the value exploration
and use it for
your data exploration job. You can then get logs for all exploration job
creations by filtering with label:env:exploration
in Cloud Logging.
Note that this filter will not return all logs for this job, only the resource
creation logs.
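For example, a minimal sketch of attaching the env=exploration label when you submit a job (the cluster, bucket, and file names are placeholders):

gcloud dataproc jobs submit pyspark gs://my-bucket/jobs/exploration.py \
    --cluster=my-cluster \
    --region=us-central1 \
    --labels=env=exploration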
Set the log level
You can set the driver log level by using the following gcloud command:

gcloud dataproc jobs submit hadoop --driver-log-levels
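For example, a hedged sketch that raises the log level for a hypothetical application package while keeping the root logger quiet; the cluster, jar path, and package names are placeholders:

gcloud dataproc jobs submit hadoop \
    --cluster=my-cluster \
    --region=us-central1 \
    --jar=gs://my-bucket/jars/my-job.jar \
    --driver-log-levels=root=WARN,com.example=DEBUG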
You set the log level for the rest of the application from the Spark context. For example:
spark.sparkContext.setLogLevel("DEBUG")
Monitor jobs
Cloud Monitoring can monitor the cluster's CPU, disk, network usage, and YARN resources. You can create a custom dashboard to get up-to-date charts for these and other metrics. Dataproc runs on top of Compute Engine. If you want to visualize CPU usage, disk I/O, or networking metrics in a chart, you need to select a Compute Engine VM instance as the resource type and then filter by the cluster name. The following diagram shows an example of the output.
To view metrics for Spark queries, jobs, stages, or tasks, connect to the Spark application's web UI. The next section explains how to do this. For details on how to create custom metrics, see the Custom Metrics from the Agent guide.
Access YARN
You can access the YARN resource manager web interface from outside the Dataproc cluster by setting up an SSH tunnel. We recommend using the lightweight SOCKS proxy instead of local port forwarding, because it makes browsing through the web interface easier.
The following URLs are useful for YARN access:
- YARN resource manager: http://[MASTER_HOST_NAME]:8088
- Spark history server: http://[MASTER_HOST_NAME]:18080
If the Dataproc cluster has only internal IP addresses, you can connect either through a VPN connection or through a bastion host. For more information, see Choose a connection option for internal-only VMs.
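As a sketch of the SOCKS-proxy approach, assuming a cluster whose master VM is named my-cluster-m in zone us-central1-a (both are placeholders), you can open the tunnel and then point a browser configured to use the proxy at the URLs above:

# Open a SOCKS proxy on local port 1080 through the cluster's master node.
gcloud compute ssh my-cluster-m \
    --zone=us-central1-a \
    -- -D 1080 -N

# Example (Chrome): start a browser session that routes traffic through the proxy.
google-chrome --proxy-server="socks5://localhost:1080"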
Scale and resize Dataproc clusters
The Dataproc cluster can be scaled by increasing or decreasing the number of primary or secondary (preemptible) workers. Dataproc also supports graceful decommissioning.
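For example, a hedged sketch of scaling down with graceful decommissioning; the cluster name, region, worker count, and timeout are placeholders:

gcloud dataproc clusters update my-cluster \
    --region=us-central1 \
    --num-workers=5 \
    --graceful-decommission-timeout=1h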
Downscaling in Spark is affected by a number of factors. Take into account the following:
- We do not recommend using ExternalShuffleService, especially if you downscale the cluster periodically. Shuffling uses the results that have been written to the worker's local disk after the compute phase has been executed, so the node can't be removed even if compute resources are not being consumed anymore.
- Spark caches data in memory (both RDDs and datasets), and executors that are used for caching never exit. As a result, if a worker is used for caching, it will never be gracefully decommissioned. Removing workers forcefully would influence the overall performance, because the cached data would be lost.
- Spark Streaming has dynamic allocation disabled by default, and the configuration key that sets this behavior is not documented. (You can follow a discussion of dynamic allocation behavior in a Spark issues thread.) If you're using Spark Streaming or Spark Structured Streaming, you must also explicitly disable dynamic allocation as discussed earlier under Identify job types and plan clusters.
In general, we recommend that you avoid downscaling a Dataproc cluster if you're running batch or streaming workloads.
Optimize performance
This section discusses ways to get better performance and reduce cost while running Spark jobs.
Manage Cloud Storage file sizes
To get optimal performance, split your data in Cloud Storage into files with sizes from 128 MB to 1 GB. Using lots of small files can create a bottleneck. If you have many small files, consider copying files for processing to the local HDFS and then copying the results back.
Switch to SSD disks
If you perform many shuffling operations or partitioned writes, switch to SSDs to boost performance.
Place VMs in the same zone
To reduce networking costs and to boost performance, use the same regional location for your Cloud Storage buckets that you use for your Dataproc clusters.
By default, when you use global or regional Dataproc endpoints, your cluster's VMs will be placed into the same zone (or another zone in the same region that has enough capacity) when the cluster is created. You can also specify the zone when the cluster is created.
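A minimal sketch of keeping the bucket and cluster in the same region; the bucket name, region, and zone are placeholders:

# Create a regional bucket and a cluster in the same region.
gsutil mb -l us-central1 gs://my-dataproc-data
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --zone=us-central1-a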
Use preemptible VMs
Dataproc clusters can use preemptible VM instances as workers. This results in lower per-hour compute costs for your non-critical workloads than using standard instances. However, there are some factors to consider when you use preemptible VMs:
- Preemptible VMs can't be used for HDFS storage.
- By default, preemptible VMs are created with a smaller boot disk size, and you might want to override this configuration if you are running shuffle-heavy workloads. For details, see the page on preemptible VMs in the Dataproc documentation.
- We don't recommend making more than half of your total workers preemptible.
When you use preemptible VMs, we recommend adjusting your cluster configuration to be more tolerant of task failures, because VMs might be less available. For example, adjust settings like the following in the YARN configuration:

yarn.resourcemanager.am.max-attempts
mapreduce.map.maxattempts
mapreduce.reduce.maxattempts
spark.task.maxFailures
spark.stage.maxConsecutiveAttempts
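As a hedged sketch, the following command creates a cluster with secondary (preemptible) workers and raises some of the retry settings listed above; the cluster name, worker counts, and property values are illustrative, not recommendations:

gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --num-workers=10 \
    --num-secondary-workers=8 \
    --properties='yarn:yarn.resourcemanager.am.max-attempts=10,spark:spark.task.maxFailures=10,spark:spark.stage.maxConsecutiveAttempts=10'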
You can easily add or remove preemptible VMs from your cluster. For more details, see Preemptible VMs.
What's next
- See the guide on how to migrate on-premises Hadoop infrastructure to Google Cloud.
- See our description of a Life of a Dataproc Job.
- Explore reference architectures, diagrams, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.