借助原生查询执行功能加快 Dataproc Serverless 批量工作负载和交互式会话的速度

宣布原生查询执行功能正式发布 (GA)。该功能支持 Spark Dataframe API、从 Parquet 和 ORC 文件读取数据的 Spark SQL 查询,以及原生查询执行资格认证工具推荐的工作负载。如有关于其他用例的问题,请发送电子邮件至 dataproc-pms@google.com。

本文档介绍了如何启用在高级价格层级中运行的 Dataproc Serverless 批处理工作负载和 Interactive 会话,以使用原生查询执行。

如何将原生查询执行与高级层级定价搭配使用

Dataproc Serverless 原生查询执行仅适用于在 Dataproc Serverless 高级价格层级中运行的批处理工作负载和 Interactive 会话。优质层级的价格高于标准层级的价格,但原生查询执行不会产生额外费用。如需了解详情,请参阅 Dataproc Serverless 价格

您可以在提交 Spark 批处理工作负载或交互式会话时,将以下资源分配层级属性设置为 premium,以便为批处理和交互式会话资源启用高级层级资源分配和定价。

您可以通过在批量工作负载、交互式会话会话模板上设置原生查询执行属性,然后提交工作负载或在笔记本中运行交互式会话来配置原生查询执行。

控制台

  1. 在 Google Cloud 控制台中:

    1. 前往 Dataproc 批处理
    2. 点击创建,打开创建批处理页面。
  2. 选择并填写以下字段,以便为原生查询执行配置批处理:

    • 容器
    • 执行器和驱动程序层级配置
      • 为所有层级(驱动程序计算层级执行器计算层级)选择 Premium
    • 属性:为以下原生查询执行属性输入以下 Key(属性名称)和 Value 对:
      spark.dataproc.runtimeEngine 本国的/原生的/土著
  3. 填写、选择或确认其他批处理工作负载设置。请参阅提交 Spark 批处理工作负载

  4. 点击提交以运行 Spark 批处理工作负载。

gcloud

设置以下 gcloud CLI gcloud dataproc batches submit spark 命令标志,以配置适用于原生查询执行的批处理工作负载:

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

注意:

API

设置以下 Dataproc API 字段,以配置适用于原生查询执行的批量工作负载:

优化原生查询执行工作负载

您可以使用以下属性进一步调整 Dataproc Serverless 原生查询执行:

批处理媒体资源 何时使用
spark.driver.memory
spark.driver.memoryOverhead
To tune the memory provided to spark driver process
spark.executor.memory
spark.executor.memoryOverhead
spark.memory.offHeap.size
To tune the memory provided to onheap/offheap memory of executor process
spark.dataproc.driver.disk.tier
spark.dataproc.driver.disk.size
To configure premium disk tier and size for driver
spark.dataproc.executor.disk.tier
spark.dataproc.executor.disk.size
To configure premium disk tier and size for executor

原生查询执行属性

  • spark.dataproc.runtimeEngine=native必需):必须将工作负载运行时引擎设置为 native,才能替换默认的 spark 运行时引擎。

  • version必需):工作负载必须使用 Spark 运行时版本 1.2.26 或更高版本、2.2.26 或更高版本,或更高的主要运行时版本。

  • 高级计算层级(必需):必须将 spark.dataproc.spark.driver.compute.tierspark.dataproc.executor.compute.tier 属性设置为 premium

  • 高级磁盘层级(可选且推荐):高级磁盘层级使用基于列的 Shuffle 而不是基于行的 Shuffle,从而提供更好的性能。为了提高 Shuffle I/O 吞吐量,请使用驱动程序和执行器高级磁盘层级,并确保磁盘大小足够大,能够容纳 Shuffle 文件。

  • 内存(可选):如果您配置了原生查询执行引擎,但未同时配置堆外内存 (spark.memory.offHeap.size) 和堆内内存 (spark.executor.memory),则原生查询执行引擎会采用默认的 4g 内存量,并按 6:1 比例将其分配给堆外内存和堆内内存。

    如果您决定在使用原生查询执行时配置内存,可以通过以下任一方式进行配置:

    • 使用指定值配置仅限堆外内存 (spark.memory.offHeap.size)。原生查询执行将使用指定值作为堆外内存,并将堆外内存值的 1/7th 额外分配为堆内内存。

    • 同时配置堆内内存 (spark.executor.memory) 和堆外内存 (spark.memory.offHeap.size)。您为堆外内存分配的内存量必须大于为堆内内存分配的内存量。建议:将堆外内存与堆内内存按 6:1 的比例分配。

    示例值:

    不使用原生查询执行的内存设置 使用原生查询执行功能的建议内存设置
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7g 6g 1g
    14g 12g 2g
    28 克 24 克 4g
    56 克 48 克 8g

