利用原生查询执行功能加快批处理工作负载和交互式会话的运行速度

本文档介绍何时以及如何启用原生查询执行,以加快 Serverless for Apache Spark 批处理工作负载和交互式会话的运行速度。

原生查询执行要求

Serverless for Apache Spark 原生查询执行仅适用于使用 1.2.26+2.2.26+ 或更高版本 Spark 运行时版本的批处理工作负载和交互式会话,且这些工作负载和会话在 Serverless for Apache Spark 高级价格层中运行。高级层级的价格高于标准层级的价格,但本地查询执行功能不会产生额外费用。如需了解价格信息,请参阅 Serverless for Apache Spark 价格

原生查询执行属性

本部分列出了必需和可选的 Spark 资源分配属性,您可以使用这些属性为批处理工作负载或交互式会话启用和自定义原生查询执行。

必需的房源设置

  • spark.dataproc.runtimeEngine=native:工作负载运行时引擎必须设置为 native,才能替换默认的 spark 运行时引擎。

  • spark.dataproc.spark.driver.compute.tier=premiumspark.dataproc.executor.compute.tier=premium:这些定价层级属性必须设置为高级定价层级。

可选的资源分配属性

  • spark.dataproc.driver.disk.tierspark.dataproc.driver.disk.sizespark.dataproc.executor.disk.tierspark.dataproc.executor.disk.size:使用这些属性可为 Spark 驱动程序和执行程序进程设置和配置高级磁盘层级和大小。

    高级磁盘层级使用基于列而非基于行的 shuffle 来提供更好的性能。为了获得更好的 Shuffle I/O 吞吐量,请使用驱动程序和执行程序的高级磁盘层,并使用足够大的磁盘大小来容纳 Shuffle 文件。

  • spark.driver.memoryspark.driver.memoryOverheadspark.executor.memoryspark.executor.memoryOverheadspark.memory.offHeap.size:使用这些属性可调整提供给 Spark 驱动程序和执行程序进程的内存。

    您可以通过以下任一方式配置内存:

    • 选项 1:仅配置堆外内存 (spark.memory.offHeap.size),并指定一个值。原生查询执行将使用指定的值作为堆外内存,并额外分配堆外内存值的 1/7th 作为堆内内存 (spark.executor.memory)。

    • 选项 2:同时配置堆内内存 (spark.executor.memory) 和堆外内存 (spark.memory.offHeap.size)。分配给堆外内存的量必须大于分配给堆内内存的量。

    如果您未同时配置堆外内存 (spark.memory.offHeap.size) 和堆内内存 (spark.executor.memory),原生查询执行引擎会按照 6:1 的比例在堆外内存和堆内内存之间分配默认的 4g 内存量。

    建议:以 6:1 的比例将堆外内存分配到堆内内存。

    示例:

    不含原生查询执行的内存设置 使用原生查询执行时的推荐内存设置
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7g 6g 1g
    14g 12g 2G
    28g 24g 4g
    56g 48g 8g

运行资格赛工具

如需确定哪些批处理工作负载可以通过原生查询执行 (NQE) 实现更快的运行时,您可以使用资格鉴定工具。该工具会分析 Spark 事件日志,以估计潜在的运行时节省量,并识别 NQE 引擎不支持的任何操作。

Google Cloud 提供了两种运行资格认证分析的方法:资格认证作业和资格认证脚本。对于大多数用户,建议使用资格认证作业,该作业可自动发现和分析批处理工作负载。替代资格认证脚本适用于分析已知事件日志文件的特定使用情形。选择最适合您使用场景的方法:

  • 资格作业(推荐):这是主要且推荐的方法。 它是一个 PySpark 作业,可自动发现并分析一个或多个 Google Cloud 项目和区域中的近期批处理工作负载。如果您想进行广泛的分析,但又不想手动查找各个事件日志文件,请使用此方法。此方法非常适合大规模评估 NQE 的适用性。

  • 资格认证脚本(替代方法):这是一种替代方法,适用于高级或特定用例。这是一个 shell 脚本,用于分析单个 Spark 事件日志文件或特定 Cloud Storage 目录中的所有事件日志。如果您知道要分析的事件日志的 Cloud Storage 路径,请使用此方法。

资格作业

资格认证作业通过以编程方式扫描 Serverless for Apache Spark 批量工作负载并提交分布式分析作业,简化了大规模分析。该工具会评估组织中的所有作业,无需手动查找和指定事件日志路径。

