ネイティブ クエリ実行でバッチ ワークロードとインタラクティブ セッションを高速化する

このドキュメントでは、Native Query Execution を有効にして、Apache Spark バッチ ワークロードとインタラクティブ セッションの Serverless を高速化するタイミングと方法について説明します。

ネイティブ クエリ実行の要件

Serverless for Apache Spark ネイティブ クエリ実行は、1.2.26+2.2.26+、またはそれ以降の Spark ランタイム バージョンを使用するバッチ ワークロードとインタラクティブ セッションでのみ使用できます。これらのバージョンは、Serverless for Apache Spark のプレミアム価格帯で実行されます。プレミアム ティアの料金はスタンダード ティアの料金よりも高くなりますが、ネイティブ クエリ実行に追加料金はかかりません。料金については、Apache Spark 用サーバーレスの料金をご覧ください。

ネイティブ クエリ実行のプロパティ

このセクションでは、バッチ ワークロードまたはインタラクティブ セッションのネイティブ クエリ実行を有効にしてカスタマイズするために使用できる、必須とオプションの Spark リソース割り当てプロパティを示します。

必須のプロパティ設定

  • spark.dataproc.runtimeEngine=native: デフォルトの spark ランタイム エンジンをオーバーライドするには、ワークロード ランタイム エンジンを native に設定する必要があります。

  • spark.dataproc.spark.driver.compute.tier=premiumspark.dataproc.executor.compute.tier=premium: これらの料金階層プロパティは、プレミアム料金階層に設定する必要があります。

省略可能なリソース割り当てプロパティ

  • spark.dataproc.driver.disk.tierspark.dataproc.driver.disk.sizespark.dataproc.executor.disk.tierspark.dataproc.executor.disk.size: これらのプロパティを使用して、Spark ドライバ プロセスとエグゼキュータ プロセスのプレミアム ディスクの階層とサイズを設定します。

    プレミアム ディスク階層では、行ベースのシャッフルではなく列ベースのシャッフルを使用して、パフォーマンスを向上させます。シャッフル I/O スループットを向上させるには、シャッフル ファイルを格納するのに十分な大きさのディスクサイズで、ドライバとエグゼキュータのプレミアム ディスク階層を使用します。

  • spark.driver.memoryspark.driver.memoryOverheadspark.executor.memoryspark.executor.memoryOverheadspark.memory.offHeap.size: これらのプロパティを使用して、Spark ドライバ プロセスとエグゼキュータ プロセスに提供されるメモリを調整します。

    メモリは、次のいずれかの方法で構成できます。

    • オプション 1: 指定した値でオフヒープ メモリ(spark.memory.offHeap.size)のみを構成します。ネイティブ クエリの実行では、指定された値がオフヒープ メモリとして使用され、オフヒープ メモリ値の 1/7th がオンヒープ メモリ(spark.executor.memory)として割り当てられます。

    • オプション 2: オンヒープ メモリ(spark.executor.memory)とオフヒープ メモリ(spark.memory.offHeap.size)の両方を構成します。オフヒープ メモリに割り当てる量は、オンヒープ メモリに割り当てる量よりも大きくする必要があります。

    オフヒープ メモリ(spark.memory.offHeap.size)とオンヒープ メモリ(spark.executor.memory)の両方を構成しない場合、ネイティブ クエリ実行エンジンは、デフォルトの 4g メモリ量を 6:1 の比率でオフヒープ メモリとオンヒープ メモリに分割します。

    推奨事項: 6:1 の比率でオフヒープ メモリをオンヒープ メモリに割り当てます。

    例:

    ネイティブ クエリ実行なしのメモリ設定 ネイティブ クエリ実行でのメモリの推奨設定
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7g 6g 1g
    14g 12g 2g
    28g 24g 4g
    56g 48g 8g

認定ツールを実行する

ネイティブ クエリ実行(NQE)で実行時間を短縮できるバッチ ワークロードを特定するには、認定ツールを使用します。このツールは、Spark イベントログを分析して、潜在的なランタイムの節約量を推定し、NQE エンジンでサポートされていないオペレーションを特定します。