原生查询执行资格认证工具

您可以运行 Dataproc 原生查询执行资格认证工具 run_qualification_tool.sh,以确定哪些工作负载可以通过原生查询执行实现更快的运行时。该工具会分析批处理工作负载应用生成的 Spark 事件文件,然后估算每个工作负载应用可以通过原生查询执行功能实现的潜在运行时节省量。

运行资格认证工具

请按以下步骤针对 Dataproc Serverless 批量工作负载事件文件运行该工具。

1. 将 run_qualification_tool.sh 复制到包含要分析的 Spark 事件文件的本地目录。

  1. 运行资格认证工具,以分析脚本目录中包含的一个事件文件或一组事件文件。

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    标志和值

    -f(必需):请参阅 Spark 事件文件位置,以查找 Spark 工作负载事件文件。

    • EVENT_FILE_PATH(除非指定了 EVENT_FILE_NAME,否则必需):要分析的事件文件的路径。如果未提供,系统会假定事件文件路径为当前目录。

    • EVENT_FILE_NAME(除非指定了 EVENT_FILE_PATH,否则必填):要分析的事件文件的名称。如果未提供,系统会分析在 EVENT_FILE_PATH 中递归查找的事件文件。

    -o(可选):如果未提供,该工具会在当前目录下创建或使用现有的 output 目录来放置输出文件。

    • CUSTOM_OUTPUT_DIRECTORY_PATH:输出文件的输出目录路径。

    -k(可选):

    • SERVICE_ACCOUNT_KEY:JSON 格式的服务账号密钥(如果需要访问 EVENT_FILE_PATH)。

    -x(可选):

    • MEMORY_ALLOCATED:要分配给该工具的内存(以 GB 为单位)。默认情况下,该工具会使用系统中可用内存的 80% 以及所有可用机器核心。

    -t(可选):

    • PARALLEL_THREADS_TO_RUN:N=工具要执行的并行线程数。默认情况下,该工具会执行所有核心。

    命令用法示例

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    在此示例中,资格认证工具会遍历 gs://dataproc-temp-us-east1-9779/spark-job-history 目录,并分析此目录及其子目录中包含的 Spark 事件文件。/keys/event-file-key 有权访问该目录。该工具使用 34 GB memory 进行执行,并运行 5 并行线程。

资格认证工具输出文件

分析完成后,资格认证工具会将以下输出文件放置在当前目录的 perfboost-output 目录中:

AppsRecommendedForBoost.tsv 输出文件

下表显示了示例 AppsRecommendedForBoost.tsv 输出文件的内容。其中包含与每个被分析的应用对应的一行。

AppsRecommendedForBoost.tsv 输出文件示例:

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0.00% 0.00% 548924253 548924253 100.00% TRUE 30.00%
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0.00% 0.00% 514401703 514401703 100.00% TRUE 30.00%

列说明:

  • applicationId:Spark 应用的 ApplicationID。您可以使用此 ID 来识别相应的批处理工作负载。

  • applicationName:Spark 应用的名称。

  • rddPercentage:应用中 RDD 操作所占的百分比。原生查询执行不支持 RDD 操作。

  • unsupportedSqlPercentage: 原生查询执行不支持的 SQL 操作所占的百分比。

  • totalTaskTime:应用运行期间执行的所有任务的累计任务时间。

  • supportedTaskTime:原生查询执行支持的总任务时间。