授予 IAM 角色

为了让资格认证作业能够访问批处理工作负载元数据并读取 Cloud Logging 中的 Spark 事件日志,运行工作负载的服务账号必须在所有待分析的项目中获得以下 IAM 角色:

提交资格作业

您可以使用 gcloud CLI 工具提交资格认证作业。该作业包含托管在公开 Cloud Storage 存储桶中的 PySpark 脚本和 JAR 文件。

您可以在以下任一执行环境中运行作业:

  • 作为 Serverless for Apache Spark 批量工作负载。这是一种简单、独立的作业执行方式。

  • 作为在 Dataproc on Compute Engine 集群上运行的作业。此方法有助于将作业集成到工作流中。

作业参数

参数 说明 是否必需? 默认值
--project-ids 单个项目 ID 或以英文逗号分隔的 Google Cloud 项目 ID 列表,用于扫描批处理工作负载。 资格认证作业运行的项目。
--regions 要在指定项目内扫描的单个区域或以英文逗号分隔的区域列表。 指定项目中的所有区域。
--start-time 用于过滤批次的开始日期。系统只会分析在此日期(格式:YYYY-MM-DD)当天或之后创建的批次。 未应用开始日期过滤条件。
--end-time 用于过滤批次的结束日期。系统只会分析在此日期(格式:YYYY-MM-DD)当天或之前创建的批次。 未应用任何结束日期过滤条件。
--limit 每个区域要分析的批次数量上限。系统会先分析最近的批次。 系统会分析符合其他过滤条件的所有批次。
--output-gcs-path 将写入结果文件的 Cloud Storage 路径(例如 gs://your-bucket/output/)。 无。
--input-file 用于批量分析的文本文件的 Cloud Storage 路径。如果提供,此实参会替换所有其他范围界定实参(--project-ids--regions--start-time--end-time--limit)。 无。

资格认证作业示例

  • 一个 Serverless for Apache Spark 批量作业,用于执行简单的临时分析。作业实参列在 -- 分隔符之后。

    gcloud dataproc batches submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --project=PROJECT_ID \
        --region=REGION \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --project-ids=COMMA_SEPARATED_PROJECT_IDS \
        --regions=COMMA_SEPARATED_REGIONS \
        --limit=MAX_BATCHES \
        --output-gcs-path=gs://BUCKET
    
  • 一个 Serverless for Apache Spark 批量作业,用于分析在 us-central1 区域的 sample_project 中找到的最多 50 个最新批次,结果会写入 Cloud Storage 中的存储桶。作业实参列在 -- 分隔符之后。

    gcloud dataproc batches submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --project=PROJECT_ID \
        --region=US-CENTRAL1 \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --project-ids=PROJECT_ID \
        --regions=US-CENTRAL1 \
        --limit=50 \
        --output-gcs-path=gs://BUCKET/
    
  • 提交到 Dataproc 集群的 Dataproc on Compute Engine 作业,用于在可重复或自动化的分析工作流中进行大规模批量分析。作业实参放置在上传到 Cloud Storage 中 BUCKETINPUT_FILE 中。此方法非常适合在一次运行中扫描不同项目和区域的不同日期范围或批次限制。

    gcloud dataproc jobs submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --input-file=gs://INPUT_FILE \
        --output-gcs-path=gs://BUCKET
    

    注意:

    INPUT_FILE:文件中的每一行都表示一个不同的分析请求,并使用单字母标志后跟其值的格式,例如 -p PROJECT-ID -r REGION -s START_DATE -e END_DATE -l LIMITS

    输入文件内容示例:

    -p project1 -r us-central1 -s 2024-12-01 -e 2024-12-15 -l 100
    -p project2 -r europe-west1 -s 2024-11-15 -l 50
    

    这些实参指示工具分析以下两个范围:

    • 在 2025 年 12 月 1 日至 2025 年 12 月 15 日期间创建的 us-central1 区域中 project1 内的批次(最多 100 个)。
    • 在 2025 年 11 月 15 日或之后创建的 europe-west1 区域中的 project2 中,最多可有 50 个批次。

资格赛脚本

如果您有要分析的特定 Spark 事件日志的直接 Cloud Storage 路径,请使用此方法。此方法需要您在本地机器或 Compute Engine 虚拟机上下载并运行 shell 脚本 run_qualification_tool.sh,该虚拟机已配置为可访问 Cloud Storage 中的事件日志文件。

执行以下步骤,针对 Serverless for Apache Spark 批处理工作负载事件文件运行脚本。

1. 将 run_qualification_tool.sh 复制到包含要分析的 Spark 事件文件的本地目录中。

  1. 运行资格认证脚本,以分析脚本目录中包含的一个或一组事件文件。

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    标志和值

    -f(必需):请参阅 Spark 事件文件位置,找到 Spark 工作负载事件文件。

    • EVENT_FILE_PATH(必需,除非指定了 EVENT_FILE_NAME):要分析的事件文件的路径。如果未提供,则假定事件文件路径为当前目录。

    • EVENT_FILE_NAME(除非指定了 EVENT_FILE_PATH,否则为必需参数):要分析的事件文件的名称。如果未提供,则分析在 EVENT_FILE_PATH 中以递归方式找到的事件文件。

    -o(可选):如果未提供,该工具会在当前目录下创建或使用现有的 output 目录来放置输出文件。

    • CUSTOM_OUTPUT_DIRECTORY_PATH:输出文件的输出目录路径。

    -k(可选):

    • SERVICE_ACCOUNT_KEY:JSON 格式的服务账号密钥(如果需要访问 EVENT_FILE_PATH)。

    -x(可选):

    • MEMORY_ALLOCATED:要分配给工具的内存量(以 GB 为单位)。 默认情况下,该工具会使用系统中 80% 的可用内存和所有可用的机器核心。

    -t(可选):

    • PARALLEL_THREADS_TO_RUN:工具执行所用的并行线程数。默认情况下,该工具会执行所有核心。

    命令用法示例

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    在此示例中,资格认证工具会遍历 gs://dataproc-temp-us-east1-9779/spark-job-history 目录,并分析此目录及其子目录中包含的 Spark 事件文件。该目录的访问权限由 /keys/event-file-key 提供。该工具使用 34 GB memory 进行执行,并运行 5 个并行线程。

    Spark 事件文件位置

执行以下任一步骤,查找 Serverless for Apache Spark 批处理工作负载的 Spark 事件文件:

  1. 在 Cloud Storage 中,找到工作负载的 spark.eventLog.dir,然后下载。

    1. 如果您找不到 spark.eventLog.dir,请将 spark.eventLog.dir 设置为 Cloud Storage 位置,然后重新运行工作负载并下载 spark.eventLog.dir
  2. 如果您已为批量作业配置 Spark 历史记录服务器

    1. 前往 Spark History Server,然后选择相应的工作负载。
    2. 点击事件日志列中的下载

资格认证工具输出文件

资格认证作业或脚本分析完成后,资格认证工具会将以下输出文件放置在当前目录的 perfboost-output 目录中:

AppsRecommendedForBoost.tsv 输出文件

下表显示了示例 AppsRecommendedForBoost.tsv 输出文件的内容。其中包含每个已分析的应用对应的一行。

AppsRecommendedForBoost.tsv 输出文件示例:

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0.00% 0.00% 548924253 548924253 100.00% TRUE 30.00%
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0.00% 0.00% 514401703 514401703 100.00% TRUE 30.00%

列说明:

  • applicationId:Spark 应用的 ApplicationID。用于标识相应的批处理工作负载。

  • applicationName:Spark 应用的名称。

  • rddPercentage:应用中 RDD 操作的百分比。 原生查询执行不支持 RDD 操作。

  • unsupportedSqlPercentage: 不受原生查询执行支持的 SQL 操作的百分比。

  • totalTaskTime:应用运行期间执行的所有任务的累计任务时间。

  • supportedTaskTime:原生查询执行支持的总任务时间。

以下列提供了重要信息,可帮助您确定原生查询执行是否有利于您的批处理工作负载:

  • supportedSqlPercentage:原生查询执行支持的 SQL 操作百分比。百分比越高,通过使用原生查询执行功能运行应用可实现的运行时缩减幅度就越大。

  • recommendedForBoost:如果值为 TRUE,建议使用原生查询执行来运行应用。如果 recommendedForBoostFALSE,则不要在批处理工作负载上使用原生查询执行。

  • expectedRuntimeReduction:使用原生查询执行功能运行应用时,应用运行时预计会减少的百分比。

UnsupportedOperators.tsv 输出文件。

UnsupportedOperators.tsv 输出文件包含工作负载应用中使用的不受原生查询执行支持的运算符列表。输出文件中的每一行都列出了一个不受支持的运算符。

列说明:

  • unsupportedOperator:原生查询执行不支持的运算符的名称。

  • cumulativeCpuMs:运算符执行期间消耗的 CPU 毫秒数。此值反映了运算符在应用中的相对重要性。

  • count:运算符在应用中的使用次数。

使用原生查询执行

您可以在创建运行应用的批量工作负载交互式会话会话模板时设置原生查询执行属性,以便在应用中使用原生查询执行。

将原生查询执行与批量工作负载搭配使用

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 在批处理工作负载上启用原生查询执行。

控制台

使用 Google Cloud 控制台在批处理工作负载上启用原生查询执行。

  1. 在 Google Cloud 控制台中:

    1. 前往 Dataproc 批次
    2. 点击创建,打开创建批处理页面。
  2. 选择并填写以下字段,以配置原生查询执行的批处理:

    • 容器
    • 执行器和驱动程序层级配置
      • 为所有层级(驱动程序计算层级执行器计算层级)选择 Premium
    • 属性:输入 Key(属性名称)和 Value 对,以指定 Native Query Execution 属性
      spark.dataproc.runtimeEngine 本国的/原生的/土著
  3. 填写、选择或确认其他批量工作负载设置。请参阅提交 Spark 批处理工作负载

  4. 点击提交以运行 Spark 批处理工作负载。

gcloud

设置以下 gcloud CLI gcloud dataproc batches submit spark 命令标志,以配置用于原生查询执行的批处理工作负载:

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

注意:

  • PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
  • REGION:用于运行工作负载的可用 Compute Engine 区域
  • OTHER_FLAGS_AS_NEEDED:请参阅提交 Spark 批处理工作负载

API

设置以下 Dataproc API 字段,以配置用于原生查询执行的批处理工作负载:

何时使用原生查询执行

在以下场景中使用原生查询执行:

  • 从 Parquet 和 ORC 文件读取数据的 Spark Dataframe API、Spark Dataset API 和 Spark SQL 查询。输出文件格式不会影响原生查询执行性能。

  • 原生查询执行资格认证工具推荐的工作负载。

何时不应使用原生查询执行

以下数据类型的输入:

  • 字节:ORC 和 Parquet
  • 时间戳:ORC
  • 结构体、数组、映射:Parquet

限制

在以下场景中启用原生查询执行可能会导致异常、Spark 不兼容或工作负载回退到默认的 Spark 引擎。

后备广告

在以下执行中执行原生查询可能会导致工作负载回退到 Spark 执行引擎,从而导致回归或失败。

  • ANSI:如果启用了 ANSI 模式,执行会回退到 Spark。

  • 区分大小写模式:原生查询执行仅支持 Spark 默认的不区分大小写模式。如果启用了区分大小写模式,可能会出现不正确的结果。

  • 分区表扫描:仅当路径包含分区信息时,原生查询执行才支持分区表扫描,否则工作负载会回退到 Spark 执行引擎。

不兼容的行为

在以下情况下使用原生查询执行时,可能会导致不兼容的行为或错误的结果:

  • JSON 函数:原生查询执行支持用英文双引号而非英文单引号括起来的字符串。使用单引号时出现错误结果。在包含 get_json_object 函数的路径中使用“*”会返回 NULL

  • Parquet 读取配置

    • 即使将 spark.files.ignoreCorruptFiles 设置为 true,原生查询执行也会将其视为设置为默认值 false
    • 原生查询执行会忽略 spark.sql.parquet.datetimeRebaseModeInRead,并且仅返回 Parquet 文件内容。不考虑旧版混合(儒略格里高利)日历与 Proleptic 格里高利日历之间的差异。Spark 结果可能会有所不同。
  • NaN:不支持。例如,在数值比较中使用 NaN 时,可能会出现意外结果。

  • Spark 列式读取:由于 Spark 列式向量与原生查询执行不兼容,可能会发生严重错误。

  • 溢出:当 shuffle 分区设置为较大数量时,溢出到磁盘功能可能会触发 OutOfMemoryException。如果发生这种情况,减少分区数量可以消除此异常。