Google Cloud には、適格性分析を実行するための 2 つの方法(適格性ジョブと適格性スクリプト)があります。ほとんどのユーザーにおすすめの方法は、バッチ ワークロードの検出と分析を自動化する認定ジョブです。代替の限定スクリプトは、既知のイベント ログファイルの分析という特定のユースケースで使用できます。ユースケースに最適な方法を選択します。

  • Qualification Job(推奨): これは主な推奨方法です。これは、1 つ以上の Google Cloud プロジェクトとリージョンにわたって最近のバッチ ワークロードを自動的に検出して分析する PySpark ジョブです。このメソッドは、個々のイベント ログファイルを自分で探す必要なく、広範な分析を行う場合に使用します。このアプローチは、NQE の適合性を大規模に評価する場合に最適です。

  • 認定スクリプト(代替):高度なユースケースや特定のユースケース向けの代替方法です。これは、単一の Spark イベントログファイルまたは特定の Cloud Storage ディレクトリ内のすべてのイベントログを分析するシェル スクリプトです。分析するイベントログの Cloud Storage パスがある場合は、このメソッドを使用します。

資格ジョブ

認定ジョブは、Apache Spark バッチ ワークロード用のサーバーレスをプログラムでスキャンし、分散分析ジョブを送信することで、大規模な分析を簡素化します。このツールは組織全体のジョブを評価するため、イベント ログのパスを手動で検索して指定する必要がなくなります。

IAM ロールを付与する

限定ジョブがバッチ ワークロード メタデータにアクセスし、Cloud Logging で Spark イベントログを読み取るには、ワークロードを実行するサービス アカウントに、分析対象のすべてのプロジェクトで次の IAM ロールが付与されている必要があります。

限定ジョブを送信する

認定ジョブは gcloud CLI ツールを使用して送信します。このジョブには、一般公開されている Cloud Storage バケットでホストされている PySpark スクリプトと JAR ファイルが含まれています。

ジョブは次のいずれかの実行環境で実行できます。

  • Serverless for Apache Spark バッチ ワークロードとして。これは、単純なスタンドアロン ジョブの実行です。

  • Dataproc on Compute Engine クラスタで実行されるジョブとして。この方法は、ジョブをワークフローに統合する場合に便利です。

ジョブの引数