以下列提供了重要信息,可帮助您确定原生查询执行是否有助于您的批处理工作负载:

  • supportedSqlPercentage:原生查询执行支持的 SQL 操作所占的百分比。该百分比越高,使用原生查询执行运行应用时可缩短的运行时间就越长。

  • recommendedForBoost:如果为 TRUE,建议使用原生查询执行方式运行应用。如果 recommendedForBoostFALSE请勿对批处理工作负载使用原生查询执行。

  • expectedRuntimeReduction:使用原生查询执行运行应用时,应用运行时的预期减少百分比。

UnsupportedOperators.tsv 输出文件。

UnsupportedOperators.tsv 输出文件包含工作负载应用中使用的运算符列表,这些运算符不受原生查询执行支持。输出文件中的每一行都列出了不受支持的运算符。

列说明:

  • unsupportedOperator:原生查询执行不支持的运算符的名称。

  • cumulativeCpuMs:运算符执行期间消耗的 CPU 毫秒数。此值反映了运算符在应用中的相对重要性。

  • count:操作符在应用中使用的次数。

跨项目运行资格要求工具

本部分介绍了如何运行脚本来运行资格认证工具,以分析来自多个项目的批处理工作负载 Spark 事件文件。

脚本要求和限制

  • 在 Linux 计算机上运行脚本:
    • 必须将 Java 版本 >=11 安装为默认 Java 版本。
  • 由于 Cloud Logging 中的日志有 30 天的 TTL,因此无法分析超过 30 天前运行的批处理工作负载中的 Spark 事件文件。

如需跨项目运行资格认证工具,请执行以下步骤。

  1. 下载 list-batches-and-run-qt.sh 脚本,并将其复制到本地机器。

  2. 更改脚本权限。

    chmod +x list-batches-and-run-qt.sh
    
  3. 准备一个项目输入文件列表,以便传递给脚本进行分析。创建文本文件,为每个包含要分析的批处理工作负载 Spark 事件文件的项目和区域添加一行,格式如下。

    -r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
    

    注意:

    -r(必需):

    • REGION:项目中批处理提交到的区域。

    -s(必需):格式:yyyy-mm-dd。您可以添加可选的 00:00:00 时间段。

    • START_DATE:仅分析开始日期之后创建的批量工作负载。系统会按批次创建时间的降序顺序分析批次,即先分析最近的批次。

    -e(可选):格式:yyyy-mm-dd。您可以添加可选的 00:00:00 时间段。

    • END_DATE:如果您指定此参数,系统只会分析在结束日期之前或当天创建的批量工作负载。如果未指定,系统会分析 START_DATE 之后创建的所有批次。系统会按批量创建时间的降序顺序分析批量,即先分析最近的批量。

    -l(可选):

    • LIMIT_MAX_BATCHES:要分析的批次数上限。您可以将此选项与 START-DATEEND-DATE 结合使用,以分析在指定日期之间创建的有限数量的批次。

      如果未指定 -l,则系统会分析最多 100 个批次的默认数量。

    -k(可选):

    • KEY_PATH:包含工作负载 Spark 事件文件的 Cloud Storage 访问密钥的本地路径。

    输入文件示例:

    -r us-central1 -s 2024-08-21 -p project1 -k key1
    -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
    

    注意:

    • 第 1 行:系统将分析 us-central1 区域 project1 中创建时间在 2024-08-21 00:00:00 AM 之后的 Spark 事件文件(最多 100 个,默认值)。key1 允许访问 Cloud Storage 中的文件。

    • 第 2 行:系统将分析 us-eastl1 区域 project2 中创建时间在 2024-08-21 00:00:00 AM 之后且在 2024 年 8 月 23 日晚上 11:59:59 之前或当天的最多 50 个 Spark 事件文件。key2 允许访问 Cloud Storage 中的事件文件。

  4. 运行 list-batches-and-run-qt.sh 脚本: 分析结果会输出到 AppsRecommendedForBoost.tsvUnsupportedOperators.tsv 文件中。

    ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
        -x MEMORY_ALLOCATED \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -t PARALLEL_THREADS_TO_RUN
    

    注意:

    • PROJECT_INPUT_FILE_LIST:请参阅该部分中的第 3 条说明。

    • -x-o-t:请参阅运行资格认证工具

