使用原生查詢執行功能,加快批次工作負載和互動式工作階段的速度

本文說明何時及如何啟用原生查詢執行功能,加快 Serverless for Apache Spark 批次工作負載和互動式工作階段的執行速度。

原生查詢執行作業規定

只有在 Serverless for Apache Spark 進階定價層級中,使用 1.2.26+2.2.26+ 或更新的 Spark 執行階段版本執行批次工作負載和互動式工作階段時,才能使用 Serverless for Apache Spark 原生查詢執行功能。進階級計價模式的費用高於標準級計價模式,但原生查詢執行作業不會產生額外費用。如要瞭解定價資訊,請參閱「Serverless for Apache Spark 定價」一文。

原生查詢執行屬性

本節列出可用於啟用及自訂批次工作負載或互動式工作階段原生查詢執行的必要和選用 Spark 資源分配屬性

必要房源設定

  • spark.dataproc.runtimeEngine=native:工作負載執行階段引擎必須設為 native,才能覆寫預設的 spark 執行階段引擎。

  • spark.dataproc.spark.driver.compute.tier=premiumspark.dataproc.executor.compute.tier=premium:這些定價層級屬性必須設為進階定價層級。

選填的資源分配屬性

  • spark.dataproc.driver.disk.tierspark.dataproc.driver.disk.sizespark.dataproc.executor.disk.tierspark.dataproc.executor.disk.size:使用這些屬性,為 Spark 驅動程式和執行器程序設定及配置進階磁碟層級和大小。

    進階磁碟層級會使用以資料欄為基礎的重組,而非以資料列為基礎的重組,以提供更優異的效能。如要提升隨機 I/O 總處理量,請使用磁碟大小足夠容納隨機檔案的驅動程式和執行器進階磁碟層。

  • spark.driver.memoryspark.driver.memoryOverheadspark.executor.memoryspark.executor.memoryOverheadspark.memory.offHeap.size:使用這些屬性調整提供給 Spark 驅動程式和執行器程序的記憶體。

    你可以透過下列任一方式設定記憶體:

    • 選項 1:只設定堆外記憶體 (spark.memory.offHeap.size),並指定值。原生查詢執行作業會將指定值做為堆外記憶體,並額外分配堆外記憶體值的 1/7th 做為堆內記憶體 (spark.executor.memory)。

    • 方法 2:同時設定堆積內記憶體 (spark.executor.memory) 和堆積外記憶體 (spark.memory.offHeap.size)。分配給堆積外記憶體的量必須大於分配給堆積內記憶體的量。

    如果未設定堆外記憶體 (spark.memory.offHeap.size) 和堆積記憶體 (spark.executor.memory),原生查詢執行引擎會以 6:1 的比例,在堆外和堆積記憶體之間分配預設的 4g 記憶體量。

    建議:以 6:1 的比例將堆外記憶體配置到堆內記憶體。

    範例:

    不含原生查詢執行的記憶體設定 使用原生查詢執行時建議的記憶體設定
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7g 6g 1g
    14g 12g 2g
    28g 24g 4g
    56g 48g 8g

執行資格工具

如要找出可透過原生查詢執行 (NQE) 縮短執行時間的批次工作負載,可以使用資格工具。這項工具會分析 Spark 事件記錄,估算潛在的執行階段節省量,並找出 NQE 引擎不支援的任何作業。

Google Cloud 提供兩種執行資格分析的方法:資格工作和資格指令碼。對於大多數使用者,建議採用資格工作,這項工作會自動探索及分析批次工作負載。替代資格指令碼可用於分析已知事件記錄檔的特定用途。請選擇最符合您用途的方法:

  • 資格職缺 (建議):這是主要且建議的方法。 這項 PySpark 工作會自動探索及分析一或多個 Google Cloud 專案和區域的近期批次工作負載。如要執行廣泛分析,但不需要手動尋找個別事件記錄檔,請使用這個方法。這個方法非常適合大規模評估 NQE 適用性。

  • 資格指令碼 (替代方法):這是進階或特定用途的替代方法。這個指令碼會分析單一 Spark 事件記錄檔,或特定 Cloud Storage 目錄中的所有事件記錄檔。如要分析事件記錄,請使用這個方法,並提供事件記錄的 Cloud Storage 路徑。

資格工作

資格工作會以程式輔助方式掃描 Serverless for Apache Spark 批次工作負載,並提交分散式分析工作,簡化大規模分析作業。這項工具會評估整個機構的工作,因此您不必手動尋找及指定事件記錄路徑。