引数 説明 必須かどうか デフォルト値
--project-ids バッチ ワークロードをスキャンする単一のプロジェクト ID または Google Cloud プロジェクト ID のカンマ区切りリスト。 いいえ 認定ジョブが実行されているプロジェクト。
--regions 指定されたプロジェクト内でスキャンする単一のリージョンまたはカンマ区切りのリージョン リスト。 いいえ 指定されたプロジェクト内のすべてのリージョン。
--start-time バッチのフィルタリングの開始日。この日付(形式: YYYY-MM-DD)以降に作成されたバッチのみが分析されます。 いいえ 開始日フィルタは適用されていません。
--end-time バッチのフィルタリングの終了日。この日付(形式: YYYY-MM-DD)以前に作成されたバッチのみが分析されます。 いいえ 終了日フィルタは適用されません。
--limit リージョンごとに分析するバッチの最大数。最新のバッチから順に分析されます。 いいえ 他のフィルタ条件に一致するすべてのバッチが分析されます。
--output-gcs-path 結果ファイルが書き込まれる Cloud Storage パス(例: gs://your-bucket/output/)。 なし
--input-file 一括分析用のテキスト ファイルの Cloud Storage パス。この引数が指定されている場合、他のすべてのスコープ定義引数(--project-ids--regions--start-time--end-time--limit)がオーバーライドされます。 いいえ なし

認定ジョブの例

  • 簡単なアドホック分析を実行する Apache Spark バッチジョブ用のサーバーレス。ジョブ引数は -- 区切り文字の後に一覧表示されます。

    gcloud dataproc batches submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --project=PROJECT_ID \
        --region=REGION \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --project-ids=COMMA_SEPARATED_PROJECT_IDS \
        --regions=COMMA_SEPARATED_REGIONS \
        --limit=MAX_BATCHES \
        --output-gcs-path=gs://BUCKET
    
  • us-central1 リージョンの sample_project で見つかった最新のバッチを最大 50 個分析する Apache Spark 用サーバーレス バッチジョブ。結果は Cloud Storage のバケットに書き込まれます。ジョブ引数は -- 区切り文字の後に一覧表示されます。

    gcloud dataproc batches submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --project=PROJECT_ID \
        --region=US-CENTRAL1 \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --project-ids=PROJECT_ID \
        --regions=US-CENTRAL1 \
        --limit=50 \
        --output-gcs-path=gs://BUCKET/
    
  • 大規模で再現性のある自動化された分析ワークフローで一括分析を行うために Dataproc クラスタに送信された Dataproc on Compute Engine ジョブ。ジョブ引数は、Cloud Storage の BUCKET にアップロードされる INPUT_FILE に配置されます。この方法は、1 回の実行で異なるプロジェクトとリージョンにわたって異なる日付範囲やバッチ上限をスキャンする場合に最適です。

    gcloud dataproc jobs submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --input-file=gs://INPUT_FILE \
        --output-gcs-path=gs://BUCKET
    

    注:

    INPUT_FILE: ファイル内の各行は個別の分析リクエストを表し、1 文字のフラグとその値の形式(-p PROJECT-ID -r REGION -s START_DATE -e END_DATE -l LIMITS など)を使用します。

    入力ファイルの内容の例:

    -p project1 -r us-central1 -s 2024-12-01 -e 2024-12-15 -l 100
    -p project2 -r europe-west1 -s 2024-11-15 -l 50
    

    これらの引数は、次の 2 つのスコープを分析するようにツールに指示します。

    • 2025 年 12 月 1 日から 2025 年 12 月 15 日の間に作成された us-central1 リージョンの project1 のバッチが 100 個まで。
    • 2025 年 11 月 15 日以降に作成された europe-west1 リージョンの project2 で最大 50 個のバッチ。

見込み客の評価スクリプト

分析する特定の Spark イベントログへの Cloud Storage の直接パスがある場合は、この方法を使用します。この方法では、Cloud Storage のイベントログ ファイルへのアクセス権が構成されているローカルマシンまたは Compute Engine VM で、シェル スクリプト run_qualification_tool.sh をダウンロードして実行する必要があります。

次の手順で、Apache Spark バッチ ワークロード イベント ファイルに対してスクリプトを実行します。

1. 分析する Spark イベント ファイルを含むローカル ディレクトリに run_qualification_tool.sh をコピーします。

  1. 限定スクリプトを実行して、1 つのイベント ファイルまたはスクリプト ディレクトリに含まれる一連のイベント ファイルを分析します。

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    フラグと値:

    -f(必須): Spark イベント ファイルの場所を参照して、Spark ワークロード イベント ファイルを見つけます。

    • EVENT_FILE_PATHEVENT_FILE_NAME が指定されていない場合は必須): 分析するイベント ファイルのパス。指定しない場合、イベント ファイルのパスは現在のディレクトリとみなされます。

    • EVENT_FILE_NAMEEVENT_FILE_PATH が指定されていない場合は必須): 分析するイベント ファイルの名前。指定しない場合、EVENT_FILE_PATH で再帰的に検出されたイベント ファイルが分析されます。

    -o(省略可): 指定しない場合、ツールは現在のディレクトリの下に output ディレクトリを作成するか、既存のディレクトリを使用して、出力ファイルを配置します。

    • CUSTOM_OUTPUT_DIRECTORY_PATH: 出力ファイルを保存する出力ディレクトリのパス。

    -k(省略可):

    -x(省略可):

    • MEMORY_ALLOCATED: ツールに割り当てるメモリ(GB)。デフォルトでは、このツールはシステムで使用可能な空きメモリの 80% と、使用可能なすべてのマシンコアを使用します。

    -t(省略可):

    • PARALLEL_THREADS_TO_RUN: ツールが実行する並列スレッドの数。デフォルトでは、ツールはすべてのコアを実行します。

    コマンドの使用例:

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    この例では、限定ツールは gs://dataproc-temp-us-east1-9779/spark-job-history ディレクトリをトラバースし、このディレクトリとそのサブディレクトリに含まれる Spark イベントファイルを分析します。ディレクトリへのアクセスは /keys/event-file-key で提供されます。このツールは実行に 34 GB memory を使用し、5 並列スレッドを実行します。

    Spark イベント ファイルの場所

