Migrating Apache Spark Jobs to Dataproc

This document describes how to move Apache Spark jobs to Dataproc. The document is intended for big-data engineers and architects. It covers topics such as considerations for migration, preparation, job migration, and management.

Overview

When you want to move your Apache Spark workloads from an on-premises environment to Google Cloud, we recommend using Dataproc to run Apache Spark/Apache Hadoop clusters. Dataproc is a fully managed, fully supported service offered by Google Cloud. It allows you to separate storage and compute, which helps you to manage your costs and be more flexible in scaling your workloads.

If a managed Hadoop environment doesn't fit your needs, you can also use a different setup, such as running Spark on Google Kubernetes Engine (GKE), or renting virtual machines on Compute Engine and setting up a Hadoop or Spark cluster yourself. However, take into account that options other than using Dataproc are self-managed and have only community support.

Planning your migration

There are many differences between running Spark jobs on-premises and running Spark jobs on Dataproc or Hadoop clusters on Compute Engine. It's important to look closely at your workload and prepare for migration. In this section, we outline considerations to take into account and preparations to make before you migrate Spark jobs.

Identify job types and plan clusters

There are three types of Spark workloads, as described in this section.

Regularly scheduled batch jobs

Regularly scheduled batch jobs include use cases like daily or hourly ETLs, or pipelines for training machine learning models with Spark ML. For these cases, we recommend that you create a cluster for each batch workload and then delete the cluster after your job is finished. You have the flexibility to configure your cluster, because you can adjust the configuration for each workload separately. Dataproc clusters are billed in one-second increments after the first minute, so this approach is also cost-effective. You can also label your clusters so that costs can be attributed to each workload. For more information, see the Dataproc pricing page.

You can implement batch jobs with workflow templates or by following these steps:

  1. Create a cluster and wait until the cluster is created. (You can monitor whether the cluster has been created by using an API call or a gcloud command.) If you run your job on a dedicated Dataproc cluster, it might help to turn off the dynamic allocation and the external shuffle service. The following gcloud command shows Spark configuration properties that are provided when you create the Dataproc cluster:

    gcloud dataproc clusters create ... \
        --properties 'spark:spark.dynamicAllocation.enabled=false,spark:spark.shuffle.service.enabled=false,spark:spark.executor.instances=10000'
  2. Submit your job to the cluster. (You can monitor the status of your job by using an API call or a gcloud command.) For example:

    jobId=$(gcloud --quiet dataproc jobs submit pyspark \
        --async \
        --format='value(reference.jobId)' \
        --cluster $clusterName \
        --region global \
        gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py)
    
    gcloud dataproc jobs describe $jobId \
        --region=global \
        --format='value(status.state)'
  3. Delete the cluster after the job has been executed by using an API call or a gcloud command.
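
For example, a gcloud command like the following deletes the cluster without an interactive prompt; $clusterName and the region follow the earlier steps:

    gcloud dataproc clusters delete $clusterName \
        --region=global \
        --quiet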

Streaming jobs

For streaming jobs, you need to create a long-running Dataproc cluster and configure the cluster to run in high-availability mode. We don't recommend using preemptible VMs for this case.
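
As a minimal sketch, you create a high-availability cluster by requesting three master nodes; the cluster name and region below are placeholders:

    gcloud dataproc clusters create streaming-cluster \
        --num-masters=3 \
        --region=us-central1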

Ad hoc or interactive workloads submitted by users

Examples of ad hoc workloads can include users who write queries or execute analytical jobs during the day.

For these cases, you must decide whether you need the cluster to run in high-availability mode, whether you'd like to use preemptible VMs, and how you'll manage access to the cluster. You can schedule cluster creation and termination (for example, if you never need the cluster during nights or weekends), and you can scale the cluster up and down according to that schedule.
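
For interactive clusters, Dataproc's scheduled-deletion settings can complement this approach. The following sketch deletes the cluster after two hours of inactivity; the cluster name, region, and timeout are placeholders to adapt to your own schedule:

    gcloud dataproc clusters create adhoc-cluster \
        --region=us-central1 \
        --max-idle=2h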

Identify data sources and dependencies

Each job has its own dependencies (for example, the data sources it needs), and other teams at your company might be dependent on the outcome of your jobs. Therefore, you must identify all dependencies and then create a migration plan that includes procedures for the following:

  • Step-by-step migration of all of your data sources to Google Cloud. In the beginning, it's helpful to mirror the data source in Google Cloud so that you have it in two places.

  • Job-by-job migration of your Spark workloads to Google Cloud as soon as corresponding data sources have been migrated. As with data, at some point you might have two workloads that are running in parallel both in your old environment and in Google Cloud.

  • Migration of other workloads that depend on the output of your Spark workloads. Or you might just replicate the output back to the initial environment.

  • Shutdown of Spark jobs in the old environment after all dependent teams have confirmed that they no longer require the jobs.

Choose storage options

There are two storage options to use with your Dataproc clusters: you can store all of the data in Cloud Storage, or you can use local disks or persistent disks with the cluster workers. The correct choice depends on the character of your jobs.

Comparing Cloud Storage and HDFS

Each node of a Dataproc cluster has a Cloud Storage connector installed on it. By default, the connector is installed under /usr/lib/hadoop/lib. The connector implements the Hadoop FileSystem interface and makes Cloud Storage compatible with HDFS.

Because Cloud Storage is a binary large object (BLOB) storage system, the connector emulates directories according to the object's name. You can access your data by using the gs:// prefix instead of the hdfs:// prefix.

The Cloud Storage connector typically doesn't require any customization. However, if you need to make changes, you can follow the instructions for configuring the connector. A full list of configuration keys is also available.
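
Because the connector registers the gs:// scheme with Hadoop, you can work with Cloud Storage from a cluster node by using standard Hadoop tooling. For example, with a placeholder bucket name:

    hadoop fs -ls gs://my-bucket/path/to/data/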

Cloud Storage is a good option if:

  • Your data in ORC, Parquet, Avro, or any other format will be used by different clusters or jobs, and you need data persistence if the cluster terminates.
  • You need high throughput and your data is stored in files larger than 128 MB.
  • You need cross-zone durability for your data.
  • You need data to be highly available—for example, you want to eliminate HDFS NameNode as a single point of failure.

Local HDFS storage is a good option if:

  • Your jobs require a lot of metadata operations—for example, you have thousands of partitions and directories, and each file size is relatively small.
  • You modify the HDFS data frequently or you rename directories. (Cloud Storage objects are immutable, so renaming a directory is an expensive operation because it consists of copying all objects to a new key and deleting them afterwards.)
  • You heavily use the append operation on HDFS files.
  • You have workloads that involve heavy I/O. For example, you have a lot of partitioned writes, such as the following:

    spark.read.parquet(...).write.partitionBy(...).parquet("gs://...")
  • You have I/O workloads that are especially sensitive to latency. For example, you require single-digit millisecond latency per storage operation.

In general, we recommend using Cloud Storage as the initial and final source of data in a big-data pipeline. For example, if a workflow contains five Spark jobs in series, the first job retrieves the initial data from Cloud Storage and then writes shuffle data and intermediate job output to HDFS. The final Spark job writes its results to Cloud Storage.

Adjust storage size

Using Dataproc with Cloud Storage lets you reduce disk requirements and save costs by putting your data there instead of in HDFS. When you keep your data in Cloud Storage and don't store it on the local HDFS, you can use smaller disks for your cluster. By making your cluster truly on-demand, you also separate storage and compute, as noted earlier, which helps you reduce costs significantly.

Even if you store all of your data in Cloud Storage, your Dataproc cluster needs HDFS for certain operations such as storing control and recovery files, or aggregating logs. It also needs non-HDFS local disk space for shuffling. You can reduce the disk size per worker if you are not heavily using the local HDFS.

Here are some options to adjust the size of the local HDFS:

  • Decrease the total size of the local HDFS by decreasing the size of primary persistent disks for the master and workers. The primary persistent disk also contains the boot volume and system libraries, so allocate at least 100 GB.
  • Increase the total size of the local HDFS by increasing the size of the primary persistent disks for workers. Consider this option carefully, because it's rare for a workload to get better performance from HDFS on standard persistent disks than from Cloud Storage or from local HDFS on SSD.
  • Attach up to eight SSDs (375 GB each) to each worker and use these disks for the HDFS. This is a good option if you need to use the HDFS for I/O-intensive workloads and you need single-digit millisecond latency. Make sure that you use a machine type that has enough CPUs and memory on the worker to support these disks.
  • Use persistent-disk SSDs (PD-SSDs) for your master or workers as a primary disk.
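
As a sketch of how some of these options map to gcloud flags, the following command creates a cluster with 100 GB primary boot disks and two local SSDs attached to each worker; the cluster name, region, and sizes are placeholders, not recommendations:

    gcloud dataproc clusters create my-cluster \
        --region=us-central1 \
        --master-boot-disk-size=100GB \
        --worker-boot-disk-size=100GB \
        --num-worker-local-ssds=2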

Access Dataproc

Accessing Dataproc or Hadoop on Compute Engine is different from accessing an on-premises cluster. You need to determine security settings and network access options.

Networking

All VM instances of a Dataproc cluster require internal networking to each other, and they require open UDP, TCP, and ICMP ports. You can allow access to your Dataproc cluster from external IP addresses by using the default network configuration or by using a VPC network. Your Dataproc cluster will have networking access to all Google Cloud services (Cloud Storage buckets, APIs, and so on) in any network option you use. To allow networking access to or from on-premises resources, choose a VPC network configuration and set up the appropriate firewall rules. For details, see the Dataproc Cluster Network Configuration guide and the Access YARN section below.
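
As an illustration only, a firewall rule that allows internal traffic between cluster VMs on a custom VPC network might look like the following; the rule name, network, and source range are placeholders for your own environment:

    gcloud compute firewall-rules create allow-dataproc-internal \
        --network=my-custom-vpc \
        --allow=tcp,udp,icmp \
        --source-ranges=10.128.0.0/20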

Identity and access management

In addition to network access, your Dataproc cluster needs permissions to access resources. For example, to write data to a Cloud Storage bucket, your Dataproc cluster must have write access to the bucket. You establish access by using roles. Scan your Spark code to find all the non-Dataproc resources that the code needs, and grant the correct roles to the cluster's service account. Also, ensure that users who will create clusters, jobs, operations, and workflow templates have the right permissions.

For more details and best practices, see the IAM documentation.
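
As a sketch, granting the cluster's service account access at the project level might look like the following; the project ID, service account, and role are placeholders, and you can also grant the role on an individual bucket instead of the project:

    gcloud projects add-iam-policy-binding my-project \
        --member="serviceAccount:my-cluster-sa@my-project.iam.gserviceaccount.com" \
        --role="roles/storage.objectAdmin"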

Verify Spark and other library dependencies

Compare your version of Spark and the versions of other libraries to the official Dataproc version list and look for any libraries that are not already available. We recommend that you use Spark versions that are officially supported by Dataproc.

If you need to add libraries, you can do the following:

  • Create a custom image of a Dataproc cluster.
  • Create initialization scripts in Cloud Storage for your cluster. You can use initialization scripts to install additional dependencies, to copy binaries, and so on. A sketch of attaching an initialization script appears after this list.
  • Recompile your Java or Scala code and package all additional dependencies that are not part of the base distribution as a "fat jar" by using Gradle, Maven, Sbt, or another tool.
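
As a sketch, an initialization action is attached when you create the cluster; the cluster name, region, and script path are placeholders:

    gcloud dataproc clusters create my-cluster \
        --region=us-central1 \
        --initialization-actions=gs://my-bucket/scripts/install-deps.sh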

Adjust Dataproc cluster size

In any cluster configuration, whether on-premises or in the cloud, the cluster size is crucial for Spark job performance. A Spark job without enough resources will either be slow or will fail, especially if it does not have enough executor memory. For advice on what you need to consider when sizing any Hadoop cluster, see the sizing your cluster section of the Hadoop migration guide.

The following sections describe some options for how to size your cluster.

Getting the configuration of your current Spark jobs

Look at how your current Spark jobs are configured and make sure that the Dataproc cluster is large enough. If you move from a shared cluster to multiple Dataproc clusters (one for each batch workload), look at the YARN configuration for each application so that you understand how many executors you need, the number of CPUs per executor, and the total executor memory. If your on-premises cluster has YARN queues set up, see which jobs are sharing each queue's resources and identify bottlenecks. This migration is an opportunity to remove any resource restrictions you might have had in your on-premises cluster.
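
After you know the executor settings from your current YARN configuration, you can carry them over as Spark properties when you submit the job. The following sketch uses illustrative values; adjust them to match your own workload:

    gcloud dataproc jobs submit spark ... \
        --properties='spark.executor.instances=20,spark.executor.cores=4,spark.executor.memory=8g'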

Choosing machine types and disk options

Choose the number and type of VMs to match the needs of your workload. If you have decided to use local HDFS for storage, make sure the VMs have the right disk type and size. Don't forget to include the resource needs of the driver programs in your calculation.
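
The following sketch shows how machine-type and disk choices are expressed at cluster creation time; the machine types, worker count, and disk settings are placeholders, not recommendations:

    gcloud dataproc clusters create my-cluster \
        --region=us-central1 \
        --master-machine-type=n1-standard-4 \
        --worker-machine-type=n1-highmem-8 \
        --num-workers=10 \
        --worker-boot-disk-type=pd-ssd \
        --worker-boot-disk-size=500GB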

Each VM has a networking egress cap of 2 Gbps per vCPU. Writing to persistent disks or to persistent SSDs counts toward this cap, so a VM with a very low number of vCPUs might be throttled by the cap when it writes to these disks. This is likely to happen in the shuffle phase, when Spark writes shuffle data to disk and moves shuffle data over the network between executors. Persistent disks require at least 2 vCPUs to reach maximum write performance and persistent SSDs require 4 vCPUs. Note that these minimums don't take into account traffic such as communication between VMs. Also, the size of each disk affects its peak performance.

The configuration you choose will have an impact on the cost of your Dataproc cluster. Dataproc pricing is in addition to the Compute Engine per-instance price for each VM and other Google Cloud resources. For more information, and to use the Google Cloud pricing calculator to get an estimate of your costs, see the Dataproc pricing page.

Benchmarking performance and optimizing

When you've finished the job migration phase, but before you stop running Spark workloads in your on-premises cluster, benchmark your Spark jobs and consider any optimizations. Remember that you can resize your cluster if your configuration is not optimal.

Dataproc Serverless for Spark autoscaling

Use Dataproc Serverless to run Spark workloads without provisioning and managing your own cluster. Specify workload parameters, and then submit the workload to the Dataproc Serverless service. The service will run the workload on a managed compute infrastructure, autoscaling resources as needed. Dataproc Serverless charges apply only to the time when the workload is executing.
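
A minimal sketch of submitting a batch workload to Dataproc Serverless; the script path and region are placeholders:

    gcloud dataproc batches submit pyspark gs://my-bucket/src/my-job.py \
        --region=us-central1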

Performing the migration

This section discusses migrating the data, changing job code, and changing how jobs are run.

Migrate data

Before you run any Spark jobs in your Dataproc cluster, you need to migrate your data to Google Cloud. For more information, see the Data Migration Guide.

Migrate Spark code

After you've planned your migration to Dataproc and moved any required data sources, you can migrate the job code. If there are no differences in Spark versions between the two clusters, and if you want to store data on Cloud Storage instead of local HDFS, you only need to change the prefix of all of your HDFS file paths from hdfs:// to gs://.

If you are using different Spark versions, consult the Spark release notes, compare the two versions, and adapt your Spark code accordingly.

You can copy the jar files for your Spark applications either to the Cloud Storage bucket that is tied to your Dataproc cluster or to an HDFS folder. The next section explains the available options for running Spark jobs.

If you decide to use workflow templates, we recommend that you separately test each Spark job that you plan to add. Then you can run a final test run of the template to make sure the template's workflow is correct (no missing upstream jobs, outputs are stored in the right locations, and so on).

Run jobs

You can run Spark jobs in the following ways:

  • By using the following gcloud command:

    gcloud dataproc jobs submit [COMMAND]

    where:

    [COMMAND] is spark, pyspark, or spark-sql

    You can set Spark properties with the --properties option. For more information, see the documentation for this command. A complete example appears after this list.

  • By using the same process you used before you migrated the job to Dataproc. The Dataproc cluster must be accessible from on-premises, and you need to use the same configuration.

  • By using Cloud Composer. You can create an environment (a managed Apache Airflow server), define multiple Spark jobs as a DAG workflow, and then run the entire workflow.

For more details, see the Submit a Job guide.
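
For example, submitting a Spark job packaged as a jar might look like the following sketch; the cluster name, region, main class, jar path, and property value are placeholders:

    gcloud dataproc jobs submit spark \
        --cluster=my-cluster \
        --region=us-central1 \
        --class=org.example.MyJob \
        --jars=gs://my-bucket/jars/my-job.jar \
        --properties=spark.executor.memory=4g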

Managing jobs after migration

After you move the Spark jobs to Google Cloud, it's important to manage these jobs by using the tools and mechanisms provided by Google Cloud. In this section, we discuss logging, monitoring, accessing clusters, scaling clusters, and optimizing jobs.

Use logging and performance monitoring

In Google Cloud, you can use Cloud Logging and Cloud Monitoring to view and customize logs, and to monitor jobs and resources.

The best way to find what error caused a Spark job failure is to look at the driver output and the logs generated by the Spark executors.

You can retrieve the driver program output by using the Google Cloud console or by using a gcloud command. The output is also stored in the Cloud Storage bucket of the Dataproc cluster. For more details, see the section on job driver output in the Dataproc documentation.
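
For example, the following sketch streams a job's driver output to your terminal; the job ID variable and region are placeholders:

    gcloud dataproc jobs wait $jobId \
        --region=us-central1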

All other logs are located in different files inside the machines of the cluster. It's possible to see the logs for each container from the Spark application web UI (or from the History Server after the program ends) in the executors tab. You need to browse through each Spark container to view each log. Logs that your application code writes, and anything it prints to stdout or stderr, are saved in the container's stdout and stderr redirection.

In a Dataproc cluster, YARN is configured to collect all these logs by default, and they're available in Cloud Logging. Cloud Logging provides a consolidated and concise view of all logs so that you don't need to spend time browsing among container logs to find errors.

The following figure shows the Cloud Logging page in the Google Cloud console. You can view all logs from your Dataproc cluster by selecting the cluster's name in the selector menu. Don't forget to expand the time duration in the time-range selector.

Cloud Logging page in the Google Cloud console

You can get logs from a Spark application by filtering by its ID. You can get the application ID from the driver output.

Create and use labels

To find logs faster, you can create and use your own labels for each cluster or for each Dataproc job. For example, you can create a label with the key env and the value exploration and use it for your data exploration job. You can then get logs for all exploration job creations by filtering with label:env:exploration in Cloud Logging. Note that this filter will not return all logs for this job, only the resource creation logs.
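
As a sketch, you attach labels when you create the cluster or submit the job; the names below mirror the env=exploration example and are placeholders:

    gcloud dataproc clusters create exploration-cluster \
        --region=us-central1 \
        --labels=env=exploration

    gcloud dataproc jobs submit pyspark gs://my-bucket/src/explore.py \
        --cluster=exploration-cluster \
        --region=us-central1 \
        --labels=env=exploration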

Set the log level

You can set the driver log level using the following gcloud command:

gcloud dataproc jobs submit hadoop ... \
    --driver-log-levels root=FATAL,com.example=INFO

You set the log level for the rest of the application from the Spark context. For example:

spark.sparkContext.setLogLevel("DEBUG")

Monitor jobs

Cloud Monitoring can monitor the cluster's CPU, disk, network usage, and YARN resources. You can create a custom dashboard to get up-to-date charts for these and other metrics. Dataproc runs on top of Compute Engine. If you want to visualize CPU usage, disk I/O, or networking metrics in a chart, you need to select a Compute Engine VM instance as the resource type and then filter by the cluster name. The following diagram shows an example of the output.

Monitoring page in the Google Cloud console

To view metrics for Spark queries, jobs, stages, or tasks, connect to the Spark application's web UI. The next section explains how to do this. For details on how to create custom metrics, see the Custom Metrics from the Agent guide.

Access YARN

You can access the YARN resource manager web interface from outside the Dataproc cluster by setting up an SSH tunnel. We recommend using the lightweight SOCKS proxy instead of local port forwarding, because it makes browsing the web interfaces easier. An example tunnel setup appears after the list of URLs below.

The following URLs are useful for YARN access:

  • YARN resource manager: http://[MASTER_HOST_NAME]:8088

  • Spark history server: http://[MASTER_HOST_NAME]:18080
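
As a sketch, assuming a master node named my-cluster-m in a placeholder zone, the following command opens a SOCKS proxy on local port 1080:

    gcloud compute ssh my-cluster-m \
        --zone=us-central1-b \
        -- -D 1080 -N

With the tunnel open, configure your browser to use localhost:1080 as a SOCKS proxy, and then open the YARN resource manager or Spark history server URLs listed above.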

If the Dataproc cluster has only internal IP addresses, you can connect either through a VPN connection or through a bastion host. For more information, see Choose a connection option for internal-only VMs.

Scale and resize Dataproc clusters

A Dataproc cluster can be scaled by increasing or decreasing the number of primary or secondary (preemptible) workers. Dataproc also supports graceful decommissioning.
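
A sketch of resizing an existing cluster with graceful decommissioning; the cluster name, region, worker count, and timeout are placeholders:

    gcloud dataproc clusters update my-cluster \
        --region=us-central1 \
        --num-workers=5 \
        --graceful-decommission-timeout=1h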

Downscaling in Spark is affected by a number of factors. Take into account the following:

  • We do not recommend using the Spark external shuffle service (ExternalShuffleService), especially if you downscale the cluster periodically. Shuffling reads intermediate data that was written to the worker's local disk during the compute phase, so the node can't be removed even if its compute resources are no longer being consumed.

  • Spark caches data in memory (both RDDs and datasets), and executors that are used for caching never exit. As a result, if a worker is used for caching, it will never be gracefully decommissioned. Removing workers forcefully would influence the overall performance, because the cached data would be lost.

  • Spark Streaming has dynamic allocation disabled by default, and the configuration key that sets this behavior is not documented. (You can follow a discussion of dynamic allocation behavior in a Spark issues thread.) If you're using Spark Streaming or Spark Structured Streaming, you must also explicitly disable dynamic allocation as discussed earlier under Identify job types and plan clusters.

In general, we recommend that you avoid downscaling a Dataproc cluster if you're running batch or streaming workloads.

Optimize performance

This section discusses ways to get better performance and reduce cost while running Spark jobs.

Manage Cloud Storage file sizes

To get optimal performance, split your data in Cloud Storage into files with sizes from 128 MB to 1 GB. Using lots of small files can create a bottleneck. If you have many small files, consider copying files for processing to the local HDFS and then copying the results back.
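
One way to stage small files onto the local HDFS and copy results back is Hadoop's distcp tool, which works with gs:// paths through the Cloud Storage connector. The paths in this sketch are placeholders:

    hadoop distcp gs://my-bucket/small-files/ hdfs:///staging/input/
    # ...run the job against hdfs:///staging/input/...
    hadoop distcp hdfs:///staging/output/ gs://my-bucket/output/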

Switch to SSD disks

If you perform many shuffling operations or partitioned writes, switch to SSDs to boost performance.

Place VMs in the same zone

To reduce networking costs and to boost performance, use the same regional location for your Cloud Storage buckets that you use for your Dataproc clusters.

By default, when you use global or regional Dataproc endpoints, your cluster's VMs will be placed into the same zone (or another zone in the same region that has enough capacity) when the cluster is created. You can also specify the zone when the cluster is created.
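
For example, the following sketch pins a cluster to a specific zone in the same region as its Cloud Storage bucket; the names are placeholders:

    gcloud dataproc clusters create my-cluster \
        --region=us-central1 \
        --zone=us-central1-b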

Use preemptible VMs

A Dataproc cluster can use preemptible VM instances as workers. This results in lower per-hour compute costs for your non-critical workloads than using standard instances. However, there are some factors to consider when you use preemptible VMs:

  • Preemptible VMs can't be used for HDFS storage.
  • By default, preemptible VMs are created with a smaller boot disk size, and you might want to override this configuration if you are running shuffle-heavy workloads. For details, see the page on preemptible VMs in the Dataproc documentation.
  • We don't recommend making more than half of your total workers preemptible.
  • When you use preemptible VMs, we recommend adjusting your cluster configuration to be more tolerant to task failures, because VMs might be less available. For example, make settings like the following in the YARN configuration:

    yarn.resourcemanager.am.max-attempts
    mapreduce.map.maxattempts
    mapreduce.reduce.maxattempts
    spark.task.maxFailures
    spark.stage.maxConsecutiveAttempts
  • You can easily add or remove preemptible VMs from your cluster. For more details, see Preemptible VMs.
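
As a sketch, you request secondary (preemptible) workers at creation time and can resize them later; recent gcloud releases use the flag names below (older releases used --num-preemptible-workers), and the counts are illustrative:

    gcloud dataproc clusters create my-cluster \
        --region=us-central1 \
        --num-workers=10 \
        --num-secondary-workers=4

    gcloud dataproc clusters update my-cluster \
        --region=us-central1 \
        --num-secondary-workers=8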

What's next