Announcing the General Availability (GA) of Native Query Execution, which supports Spark Dataframe APIs, Spark SQL queries that read data from Parquet and ORC files, and workloads recommended by the Native Query Execution qualification tool. For questions about additional use cases, please contact dataproc-pms@google.com.
This document describes how to enable Dataproc Serverless batch workloads and interactive sessions running in the Premium pricing tier to use Native Query Execution.
How to use Native Query Execution with premium tier pricing
Dataproc Serverless Native Query Execution is available only with batch workloads and interactive sessions running in the Dataproc Serverless premium pricing tier. Premium tier pricing is charged at a higher cost than standard tier pricing, but there is no additional charge for Native Query Execution. For more information, see Dataproc Serverless pricing.
You can enable premium tier resource allocation and pricing for batch and interactive session resources by setting the resource allocation tier properties, `spark.dataproc.driver.compute.tier` and `spark.dataproc.executor.compute.tier`, to `premium` when you submit a Spark batch workload or an interactive session.
You configure Native Query Execution by setting Native Query Execution properties on a batch workload, interactive session, or session template, and then submitting the workload or running the interactive session in a notebook.
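These properties are the same whether you set them on a batch workload, an interactive session, or a session template. As a minimal sketch, the core property set described on this page is:

```
spark.dataproc.runtimeEngine=native
spark.dataproc.driver.compute.tier=premium
spark.dataproc.executor.compute.tier=premium
```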
Console
In the Google Cloud console:
- Go to Dataproc Batches.
- Click Create to open the Create batch page.
Select and fill in the following fields to configure the batch for Native Query Execution:
- Container:
  - Runtime version: Select `1.2`, `2.2`, or, if available, a higher `major.minor` version number. See Supported Dataproc Serverless for Spark runtime versions.
- Executor and Driver Tier Configuration:
  - Select `Premium` for all tiers (Driver Compute Tier, Executor Compute Tier).
- Properties: Enter the following `Key` (property name) and `Value` pair for the Native Query Execution property:

  | Key | Value |
  |---|---|
  | `spark.dataproc.runtimeEngine` | `native` |
Fill in, select, or confirm other batch workload settings. See Submit a Spark batch workload.
Click SUBMIT to run the Spark batch workload.
gcloud
Set the following gcloud CLI `gcloud dataproc batches submit spark` command flags to configure the batch workload for Native Query Execution:
```
gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED
```
Notes:
- PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- REGION: An available Compute Engine region to run the workload.
- VERSION: Specify `1.2`, `2.2`, or, if available, a higher `major.minor` version number. See Supported Dataproc Serverless for Spark runtime versions.
- OTHER_FLAGS_AS_NEEDED: See Submit a Spark batch workload.
API
Set the following Dataproc API fields to configure the batch workload for Native Query Execution:
- RuntimeConfig.version: Specify `1.2`, `2.2`, or, if available, a higher `major.minor` version number. See Supported Dataproc Serverless for Spark runtime versions.
- RuntimeConfig.properties: Set the following Native Query Execution properties:

  ```
  "spark.dataproc.runtimeEngine":"native"
  "spark.dataproc.driver.compute.tier":"premium"
  "spark.dataproc.executor.compute.tier":"premium"
  ```
Notes:
- See Submit a Spark batch workload to set other batch workload API fields.
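For reference, the following is a hedged sketch of a `batches.create` REST request that sets these fields; the jar and class are the same placeholders used in the gcloud example, and PROJECT_ID and REGION are placeholders:

```
# A sketch of a Dataproc batches.create request with Native Query Execution
# enabled; only the fields discussed above are shown.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches" \
  -d '{
    "sparkBatch": {
      "mainClass": "org.apache.spark.examples.SparkPi",
      "jarFileUris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
    },
    "runtimeConfig": {
      "version": "2.2",
      "properties": {
        "spark.dataproc.runtimeEngine": "native",
        "spark.dataproc.driver.compute.tier": "premium",
        "spark.dataproc.executor.compute.tier": "premium"
      }
    }
  }'
```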
Tuning your Native Query Execution workload
Dataproc Serverless Native Query Execution can be tuned further using the following properties:
| Batch property | When to use |
|---|---|
| `spark.driver.memory`<br>`spark.driver.memoryOverhead` | To tune the memory provided to the Spark driver process |
| `spark.executor.memory`<br>`spark.executor.memoryOverhead`<br>`spark.memory.offHeap.size` | To tune the on-heap and off-heap memory provided to the executor process |
| `spark.dataproc.driver.disk.tier`<br>`spark.dataproc.driver.disk.size` | To configure premium disk tier and size for the driver |
| `spark.dataproc.executor.disk.tier`<br>`spark.dataproc.executor.disk.size` | To configure premium disk tier and size for the executor |
Native Query Execution properties
- `spark.dataproc.runtimeEngine=native` (Required): The workload runtime engine must be set to `native` to override the default `spark` runtime engine.
- `version` (Required): The workload must use Spark runtime version 1.2.26+, 2.2.26+, or a later major runtime version.
- Premium compute tiers (Required): The `spark.dataproc.driver.compute.tier` and `spark.dataproc.executor.compute.tier` properties must be set to `premium`.
- Premium disk tiers (Optional and Recommended): Premium disk tiers use columnar instead of row-based shuffle to provide better performance. For better shuffle I/O throughput, use the driver and executor premium disk tiers with a sufficiently large disk size to accommodate shuffle files.
- Memory (Optional): If you configure the Native Query Execution engine without configuring both off-heap memory (`spark.memory.offHeap.size`) and on-heap memory (`spark.executor.memory`), the Native Query Execution engine takes a default `4g` amount of memory and divides it in a `6:1` ratio between off-heap and on-heap memory.

  If you decide to configure memory when using Native Query Execution, you can do so in either of the following ways:

  - Configure off-heap memory only (`spark.memory.offHeap.size`) with a specified value. Native Query Execution uses the specified value as off-heap memory, and allocates an additional `1/7th` of the off-heap memory value as on-heap memory.
  - Configure both on-heap memory (`spark.executor.memory`) and off-heap memory (`spark.memory.offHeap.size`). The amount you allocate to off-heap memory must be greater than the amount you allocate to on-heap memory. Recommendation: Allocate off-heap to on-heap memory in a `6:1` ratio.
Sample values:
| `spark.executor.memory` without Native Query Execution | Recommended `spark.memory.offHeap.size` with Native Query Execution | Recommended `spark.executor.memory` with Native Query Execution |
|---|---|---|
| 7g | 6g | 1g |
| 14g | 12g | 2g |
| 28g | 24g | 4g |
| 56g | 48g | 8g |
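For example, the `28g` row of the table maps to the following sketch of a batch submission:

```
# A sketch applying the recommended 6:1 off-heap to on-heap split
# (24g off-heap + 4g on-heap, the 28g row above).
gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --properties=spark.dataproc.runtimeEngine=native,\
spark.dataproc.driver.compute.tier=premium,\
spark.dataproc.executor.compute.tier=premium,\
spark.memory.offHeap.size=24g,\
spark.executor.memory=4g \
    OTHER_FLAGS_AS_NEEDED
```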
Native Query Execution qualification tool
You can run the Dataproc Native Query Execution qualification tool,
run_qualification_tool.sh
, to identify workloads that can achieve faster runtimes
with Native Query Execution. The tool analyzes the Spark event files
generated by batch workload applications, and then estimates potential runtime
savings that each workload application can obtain with Native Query Execution.
Run the qualification tool
Perform the following steps to run the tool against Dataproc Serverless batch workload event files.
1. Copy the `run_qualification_tool.sh` script into a local directory that contains the Spark event files to analyze.

2. Run the qualification tool to analyze one event file or a set of event files contained in the script directory.

   ```
   ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
       -o CUSTOM_OUTPUT_DIRECTORY_PATH \
       -k SERVICE_ACCOUNT_KEY \
       -x MEMORY_ALLOCATEDg \
       -t PARALLEL_THREADS_TO_RUN
   ```
Flags and values:
- `-f` (required): See Spark event file locations to locate Spark workload event files.
  - EVENT_FILE_PATH (required unless EVENT_FILE_NAME is specified): Path of the event file to analyze. If not provided, the event file path is assumed to be the current directory.
  - EVENT_FILE_NAME (required unless EVENT_FILE_PATH is specified): Name of the event file to analyze. If not provided, the event files found recursively in the EVENT_FILE_PATH are analyzed.
- `-o` (optional): If not provided, the tool creates or uses an existing `output` directory under the current directory to place output files.
  - CUSTOM_OUTPUT_DIRECTORY_PATH: Output directory path for output files.
- `-k` (optional):
  - SERVICE_ACCOUNT_KEY: The service account key in JSON format, if needed to access the EVENT_FILE_PATH.
- `-x` (optional):
  - MEMORY_ALLOCATED: Memory in gigabytes to allocate to the tool. By default, the tool uses 80% of the free memory available in the system and all available machine cores.
- `-t` (optional):
  - PARALLEL_THREADS_TO_RUN: The number of parallel threads for the tool to run. By default, the tool uses all cores.
Example command usage:
```
./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
    -o perfboost-output -k /keys/event-file-key -x 34g -t 5
```
In this example, the qualification tool traverses the `gs://dataproc-temp-us-east1-9779/spark-job-history` directory, and analyzes the Spark event files contained in this directory and its subdirectories. Access to the directory is provided by the `/keys/event-file-key` service account key. The tool uses 34 GB of memory for execution and runs 5 parallel threads.
Qualification tool output files
Once analysis is complete, the qualification tool places the following output files in a `perfboost-output` directory in the current directory:

- `AppsRecommendedForBoost.tsv`: A tab-separated list of applications recommended for use with Native Query Execution.
- `UnsupportedOperators.tsv`: A tab-separated list of applications not recommended for use with Native Query Execution.
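Because both files are tab-separated, you can inspect them with standard shell tools. For example, assuming the default `perfboost-output` directory:

```
# Pretty-print the tab-separated recommendations as aligned columns.
column -t -s$'\t' perfboost-output/AppsRecommendedForBoost.tsv
```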
AppsRecommendedForBoost.tsv output file

The following table shows the contents of a sample `AppsRecommendedForBoost.tsv` output file. It contains a row for each analyzed application.

Sample `AppsRecommendedForBoost.tsv` output file:
| applicationId | applicationName | rddPercentage | unsupportedSqlPercentage | totalTaskTime | supportedTaskTime | supportedSqlPercentage | recommendedForBoost | expectedRuntimeReduction |
|---|---|---|---|---|---|---|---|---|
| app-2024081/batches/083f6196248043938-000 | projects/example.com:dev/locations/us-central1 6b4d6cae140f883c0 11c8e | 0.00% | 0.00% | 548924253 | 548924253 | 100.00% | TRUE | 30.00% |
| app-2024081/batches/60381cab738021457-000 | projects/example.com:dev/locations/us-central1 474113a1462b426bf b3aeb | 0.00% | 0.00% | 514401703 | 514401703 | 100.00% | TRUE | 30.00% |
Column descriptions:
- `applicationId`: The `ApplicationID` of the Spark application. Use this to identify the corresponding batch workload.
- `applicationName`: The name of the Spark application.
- `rddPercentage`: The percentage of RDD operations in the application. RDD operations are not supported by Native Query Execution.
- `unsupportedSqlPercentage`: Percentage of SQL operations not supported by Native Query Execution.
- `totalTaskTime`: Cumulative task time of all tasks executed during the application run.
- `supportedTaskTime`: The total task time supported by Native Query Execution.
The following columns provide important information to help you determine whether Native Query Execution can benefit your batch workload:

- `supportedSqlPercentage`: The percentage of SQL operations supported by Native Query Execution. The higher the percentage, the greater the runtime reduction that can be achieved by running the application with Native Query Execution.
- `recommendedForBoost`: If `TRUE`, running the application with Native Query Execution is recommended. If `recommendedForBoost` is `FALSE`, don't use Native Query Execution on the batch workload (a filtering sketch follows this list).
- `expectedRuntimeReduction`: The expected percentage reduction in application runtime when you run the application with Native Query Execution.
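As a small sketch, assuming the column order shown in the sample table above (`recommendedForBoost` is column 8), you can list only the applications recommended for Native Query Execution:

```
# Print the header row plus rows where recommendedForBoost is TRUE.
awk -F'\t' 'NR == 1 || $8 == "TRUE"' perfboost-output/AppsRecommendedForBoost.tsv
```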
UnsupportedOperators.tsv output file
The `UnsupportedOperators.tsv` output file contains a list of operators used in workload applications that are not supported by Native Query Execution. Each row in the output file lists an unsupported operator.
Column descriptions:
- `unsupportedOperator`: The name of the operator that is not supported by Native Query Execution.
- `cumulativeCpuMs`: The number of CPU milliseconds consumed during the execution of the operator. This value reflects the relative importance of the operator in the application.
- `count`: The number of times the operator is used in the application.
Run the qualification tool across projects
This section provides instructions for running a script that runs the qualification tool to analyze batch workload Spark event files from multiple projects.
Script requirements and limitations:
- Run the script on Linux machines:
  - Java version `>=11` must be installed as the default Java version (a quick check follows this list).
- Since logs in Cloud Logging have a 30-day TTL, Spark event files from batch workloads run more than 30 days ago cannot be analyzed.
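A quick way to confirm the Java requirement before running the script:

```
# Confirm that the default Java version is 11 or later.
java -version
```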
To run the qualification tool across projects, perform the following steps.
1. Download the `list-batches-and-run-qt.sh` script, and copy it to your local machine.

2. Change script permissions.

   ```
   chmod +x list-batches-and-run-qt.sh
   ```

3. Prepare a project input file list to pass to the script for analysis. Create the text file by adding one row in the following format for each project and region with batch workload Spark event files to analyze.

   ```
   -r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
   ```
Notes:
- `-r` (required):
  - REGION: Region where the batches in the project are submitted.
- `-s` (required): Format: `yyyy-mm-dd`. You can add an optional `00:00:00` time segment.
  - START_DATE: Only batch workloads created after the start date are analyzed. Batches are analyzed in descending order of batch creation time; the most recent batches are analyzed first.
- `-e` (optional): Format: `yyyy-mm-dd`. You can add an optional `00:00:00` time segment.
  - END_DATE: If you specify this, only batch workloads created before or on the end date are analyzed. If not specified, all batches created after the START_DATE are analyzed.
- `-l` (optional):
  - LIMIT_MAX_BATCHES: The maximum number of batches to analyze. You can use this option in combination with START_DATE and END_DATE to analyze a limited number of batches created between the specified dates. If `-l` is not specified, up to 100 batches (the default) are analyzed.
- `-k` (optional):
  - KEY_PATH: A local path that contains Cloud Storage access keys for the workload Spark event files.
Input file example:
```
-r us-central1 -s 2024-08-21 -p project1 -k key1
-r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
```
Notes:
- Row 1: Up to the most recent 100 (default) Spark event files in `project1` in the `us-central1` region with a creation time after `2024-08-21 00:00:00` will be analyzed. `key1` allows access to the files in Cloud Storage.
- Row 2: Up to the most recent 50 Spark event files in `project2` in the `us-east1` region with a creation time after `2024-08-21 00:00:00` and before or on `2024-08-23 11:59:59` will be analyzed. `key2` allows access to the event files in Cloud Storage.
4. Run the `list-batches-and-run-qt.sh` script. Analysis results are output in the `AppsRecommendedForBoost.tsv` and `UnsupportedOperators.tsv` files.

   ```
   ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
       -x MEMORY_ALLOCATED \
       -o CUSTOM_OUTPUT_DIRECTORY_PATH \
       -t PARALLEL_THREADS_TO_RUN
   ```
Notes:
- PROJECT_INPUT_FILE_LIST: See step 3 in this section.
- `-x`, `-o`, and `-t`: See Run the qualification tool.
Spark event file locations
Perform any of the following steps to find the Spark event files for Dataproc Serverless batch workloads:
- In Cloud Storage, find the `spark.eventLog.dir` for the workload, then download it (a sketch follows this list).
  - If you can't find the `spark.eventLog.dir`, set the `spark.eventLog.dir` to a Cloud Storage location, and then rerun the workload and download the `spark.eventLog.dir`.
- If you have configured the Spark History Server for the batch job:
  1. Go to the Spark History Server, then select the workload.
  2. Click Download in the Event Log column.
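If you need to set the event log location yourself, the sketch below uses the standard Spark event log properties with a hypothetical `gs://BUCKET/eventlog` path, then downloads the files for the qualification tool:

```
# spark.eventLog.enabled and spark.eventLog.dir are standard Spark
# properties; gs://BUCKET/eventlog is a hypothetical path.
gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --properties=spark.eventLog.enabled=true,spark.eventLog.dir=gs://BUCKET/eventlog \
    OTHER_FLAGS_AS_NEEDED

# After the workload completes, download the event files for analysis.
gsutil -m cp -r gs://BUCKET/eventlog ./spark-events
```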
When to use Native Query Execution
Use Native Query Execution in the following scenarios:
- Spark Dataframe APIs and Spark SQL queries that read data from Parquet and ORC files.
- Workloads recommended by the Native Query Execution qualification tool.
When not to use Native Query Execution
Don't use Native Query Execution in the following scenarios since doing so may not achieve workload runtime reduction, and may cause failure or regression:
- Workloads not recommended by the Native Query Execution qualification tool.
- Spark RDD, UDF, and ML workloads
- Write workloads
- File formats other than Parquet and ORC
- Inputs of the following data types:
  - `Timestamp`, `TinyInt`, `Byte`, `Struct`, `Array`, `Map`: ORC and Parquet
  - `VarChar`, `Char`: ORC
- Queries that contain regular expressions
- Workloads that use a Requester Pays bucket
- Non-default Cloud Storage configuration settings. Native Query Execution uses many defaults even when overridden.
Limitations
Enabling Native Query Execution in the following scenarios can throw exceptions, raise Spark incompatibilities, or cause the workload to fall back to the default Spark engine.
Fallbacks
Using Native Query Execution in the following cases can result in workload fallback to the Spark execution engine, resulting in regression or failure.
- ANSI: If ANSI mode is enabled, execution falls back to Spark (see the sketch after this list).
- Case-sensitive mode: Native Query Execution supports the Spark default case-insensitive mode only. If case-sensitive mode is enabled, incorrect results can occur.
- Partitioned table scan: Native Query Execution supports the partitioned table scan only when the path contains the partition information; otherwise, the workload falls back to the Spark execution engine.
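To avoid the ANSI fallback, leave ANSI mode at its Spark default. As a sketch, `spark.sql.ansi.enabled` is the standard Spark property that controls ANSI mode; it is shown explicitly here only for illustration:

```
# Keeping ANSI mode disabled (the Spark default) avoids the ANSI fallback.
gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --properties=spark.dataproc.runtimeEngine=native,\
spark.dataproc.driver.compute.tier=premium,\
spark.dataproc.executor.compute.tier=premium,\
spark.sql.ansi.enabled=false \
    OTHER_FLAGS_AS_NEEDED
```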
Incompatible behavior
Incompatible behavior or incorrect results can occur when using Native Query Execution in the following cases:
- JSON functions: Native Query Execution supports strings surrounded by double quotes, not single quotes. Incorrect results occur with single quotes. Using `"*"` in the path with the `get_json_object` function returns `NULL`.
- Parquet read configuration:
  - Native Query Execution treats `spark.files.ignoreCorruptFiles` as set to the default `false` value, even when set to `true`.
  - Native Query Execution ignores `spark.sql.parquet.datetimeRebaseModeInRead`, and returns only the Parquet file contents. Differences between the legacy hybrid (Julian Gregorian) calendar and the Proleptic Gregorian calendar are not considered. Spark results can differ.
- `NaN`: Not supported. Unexpected results can occur, for example, when using `NaN` in a numeric comparison.
- Spark columnar reading: A fatal error can occur because the Spark columnar vector is incompatible with Native Query Execution.
- Spill: When shuffle partitions are set to a large number, the spill-to-disk feature can trigger an `OutOfMemoryException`. If this occurs, reducing the number of partitions can eliminate this exception (see the sketch after this list).
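As a sketch of the spill mitigation, `spark.sql.shuffle.partitions` is the standard Spark property that controls the shuffle partition count; the value shown is illustrative:

```
# Lower the shuffle partition count to mitigate spill-related
# OutOfMemoryException errors; 200 is an illustrative value.
gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --properties=spark.dataproc.runtimeEngine=native,\
spark.dataproc.driver.compute.tier=premium,\
spark.dataproc.executor.compute.tier=premium,\
spark.sql.shuffle.partitions=200 \
    OTHER_FLAGS_AS_NEEDED
```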