Serverless for Apache Spark バッチ ワークロードの Spark イベント ファイルを見つけるには、次のいずれかの操作を行います。

  1. Cloud Storage でワークロードの spark.eventLog.dir を見つけてダウンロードします。

    1. spark.eventLog.dir が見つからない場合は、spark.eventLog.dir を Cloud Storage のロケーションに設定し、ワークロードを再実行して spark.eventLog.dir をダウンロードします。
  2. バッチジョブに Spark History Server を構成している場合:

    1. Spark History Server に移動し、ワークロードを選択します。
    2. [イベントログ] 列の [ダウンロード] をクリックします。

Qualification ツールの出力ファイル

認定ジョブまたはスクリプト分析が完了すると、認定ツールは次の出力ファイルを現在のディレクトリの perfboost-output ディレクトリに配置します。

  • AppsRecommendedForBoost.tsv: ネイティブ クエリ実行での使用が推奨されるアプリケーションのタブ区切りリスト。

  • UnsupportedOperators.tsv: ネイティブ クエリ実行での使用が推奨されないアプリケーションのタブ区切りリスト。

AppsRecommendedForBoost.tsv 出力ファイル

次の表は、AppsRecommendedForBoost.tsv 出力ファイルのサンプル内容を示しています。分析されたアプリケーションごとに 1 行が含まれています。

AppsRecommendedForBoost.tsv 出力ファイルの例:

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0.00% 0.00% 548924253 548924253 100.00% TRUE 30.00%
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0.00% 0.00% 514401703 514401703 100.00% TRUE 30.00%

列の説明:

  • applicationId: Spark アプリケーションの ApplicationID。対応するバッチ ワークロードを特定するために使用します。

  • applicationName: Spark アプリケーションの名前。

  • rddPercentage: アプリケーション内の RDD オペレーションの割合。RDD オペレーションはネイティブ クエリ実行ではサポートされていません。

  • unsupportedSqlPercentage: ネイティブ クエリ実行でサポートされていない SQL オペレーションの割合。

  • totalTaskTime: アプリケーションの実行中に実行されたすべてのタスクの累積タスク時間。

  • supportedTaskTime: ネイティブ クエリ実行でサポートされている合計タスク時間。

次の列には、ネイティブ クエリ実行がバッチ ワークロードにメリットをもたらすかどうかを判断するうえで重要な情報が示されています。

  • supportedSqlPercentage: ネイティブ クエリ実行でサポートされている SQL オペレーションの割合。この割合が高いほど、ネイティブ クエリ実行でアプリケーションを実行することで実現できるランタイムの短縮率が大きくなります。

  • recommendedForBoost: TRUE の場合は、ネイティブ クエリ実行でアプリケーションを実行することをおすすめします。recommendedForBoostFALSE の場合、バッチ ワークロードでネイティブ クエリ実行を使用しないでください。

  • expectedRuntimeReduction: ネイティブ クエリ実行でアプリケーションを実行した場合に予想されるアプリケーション ランタイムの削減率。

UnsupportedOperators.tsv 出力ファイル。

UnsupportedOperators.tsv 出力ファイルには、ネイティブ クエリ実行でサポートされていないワークロード アプリケーションで使用されている演算子のリストが含まれています。出力ファイルの各行には、サポートされていない演算子がリストされます。

列の説明:

  • unsupportedOperator: ネイティブ クエリ実行でサポートされていない演算子の名前。

  • cumulativeCpuMs: オペレータの実行中に消費された CPU ミリ秒数。この値は、アプリケーションにおけるオペレーターの相対的な重要度を反映しています。

  • count: アプリケーションでオペレータが使用される回数。

ネイティブ クエリ実行を使用する

アプリケーションでネイティブ クエリ実行を使用するには、アプリケーションを実行するバッチ ワークロードインタラクティブ セッション、またはセッション テンプレートを作成するときに、ネイティブ クエリ実行プロパティを設定します。

バッチ ワークロードでネイティブ クエリ実行を使用する

Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、バッチ ワークロードでネイティブ クエリ実行を有効にできます。

コンソール