Spark 事件文件位置

执行以下任一步骤即可查找 Dataproc Serverless 批量工作负载的 Spark 事件文件:

  1. 在 Cloud Storage 中,找到相应工作负载的 spark.eventLog.dir,然后将其下载下来。

    1. 如果您找不到 spark.eventLog.dir,请将 spark.eventLog.dir 设置为 Cloud Storage 位置,然后重新运行工作负载并下载 spark.eventLog.dir
  2. 如果您已为批量作业配置了 Spark History Server,请执行以下操作:

    1. 前往 Spark History Server,然后选择相应工作负载。
    2. 点击“事件日志”列中的下载

何时使用原生查询执行

在以下情况下使用原生查询执行:

从 Parquet 和 ORC 文件读取数据的 Spark Dataframe API 和 Spark SQL 查询。
原生查询执行资格认证工具推荐的工作负载。

何时不应使用原生查询执行

请勿在以下场景中使用原生查询执行,因为这样做可能无法缩短工作负载运行时,并且可能会导致失败或回归:

  • 原生查询执行资格认证工具不推荐的工作负载。
  • Spark RDD、UDF 和机器学习工作负载
  • 写入工作负载
  • 除 Parquet 和 ORC 之外的文件格式
  • 以下数据类型的输入:
    • Timestamp, TinyInt, Byte, Struct, Array, Map:ORC 和 Parquet
    • VarChar, Char:ORC
  • 包含正则表达式的查询
  • 使用请求者付款存储桶的工作负载
  • 非默认 Cloud Storage 配置设置。即使被替换,原生查询执行也会使用许多默认值。

限制

在以下场景中启用原生查询执行可能会抛出异常、引发 Spark 不兼容问题,或导致工作负载回退到默认的 Spark 引擎。

后备广告

执行后进行的原生查询执行可能会导致工作负载回退到 Spark 执行引擎,从而导致回归或失败。

  • ANSI:如果启用了 ANSI 模式,则执行会回退到 Spark。

  • 区分大小写模式:原生查询执行仅支持 Spark 默认的区分大小写模式。如果启用了区分大小写模式,则可能会出现错误的结果。

  • 分区表扫描:只有当路径包含分区信息时,原生查询执行才支持分区表扫描,否则工作负载会回退到 Spark 执行引擎。

不兼容的行为

在以下情况下,使用原生查询执行可能会导致不兼容的行为或不正确的结果:

  • JSON 函数:原生查询执行支持用英文双引号(而非单引号)括起来的字符串。使用单引号会出现错误结果。在路径中使用“*”与 get_json_object 函数结合使用会返回 NULL

  • Parquet 读取配置

    • 原生查询执行会将 spark.files.ignoreCorruptFiles 视为设置为默认 false 值,即使设置为 true 也是如此。
    • 原生查询执行会忽略 spark.sql.parquet.datetimeRebaseModeInRead,并且仅返回 Parquet 文件内容。不考虑旧版混合(儒略公历)日历和公历前推日历之间的差异。Spark 结果可能会有所不同。
  • NaN:不受支持。例如,在数字比较中使用 NaN 时,可能会出现意外结果。

  • Spark 列式读取:由于 Spark 列式矢量与原生查询执行不兼容,因此可能会发生严重错误。

  • 溢出:将 shuffle 分区设置为一个大数时,溢出到磁盘功能可能会触发 OutOfMemoryException。如果发生这种情况,可以通过减少分区数量来消除此异常。