授予 IAM 角色

如要讓資格工作存取批次工作負載中繼資料,並在 Cloud Logging 中讀取 Spark 事件記錄,執行工作負載的服務帳戶必須在所有待分析的專案中,獲得下列 IAM 角色:

提交資格工作

您可以使用 gcloud CLI 工具提交資格工作。這項工作包含 PySpark 指令碼和 JAR 檔案,這些檔案託管在公開的 Cloud Storage bucket 中。

您可以在下列任一執行環境中執行工作:

  • 以 Serverless for Apache Spark 批次工作負載的形式。這是簡單的獨立工作執行作業。

  • 以在 Dataproc on Compute Engine 叢集上執行的工作形式。這種方法有助於將工作整合至工作流程。

工作引數

引數 說明 是否必要 預設值
--project-ids 單一專案 ID 或以半形逗號分隔的 Google Cloud 專案 ID 清單,用於掃描批次工作負載。 執行資格工作所在的專案。
--regions 單一區域或以半形逗號分隔的區域清單,用於掃描指定專案。 指定專案中的所有地區。
--start-time 用於篩選批次的開始日期。系統只會分析在此日期當天或之後建立的批次 (格式:YYYY-MM-DD)。 未套用開始日期篩選條件。
--end-time 用於篩選批次的結束日期。系統只會分析在這個日期 (格式:YYYY-MM-DD) 之前建立的批次。 未套用結束日期篩選器。
--limit 每個區域可分析的批次數量上限。系統會優先分析最近的批次。 系統會分析符合其他篩選條件的所有批次。
--output-gcs-path 結果檔案的寫入路徑 (例如 gs://your-bucket/output/)。
--input-file 用於大量分析的文字檔案 Cloud Storage 路徑。如果提供這項引數,系統會覆寫所有其他定義範圍的引數 (--project-ids--regions--start-time--end-time--limit)。

資格工作範例

  • Serverless for Apache Spark 批次工作,可執行簡單的臨時分析。工作引數會列在 -- 分隔符號之後。

    gcloud dataproc batches submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --project=PROJECT_ID \
        --region=REGION \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --project-ids=COMMA_SEPARATED_PROJECT_IDS \
        --regions=COMMA_SEPARATED_REGIONS \
        --limit=MAX_BATCHES \
        --output-gcs-path=gs://BUCKET
    
  • 無伺服器 Apache Spark 批次工作,可分析 us-central1 地區 sample_project 中最多 50 個最近的批次。結果會寫入 Cloud Storage 中的 bucket。工作引數會列在 -- 分隔符號之後。

    gcloud dataproc batches submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --project=PROJECT_ID \
        --region=US-CENTRAL1 \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --project-ids=PROJECT_ID \
        --regions=US-CENTRAL1 \
        --limit=50 \
        --output-gcs-path=gs://BUCKET/
    
  • 提交至 Dataproc 叢集的 Dataproc on Compute Engine 工作,用於大規模、可重複或自動化分析工作流程中的大量分析。工作引數會放在 INPUT_FILE 中,並上傳至 Cloud Storage 中的 BUCKET。這個方法非常適合在單次執行中,掃描不同專案和區域的不同日期範圍或批次限制。

    gcloud dataproc jobs submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --input-file=gs://INPUT_FILE \
        --output-gcs-path=gs://BUCKET
    

    注意:

    INPUT_FILE:檔案中的每一行代表不同的分析要求,並使用單一字母標記及其值的格式,例如 -p PROJECT-ID -r REGION -s START_DATE -e END_DATE -l LIMITS

    輸入檔案內容範例:

    -p project1 -r us-central1 -s 2024-12-01 -e 2024-12-15 -l 100
    -p project2 -r europe-west1 -s 2024-11-15 -l 50
    

    這些引數會引導工具分析下列兩個範圍:

    • 2025 年 12 月 1 日至 2025 年 12 月 15 日期間,在 us-central1 地區的 project1 中建立最多 100 個批次。
    • 在 2025 年 11 月 15 日當天或之後建立的 europe-west1 區域中,專案 2 最多可有 50 個批次。

資格賽腳本

如果您有要分析的特定 Spark 事件記錄的直接 Cloud Storage 路徑,請使用這個方法。這個方法需要您在本機電腦或 Compute Engine VM 上下載並執行殼層指令碼 run_qualification_tool.sh,且該 VM 已設定為可存取 Cloud Storage 中的事件記錄檔。

請按照下列步驟,針對 Serverless for Apache Spark 批次工作負載事件檔案執行指令碼。

1. 將 run_qualification_tool.sh 複製到包含要分析的 Spark 事件檔案的本機目錄。

  1. 執行資格指令碼,分析指令碼目錄中的一個或一組事件檔案。

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    旗標和值:

    -f (必要):請參閱「Spark 事件檔案位置」,找出 Spark 工作負載事件檔案。

    • EVENT_FILE_PATH (必要,除非指定 EVENT_FILE_NAME):要分析的事件檔案路徑。如未提供,系統會假設事件檔案路徑為目前目錄。

    • EVENT_FILE_NAME (必要,除非指定 EVENT_FILE_PATH): 要分析的事件檔案名稱。如未提供,系統會分析 EVENT_FILE_PATH 中以遞迴方式找到的事件檔案。

    -o(選用):如果未提供,工具會在目前目錄下建立或使用現有的 output 目錄,放置輸出檔案。

    • CUSTOM_OUTPUT_DIRECTORY_PATH:輸出檔案的輸出目錄路徑。

    -k (選填):

    • SERVICE_ACCOUNT_KEY:如果需要存取 EVENT_FILE_PATH,請提供 JSON 格式的服務帳戶金鑰

    -x (選填):

    • MEMORY_ALLOCATED:要分配給工具的記憶體 (以 GB 為單位)。根據預設,這項工具會使用系統中 80% 的可用記憶體,以及所有可用的機器核心。

    -t(選用):

    • PARALLEL_THREADS_TO_RUN:工具執行的平行執行緒數量。根據預設,這項工具會執行所有核心。

    指令用法範例:

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    在本例中,資格工具會遍歷 gs://dataproc-temp-us-east1-9779/spark-job-history 目錄,並分析這個目錄及其子目錄中包含的 Spark 事件檔案。目錄存取權由 /keys/event-file-key 提供。這項工具會使用 34 GB memory 執行作業,並執行 5 個平行執行緒。

    Spark 事件檔案位置

如要找出 Serverless for Apache Spark 批次工作負載的 Spark 事件檔案,請執行下列任一步驟:

  1. 在 Cloud Storage 中找出工作負載的 spark.eventLog.dir,然後下載。

    1. 如果找不到 spark.eventLog.dir,請將 spark.eventLog.dir 設為 Cloud Storage 位置,然後重新執行工作負載並下載 spark.eventLog.dir
  2. 如果您已為批次工作設定 Spark 記錄伺服器

    1. 前往 Spark 記錄伺服器,然後選取工作負載。
    2. 按一下「事件記錄」欄中的「下載」

資格評估工具輸出檔案

資格工作或指令碼分析完成後,資格工具會在目前目錄的 perfboost-output 目錄中放置下列輸出檔案:

AppsRecommendedForBoost.tsv 輸出檔案

下表顯示範例 AppsRecommendedForBoost.tsv 輸出檔案的內容。其中包含每個分析應用程式的資料列。

AppsRecommendedForBoost.tsv 輸出檔案範例:

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0.00% 0.00% 548924253 548924253 100.00% TRUE 30.00%
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0.00% 0.00% 514401703 514401703 100.00% TRUE 30.00%

資料欄說明:

  • applicationId:Spark 應用程式的 ApplicationID。用來識別對應的批次工作負載。

  • applicationName:Spark 應用程式的名稱。

  • rddPercentage:應用程式中 RDD 作業的百分比。 原生查詢執行作業不支援 RDD 作業。

  • unsupportedSqlPercentage: 系統不支援原生查詢執行的 SQL 作業百分比。

  • totalTaskTime:應用程式執行期間執行的所有工作累計時間。

  • supportedTaskTime:原生查詢執行作業支援的總工作時間。

下列資料欄提供重要資訊,協助您判斷原生查詢執行是否能提升批次工作負載的效益:

  • supportedSqlPercentage原生查詢執行作業支援的 SQL 作業百分比。百分比越高,表示使用原生查詢執行功能執行應用程式時,可大幅縮短執行時間。

  • recommendedForBoost如果 TRUE,建議使用原生查詢執行功能執行應用程式。如果 recommendedForBoostFALSE請勿在批次工作負載上使用原生查詢執行。

  • expectedRuntimeReduction使用原生查詢執行功能執行應用程式時,預期應用程式執行階段的縮減百分比。

UnsupportedOperators.tsv 輸出檔案。

UnsupportedOperators.tsv 輸出檔案包含工作負載應用程式中使用的運算子清單,這些運算子不支援原生查詢執行。輸出檔案中的每一列都會列出不支援的運算子。

資料欄說明:

  • unsupportedOperator:原生查詢執行作業不支援的運算子名稱。

  • cumulativeCpuMs:執行運算子期間耗用的 CPU 毫秒數。這個值反映了應用程式中運算子的相對重要性。

  • count:運算子在應用程式中使用的次數。

使用原生查詢執行

建立執行應用程式的批次工作負載互動式工作階段工作階段範本時,您可以設定原生查詢執行屬性,在應用程式中使用原生查詢執行。

搭配批次工作負載使用原生查詢執行

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API,在批次工作負載上啟用原生查詢執行。

控制台

使用 Google Cloud 控制台,在批次工作負載中啟用原生查詢執行。

  1. 在 Google Cloud 控制台中:

    1. 前往 Dataproc Batches
    2. 按一下「建立」,開啟「建立批次」頁面。
  2. 選取並填寫下列欄位,為原生查詢執行設定批次:

    • 容器:
    • 執行器和驅動程式層級設定:
      • 為所有層級 (「驅動程式運算層級」、「執行運算層級」) 選取 Premium
    • 屬性:輸入 Key (屬性名稱) 和 Value 配對,指定原生查詢執行屬性
      spark.dataproc.runtimeEngine 原生
  3. 填寫、選取或確認其他批次工作負載設定。請參閱「提交 Spark 批次工作負載」。

  4. 按一下「提交」,執行 Spark 批次工作負載。

gcloud

設定下列 gcloud CLI gcloud dataproc batches submit spark 指令標記,為原生查詢執行設定批次工作負載:

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

注意:

  • PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
  • REGION:可執行工作負載的 Compute Engine 區域
  • OTHER_FLAGS_AS_NEEDED:請參閱「 提交 Spark 批次工作負載」。

API

設定下列 Dataproc API 欄位,為原生查詢執行設定批次工作負載:

原生查詢執行功能的使用時機

在下列情況下使用原生查詢執行:

  • Spark Dataframe API、Spark Dataset API 和 Spark SQL 查詢,可從 Parquet 和 ORC 檔案讀取資料。輸出檔案格式不會影響原生查詢執行效能。

  • 原生查詢執行資格工具建議的工作負載。

不應使用原生查詢執行功能的情況

下列資料類型的輸入內容:

  • 位元:ORC 和 Parquet
  • 時間戳記:ORC
  • 結構體、陣列、對應:Parquet

限制

在下列情況下啟用原生查詢執行功能,可能會導致例外狀況、Spark 不相容,或工作負載回復為預設的 Spark 引擎。

備用廣告

在下列執行作業中執行原生查詢,可能會導致工作負載回溯至 Spark 執行引擎,進而導致回歸或失敗。

  • ANSI:如果啟用 ANSI 模式,執行作業會改回使用 Spark。

  • 區分大小寫模式:原生查詢執行作業僅支援 Spark 預設的不區分大小寫模式。如果啟用區分大小寫模式,可能會出現不正確的結果。

  • 分區資料表掃描:只有在路徑包含分區資訊時,原生查詢執行作業才會支援分區資料表掃描,否則工作負載會回復為 Spark 執行引擎。

不相容的行為

在下列情況下使用原生查詢執行作業時,可能會導致不相容的行為或不正確的結果:

  • JSON 函式:原生查詢執行作業支援以雙引號括住的字串,而非單引號。使用單引號會導致結果不正確。在路徑中使用「*」搭配 get_json_object 函式會傳回 NULL

  • Parquet 讀取設定:

    • 即使設為 true,原生查詢執行作業仍會將 spark.files.ignoreCorruptFiles 視為設為預設的 false 值。
    • 原生查詢執行會忽略 spark.sql.parquet.datetimeRebaseModeInRead,並只傳回 Parquet 檔案內容。系統不會考量舊版混合式 (儒略格里高利) 日曆與前推格里高利日曆之間的差異。Spark 結果可能會有所不同。
  • NaN不支援。舉例來說,在數值比較中使用 NaN 可能會導致非預期的結果。

  • Spark 直欄讀取:Spark 直欄向量與原生查詢執行作業不相容,因此可能會發生嚴重錯誤。

  • 溢出:如果將隨機重組分區設為大量,溢出至磁碟功能可能會觸發 OutOfMemoryException。如果發生這種情況,減少分區數量即可排除這項例外狀況。