Accelerate Dataproc Serverless batch workloads and interactive sessions with Native Query Execution

Announcing the General Availability (GA) of Native Query Execution, which supports Spark Dataframe APIs, Spark SQL queries that read data from Parquet and ORC files, and workloads recommended by the Native Query Execution qualification tool. For questions about additional use cases, please contact dataproc-pms@google.com.

This document describes how to enable Dataproc Serverless batch workloads and interactive sessions running in the Premium pricing tier to use Native Query Execution.

How to use Native Query Execution with premium tier pricing

Dataproc Serverless Native Query Execution is available only with batch workloads and interactive sessions running in the Dataproc Serverless premium pricing tier. Premium tier pricing is charged at a higher cost than standard tier pricing, but there is no additional charge for Native Query Execution. For more information, see Dataproc Serverless pricing.

You can enable premium tier resource allocation and pricing for batch and interactive session resources by setting the resource allocation tier properties, such as spark.dataproc.driver.compute.tier and spark.dataproc.executor.compute.tier, to premium when you submit a Spark batch workload or an interactive session.

You configure Native Query Execution by setting Native Query Execution properties on a batch workload, interactive session, or session template, and then submitting the workload or running the interactive session in a notebook.

Console

  1. In the Google Cloud console:

    1. Go to Dataproc Batches.
    2. Click Create to open the Create batch page.
  2. Select and fill in the fields that configure the batch for Native Query Execution (see Native Query Execution properties).

  3. Fill in, select, or confirm other batch workload settings. See Submit a Spark batch workload.

  4. Click SUBMIT to run the Spark batch workload.

gcloud

Set the following gcloud dataproc batches submit spark command flags to configure the batch workload for Native Query Execution:

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

Notes:

  • PROJECT_ID: Your Google Cloud project ID.

  • REGION: The region where the workload will run.

  • VERSION: A supported Spark runtime version, such as 1.2.26+ or 2.2.26+ (see Native Query Execution properties).

  • --properties: Sets the native runtime engine and the premium driver and executor compute tiers required for Native Query Execution.

  • OTHER_FLAGS_AS_NEEDED: Add any other flags that your workload requires.

API

Set the following Dataproc API batches.create fields to configure the batch workload for Native Query Execution:

  • RuntimeConfig.version: A supported Spark runtime version (see Native Query Execution properties).

  • RuntimeConfig.properties: Set spark.dataproc.runtimeEngine to native, and set spark.dataproc.driver.compute.tier and spark.dataproc.executor.compute.tier to premium.
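
The following curl command is a minimal sketch of an equivalent batches.create REST request; the runtime version and workload fields are illustrative, and you would substitute your own project and region:

    # Illustrative batches.create request; adjust the version, jar, and class for your workload.
    curl -X POST \
        -H "Authorization: Bearer $(gcloud auth print-access-token)" \
        -H "Content-Type: application/json" \
        "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches" \
        -d '{
          "sparkBatch": {
            "jarFileUris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "mainClass": "org.apache.spark.examples.SparkPi"
          },
          "runtimeConfig": {
            "version": "2.2",
            "properties": {
              "spark.dataproc.runtimeEngine": "native",
              "spark.dataproc.driver.compute.tier": "premium",
              "spark.dataproc.executor.compute.tier": "premium"
            }
          }
        }'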

Tuning your Native Query Execution workload

Dataproc Serverless Native Query Execution can be tuned further using the following properties:

  • spark.driver.memory and spark.driver.memoryOverhead: To tune the memory provided to the Spark driver process.

  • spark.executor.memory, spark.executor.memoryOverhead, and spark.memory.offHeap.size: To tune the on-heap and off-heap memory provided to executor processes.

  • spark.dataproc.driver.disk.tier and spark.dataproc.driver.disk.size: To configure the premium disk tier and size for the driver.

  • spark.dataproc.executor.disk.tier and spark.dataproc.executor.disk.size: To configure the premium disk tier and size for executors.
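
For example, you might pass several of these tuning properties through the gcloud --properties flag when you submit the batch. The disk size and driver memory values below are illustrative starting points, not recommendations:

    # Same command shape as the earlier gcloud example, with tuning properties added.
    gcloud dataproc batches submit spark \
        --project=PROJECT_ID \
        --region=REGION \
        --version=VERSION \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.SparkPi \
        --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.disk.size=750g,spark.driver.memory=8g \
        OTHER_FLAGS_AS_NEEDED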

Native Query Execution properties

  • spark.dataproc.runtimeEngine=native (Required): The workload runtime engine must be set to native to override the default Spark runtime engine.

  • version (Required): The workload must use Spark runtime version 1.2.26+, 2.2.26+, or a later major runtime version.

  • Premium compute tiers (Required): The spark.dataproc.driver.compute.tier and spark.dataproc.executor.compute.tier properties must be set to premium.

  • Premium disk tiers (Optional and Recommended): Premium disk tiers use columnar instead of row-based shuffle to provide better performance. For better shuffle I/O throughput, use the driver and executor premium disk tiers with a sufficiently large disk size to accommodate shuffle files.

  • Memory (Optional): If you configure the Native Query Execution engine without configuring both off-heap memory (spark.memory.offHeap.size) and on-heap memory (spark.executor.memory), the engine allocates a default 4g of memory and divides it in a 6:1 ratio between off-heap and on-heap memory.

    If you decide to configure memory when using Native Query Execution, you can do so in either of the following ways:

    • Configure off-heap memory only (spark.memory.offHeap.size) with a specified value. Native Query Execution uses the specified value as off-heap memory, and allocates an additional 1/7th of the off-heap memory value as on-heap memory.

    • Configure both on-heap memory (spark.executor.memory) and off-heap memory (spark.memory.offHeap.size). The amount you allocate to off-heap memory must be greater than the amount you allocate to on-heap memory. Recommendation: Allocate off-heap to on-heap memory in a 6:1 ratio.

    Sample values:

    Memory settings without          Recommended memory settings with
    Native Query Execution           Native Query Execution
    spark.executor.memory            spark.memory.offHeap.size    spark.executor.memory
    7g                               6g                           1g
    14g                              12g                          2g
    28g                              24g                          4g
    56g                              48g                          8g
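
For example, to apply the 28g row from the table above when you submit a batch, you might set the off-heap and on-heap values together. The values are illustrative; keep the recommended 6:1 off-heap to on-heap ratio for your own sizes:

    # 24g off-heap plus 4g on-heap preserves the recommended 6:1 ratio.
    gcloud dataproc batches submit spark \
        --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.memory.offHeap.size=24g,spark.executor.memory=4g \
        OTHER_FLAGS_AS_NEEDED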

Native Query Execution qualification tool

You can run the Dataproc Native Query Execution qualification tool, run_qualification_tool.sh, to identify workloads that can achieve faster runtimes with Native Query Execution. The tool analyzes the Spark event files generated by batch workload applications, and then estimates potential runtime savings that each workload application can obtain with Native Query Execution.

Run the qualification tool

Perform the following steps to run the tool against Dataproc Serverless batch workload event files.

  1. Copy the run_qualification_tool.sh script into a local directory that contains the Spark event files to analyze.

  2. Run the qualification tool to analyze one event file or a set of event files contained in the script directory.

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    Flags and values:

    -f (required): See Spark event file locations to locate Spark workload event files.

    • EVENT_FILE_PATH (required unless EVENT_FILE_NAME is specified): Path of the event file to analyze. If not provided, the event file path is assumed to be the current directory.

    • EVENT_FILE_NAME (required unless EVENT_FILE_PATH is specified): Name of the event file to analyze. If not provided, the event files found recursively in the EVENT_FILE_PATH are analyzed.

    -o (optional): If not provided, the tool creates or uses an existing output directory under the current directory to place output files.

    • CUSTOM_OUTPUT_DIRECTORY_PATH: Output directory path to output files.

    -k (optional):

    • SERVICE_ACCOUNT_KEY: The service account key in JSON format if needed to access the EVENT_FILE_PATH.

    -x (optional):

    • MEMORY_ALLOCATED: Memory in gigabytes to allocate to the tool. By default, the tool uses 80% of the free memory available in the system and all the available machine cores.

    -t (optional):

    • PARALLEL_THREADS_TO_RUN: The number of parallel threads for the tool to run. By default, the tool uses all available cores.

    Example command usage:

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    In this example, the qualification tool traverses the gs://dataproc-temp-us-east1-9779/spark-job-history directory, and analyzes the Spark event files contained in this directory and its subdirectories. Access to the directory is provided by the /keys/event-file-key service account key. The tool uses 34 GB of memory for execution, and runs 5 parallel threads.

Qualification tool output files

Once analysis is complete, the qualification tool places the following output files in a perfboost-output directory in the current directory:

  • AppsRecommendedForBoost.tsv: Contains a row for each analyzed application and indicates whether it is recommended for Native Query Execution.

  • UnsupportedOperators.tsv: Lists the operators used in workload applications that are not supported by Native Query Execution.

AppsRecommendedForBoost.tsv output file

The following table shows the contents of a sample AppsRecommendedForBoost.tsv output file. It contains a row for each analyzed application.

Sample AppsRecommendedForBoost.tsv output file:

applicationId | applicationName | rddPercentage | unsupportedSqlPercentage | totalTaskTime | supportedTaskTime | supportedSqlPercentage | recommendedForBoost | expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 | projects/example.com:dev/locations/us-central1 6b4d6cae140f883c0 11c8e | 0.00% | 0.00% | 548924253 | 548924253 | 100.00% | TRUE | 30.00%
app-2024081/batches/60381cab738021457-000 | projects/example.com:dev/locations/us-central1 474113a1462b426bf b3aeb | 0.00% | 0.00% | 514401703 | 514401703 | 100.00% | TRUE | 30.00%

Column descriptions:

  • applicationId: The ApplicationID of the Spark application. Use this to identify the corresponding batch workload.

  • applicationName: The name of the Spark application.

  • rddPercentage: The percentage of RDD operations in the application. RDD operations are not supported by Native Query Execution.

  • unsupportedSqlPercentage: Percentage of SQL operations not supported by Native Query Execution.

  • totalTaskTime: Cumulative task time of all tasks executed during the application run.

  • supportedTaskTime: The total task time supported by Native Query Execution.

The following columns provide important information to help you determine whether Native Query Execution can benefit your batch workload:

  • supportedSqlPercentage: The percentage of SQL operations supported by Native Query Execution. The higher the percentage, the greater the runtime reduction that can be achieved by running the application with Native Query Execution.

  • recommendedForBoost: If TRUE, running the application with Native Query Execution is recommended. If recommendedForBoost is FALSE, don't use Native Query Execution on the batch workload.

  • expectedRuntimeReduction: The expected percentage reduction in application runtime when you run the application with Native Query Execution.
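
If you want to quickly list only the header row and the applications that are recommended for boost, one option is a simple filter on the TSV output, assuming the tab-separated column order shown above (recommendedForBoost is the eighth column) and the perfboost-output directory used in this example:

    # Keep the header row and rows where recommendedForBoost is TRUE.
    awk -F'\t' 'NR == 1 || $8 == "TRUE"' perfboost-output/AppsRecommendedForBoost.tsv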

UnsupportedOperators.tsv output file

The UnsupportedOperators.tsv output file contains a list of operators used in workload applications that are not supported by Native Query Execution. Each row in the output file lists an unsupported operator.

Column descriptions:

  • unsupportedOperator: The name of the operator that is not supported by Native Query Execution.

  • cumulativeCpuMs: The number of CPU milliseconds consumed during the execution of the operator. This value reflects the relative importance of the operator in the application.

  • count: The number of times the operator is used in the application.
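
To see which unsupported operators consume the most CPU time, one option is to sort the file by its cumulativeCpuMs column, assuming the tab-separated column order listed above (cumulativeCpuMs is the second column) and the perfboost-output directory used in this example:

    # Skip the header row, then sort by cumulativeCpuMs in descending order.
    tail -n +2 perfboost-output/UnsupportedOperators.tsv | sort -t$'\t' -k2,2nr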

Run the qualification tool across projects

This section provides instructions for running the list-batches-and-run-qt.sh script, which runs the qualification tool to analyze batch workload Spark event files from multiple projects.

Script requirements and limitations:

  • Run the script on Linux machines:
    • Java version 11 or later must be installed as the default Java version.
  • Because logs in Cloud Logging have a 30-day TTL, Spark event files from batch workloads run more than 30 days ago can't be analyzed.

To run the qualification tool across projects, perform the following steps.

  1. Download the list-batches-and-run-qt.sh script, and copy it to your local machine.

  2. Change script permissions.

    chmod +x list-batches-and-run-qt.sh
    
  3. Prepare a project input file list to pass to the script for analysis. Create the text file by adding one row in the following format for each project and region with batch workload Spark event files to analyze.

    -r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
    

    Notes:

    -r (required):

    • REGION: Region where the batches in the project are submitted.

    -s (required): Format: yyyy-mm-dd. You can add an optional 00:00:00 time segment.

    • START_DATE: Only batch workloads created after the start date are analyzed. Batches are analyzed in descending order of batch creation time; the most recent batches are analyzed first.

    -e (optional): Format: yyyy-mm-dd. You can add an optional 00:00:00 time segment.

    • END_DATE: If you specify this flag, only batch workloads created on or before the end date are analyzed. If not specified, all batches created after the START_DATE are analyzed. Batches are analyzed in descending order of batch creation time; the most recent batches are analyzed first.

    -p (required):

    • PROJECT_ID: The ID of the project that contains the batch workloads to analyze.

    -l (optional):

    • LIMIT_MAX_BATCHES: The maximum number of batches to analyze. You can use this option in combination with START_DATE and END_DATE to analyze a limited number of batches created between the specified dates.

      If -l is not specified, up to a default of 100 batches are analyzed.

    -k (optional):

    • KEY_PATH: A local path that contains Cloud Storage access keys for the workload Spark event files.

    Input file example:

    -r us-central1 -s 2024-08-21 -p project1 -k key1
    -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
    

    Notes:

    • Row 1: Up to the most recent 100 (default) Spark event files in project1 in the us-central1 region with a creation time after 2024-08-21 00:00:00 AM will be analyzed. key1 allows access to the files in Cloud Storage.

    • Row 2: Up to the most recent 50 Spark event files in project2 in the us-east1 region with a creation time after 2024-08-21 00:00:00 AM and on or before 2024-08-23 11:59:59 PM will be analyzed. key2 allows access to the event files in Cloud Storage.

  4. Run the list-batches-and-run-qt.sh script. Analysis results are output in the AppsRecommendedForBoost.tsv and UnsupportedOperators.tsv files.

    ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
        -x MEMORY_ALLOCATED \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -t PARALLEL_THREADS_TO_RUN
    

    Notes:

    • PROJECT_INPUT_FILE_LIST: The project input file list that you created in the previous step.

    • The optional -x, -o, and -t flags have the same meanings as the corresponding run_qualification_tool.sh flags described in Run the qualification tool.

Spark event file locations

Perform any of the following steps to find the Spark event files for Dataproc Serverless batch workloads:

  1. In Cloud Storage, find the spark.eventLog.dir for the workload, then download it.

    1. If you can't find the spark.eventLog.dir, set it to a Cloud Storage location, rerun the workload, and then download the event log files from that location (see the example after this list).
  2. If you have configured Spark History Server for the batch job:

    1. Go to the Spark History Server, then select the workload.
    2. Click Download in the Event Log column.
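
As a minimal sketch, you might set the Spark event log location when you submit the batch, and later copy the logs to your local machine for the qualification tool to analyze. The bucket path is a placeholder for a Cloud Storage location that you own:

    # Write Spark event logs to a Cloud Storage location you control (placeholder bucket).
    gcloud dataproc batches submit spark \
        --properties=spark.eventLog.enabled=true,spark.eventLog.dir=gs://YOUR_BUCKET/spark-event-logs \
        OTHER_FLAGS_AS_NEEDED

    # Download the event logs for local analysis.
    gsutil cp -r gs://YOUR_BUCKET/spark-event-logs .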

When to use Native Query Execution

Use Native Query Execution in the following scenarios:

  • Spark Dataframe APIs and Spark SQL queries that read data from Parquet and ORC files.
  • Workloads recommended by the Native Query Execution qualification tool.

When not to use Native Query Execution

Don't use Native Query Execution in the following scenarios, since doing so might not reduce workload runtime and can cause failure or regression:

  • Workloads not recommended by the Native Query Execution qualification tool.
  • Spark RDD, UDF, and ML workloads
  • Write workloads
  • File formats other than Parquet and ORC
  • Inputs of the following data types:
    • Timestamp, TinyInt, Byte, Struct, Array, Map: ORC and Parquet
    • VarChar, Char: ORC
  • Queries that contain regular expressions
  • Workloads that use a Requester Pays bucket
  • Non-default Cloud Storage configuration settings. Native Query Execution uses many defaults even when overridden.

Limitations

Enabling Native Query Execution in the following scenarios can throw exceptions, raise Spark incompatibilities, or cause the workload to fall back to the default Spark engine.

Fallbacks

Using Native Query Execution in the following cases can result in workload fallback to the Spark execution engine, resulting in regression or failure.

  • ANSI: If ANSI mode is enabled, execution falls back to Spark.

  • Case-sensitive mode: Native Query Execution supports the Spark default case-insensitive mode only. If case-sensitive mode is enabled, incorrect results can occur.

  • Partitioned table scan: Native Query Execution supports the partitioned table scan only when the path contains the partition information; otherwise, the workload falls back to the Spark execution engine.

Incompatible behavior

Incompatible behavior or incorrect results can occur when using Native Query Execution in the following cases:

  • JSON functions: Native Query Execution supports strings surrounded by double quotes, not single quotes. Incorrect results occur with single quotes. Using "*" in the path with the get_json_object function returns NULL.

  • Parquet read configuration:

    • Native Query Execution treats spark.files.ignoreCorruptFiles as set to the default false value, even when set to true.
    • Native Query Execution ignores spark.sql.parquet.datetimeRebaseModeInRead, and returns only the Parquet file contents. Differences between the legacy hybrid (Julian Gregorian) calendar and the Proleptic Gregorian calendar are not considered. Spark results can differ.
  • NaN: Not supported. Unexpected results can occur, for example, when using NaN in a numeric comparison.

  • Spark columnar reading: A fatal error can occur because the Spark columnar vector is incompatible with Native Query Execution.

  • Spill: When shuffle partitions are set to a large number, the spill-to-disk feature can trigger an OutOfMemoryException. If this occurs, reducing the number of partitions can eliminate this exception.
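
For example, one way to reduce the shuffle partition count is to set spark.sql.shuffle.partitions when you submit the workload. The value shown is illustrative, not a recommendation:

    # Lower spark.sql.shuffle.partitions if spill-to-disk triggers an OutOfMemoryException.
    gcloud dataproc batches submit spark \
        --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.sql.shuffle.partitions=200 \
        OTHER_FLAGS_AS_NEEDED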