Google Cloud コンソールを使用して、バッチ ワークロードでネイティブ クエリ実行を有効にします。

  1. Google Cloud コンソールで次の操作を行います。

    1. Dataproc バッチに移動します。
    2. [作成] をクリックして、[バッチ作成] ページを開きます。
  2. 次のフィールドを選択して入力し、ネイティブ クエリ実行のバッチを構成します。

    • コンテナ:
    • エグゼキュータとドライバのティア構成:
      • すべてのティア(ドライバのコンピューティング ティア実行のコンピューティング ティア)で Premium を選択します。
    • プロパティ: Key(プロパティ名)と Value のペアを入力して、ネイティブ クエリ実行プロパティを指定します。
      キー
      spark.dataproc.runtimeEngine 先住民
  3. 他のバッチ ワークロードの設定を入力、選択、または確認します。Spark バッチ ワークロードを送信するをご覧ください。

  4. [送信] をクリックして、Spark バッチ ワークロードを実行します。

gcloud

次の gcloud CLI gcloud dataproc batches submit spark コマンドフラグを設定して、ネイティブ クエリ実行のバッチ ワークロードを構成します。

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

注:

API

次の Dataproc API フィールドを設定して、ネイティブ クエリ実行のバッチ ワークロードを構成します。

ネイティブ クエリ実行を使用する場合

ネイティブ クエリ実行は、次のようなシナリオで使用します。

  • Parquet ファイルと ORC ファイルからデータを読み取る Spark Dataframe API、Spark Dataset API、Spark SQL クエリ。出力ファイル形式は、ネイティブ クエリ実行のパフォーマンスに影響しません。

  • ネイティブ クエリ実行の認定ツールで推奨されるワークロード。

ネイティブ クエリ実行を使用すべきでない場合

次のデータ型の入力:

  • バイト: ORC と Parquet
  • タイムスタンプ: ORC
  • 構造体、配列、マップ: Parquet

制限事項

次のシナリオでネイティブ クエリ実行を有効にすると、例外、Spark の非互換性、ワークロードのデフォルトの Spark エンジンへのフォールバックが発生する可能性があります。

フォールバック

次の実行でのネイティブ クエリの実行により、ワークロードが Spark 実行エンジンにフォールバックし、回帰または失敗が発生する可能性があります。

  • ANSI: ANSI モードが有効になっている場合、実行は Spark にフォールバックします。

  • 大文字と小文字を区別するモード: ネイティブ クエリ実行は、Spark のデフォルトの大文字と小文字を区別しないモードのみをサポートします。大文字と小文字を区別するモードが有効になっていると、正しくない結果が返されることがあります。

  • パーティション分割テーブルのスキャン: ネイティブ クエリ実行は、パスにパーティション情報が含まれている場合にのみパーティション分割テーブルのスキャンをサポートします。それ以外の場合、ワークロードは Spark 実行エンジンにフォールバックします。

互換性のない動作

次の場合は、ネイティブ クエリ実行を使用すると、互換性のない動作や誤った結果が生じる可能性があります。

  • JSON 関数: ネイティブ クエリ実行では、単一引用符ではなく二重引用符で囲まれた文字列がサポートされます。単一引用符を使用すると、正しくない結果が返されます。get_json_object 関数でパスに「*」を使用すると、NULL が返されます。

  • Parquet 読み取り構成:

    • ネイティブ クエリの実行では、spark.files.ignoreCorruptFilestrue に設定されている場合でも、デフォルトの false 値に設定されているとみなされます。
    • ネイティブ クエリ実行は spark.sql.parquet.datetimeRebaseModeInRead を無視し、Parquet ファイルの内容のみを返します。以前のハイブリッド(ユリウス暦とグレゴリオ暦)カレンダーと先発グレゴリオ暦カレンダーの違いは考慮されません。Spark の結果は異なる場合があります。
  • NaN: サポートされていません。たとえば、数値比較で NaN を使用すると、予期しない結果が生じる可能性があります。

  • Spark の列形式の読み取り: Spark の列形式のベクトルがネイティブ クエリ実行と互換性がないため、致命的なエラーが発生する可能性があります。

  • スピル: シャッフル パーティションが大きな数に設定されている場合、ディスクへのスピル機能が OutOfMemoryException をトリガーすることがあります。この例外が発生した場合は、パーティションの数を減らすことで例外を解消できます。