ネイティブ クエリ実行で Dataproc サーバーレス バッチ ワークロードを高速化する

このドキュメントでは、プレミアム料金階層で実行されている Dataproc サーバーレス バッチ ワークロードでネイティブ クエリ実行を使用する方法について説明します。ネイティブ クエリ実行は、Spark ワークロードを高速化し、費用を削減できる機能です。この機能は Apache Spark API と互換性があり、追加のユーザーコードは必要ありません。

ネイティブ クエリ実行を使用する場合

次のようなシナリオでは、ネイティブ クエリ実行を使用します。

Parquet ファイルと ORC ファイルからデータを読み取る Spark DataFrame API と Spark SQL クエリ。
ネイティブ クエリ実行の適格性評価ツールで推奨されるワークロード。

ネイティブ クエリ実行を使用すべきでない場合

次のシナリオでは、ネイティブ クエリ実行を使用しないでください。バッチ ワークロードのランタイムの短縮が達成されず、ワークロードの失敗やリグレッションが発生する可能性があります。

ネイティブ クエリ実行の適格性評価ツールで推奨されていないワークロード。
Spark RDD、UDF、ML ワークロード
書き込みワークロード
Parquet と ORC 以外のファイル形式
次のデータ型の入力:

  • Timestamp
  • TinyInt
  • Byte
  • Struct
  • Array
  • Map: ORC と Parquet
  • VarChar
  • Char: ORC
正規表現を含むクエリ
リクエスト元による支払いバケットを使用するワークロード
デフォルト以外の Cloud Storage 構成設定。ネイティブ クエリ実行では、オーバーライドされている場合でも多くのデフォルトが使用されます。

プレミアム ティア料金でネイティブ クエリ実行を使用する方法

Dataproc Serverless ネイティブ クエリ実行は、Dataproc Serverless プレミアム料金ティアで実行されるバッチ ワークロードでのみ使用できます。プレミアム ティアの料金はスタンダード ティアの料金よりも高くなりますが、ネイティブ クエリの実行に追加料金はかかりません。詳細については、Dataproc Serverless の料金をご覧ください。

バッチ リソースのプレミアム ティアのリソース割り当てと料金を有効にするには、Spark バッチ ワークロードを送信するときに、次のリソース割り当て階層プロパティpremium に設定します。

Dataproc サーバーレスのバッチ ワークロードは、Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して送信できます。

Console

  1. Google Cloud コンソールの場合:

    1. [Dataproc バッチ] に移動します。
    2. [作成] をクリックして [バッチ作成] ページを開きます。
  2. 次のフィールドを選択して入力し、ネイティブ クエリ実行用のバッチを構成します。

    • コンテナ:
    • エグゼキュータとドライバのティア構成:
      • すべてのティア(ドライバのコンピューティング ティア実行コンピューティング ティア)で Premium を選択します。
    • プロパティ: 次の ネイティブ クエリ実行プロパティに、次の Key(プロパティ名)と Value のペアを入力します。
      キー
      spark.dataproc.runtimeEngine 先住民
  3. 他のバッチ ワークロードの設定を入力、選択、確認します。Spark バッチ ワークロードの送信をご覧ください。

  4. [送信] をクリックして、Spark バッチ ワークロードを実行します。

gcloud

ネイティブ クエリ実行のバッチ ワークロードを構成するには、次の gcloud CLI gcloud dataproc batches submit spark コマンドフラグを設定します。

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

注:

API

ネイティブ クエリ実行のバッチ ワークロードを構成するには、次の Dataproc API フィールドを設定します。

制限事項

次のシナリオでネイティブ クエリ実行を有効にすると、例外がスローされたり、Spark の互換性の問題が発生したり、ワークロードがデフォルトの Spark エンジンにフォールバックする可能性があります。

フォールバック

ネイティブ クエリ実行では、実行時にワークロードが Spark 実行エンジンにフォールバックし、リグレッションまたは障害が発生する可能性があります。

  • ANSI: ANSI モードが有効になっている場合、実行は Spark にフォールバックします。

  • 大文字と小文字を区別するモード: ネイティブ クエリ実行は、Spark のデフォルトの大文字と小文字を区別しないモードのみをサポートしています。大文字と小文字を区別するモードが有効になっていると、誤った結果が返される可能性があります。

  • RegExp functions: ネイティブ クエリ実行は、RE2 に基づいて rlikeregexp_extract などの regexp 関数を実装します。Spark では、java.util.regex に基づいています。

    • ルックアップ: ネイティブ クエリ実行は、RE2 のルックアップまたはルックバック パターンをサポートしていません。
    • java.util.regex とは異なり、RE2 では、パターン「\s」で空白文字を照合する場合、\v(\x0b)を空白文字として扱いません。
  • パーティション分割テーブルのスキャン: ネイティブ クエリ実行は、パスにパーティション情報が含まれている場合にのみ、パーティション分割テーブルのスキャンをサポートします。それ以外の場合、ワークロードは Spark 実行エンジンにフォールバックします。

互換性のない動作

次の場合にネイティブ クエリ実行を使用すると、互換性のない動作や誤った結果が発生する可能性があります。

  • JSON 関数: ネイティブ クエリ実行は、一重引用符ではなく二重引用符で囲まれた文字列をサポートしています。単一引用符を使用すると、正しくない結果が返されます。get_json_object 関数でパスに「*」を使用すると、NULL が返されます。

  • Parquet 読み取り構成:

    • ネイティブ クエリ実行では、true に設定されている場合でも、spark.files.ignoreCorruptFiles はデフォルトの false 値に設定されているものとして扱われます。
    • ネイティブ クエリ実行は spark.sql.parquet.datetimeRebaseModeInRead を無視し、Parquet ファイルの内容のみを返します。従来のハイブリッド(ユリウス グレゴリオ)暦とプロレプティック グレゴリオ暦の違いは考慮されません。Spark の結果は異なる場合があります。
  • NaN: サポートされていません。数値比較で NaN を使用する場合など、予期しない結果が生じる可能性があります。

  • Spark 列形式の読み取り: Spark 列形式ベクトルがネイティブ クエリ実行と互換性がないため、致命的なエラーが発生する可能性があります。

  • スピンアウト: シャッフル パーティションが大量に設定されている場合、ディスクへのスピンアウト機能によって OutOfMemoryException がトリガーされることがあります。この場合は、パーティションの数を減らすことで、この例外を排除できます。

ネイティブ クエリ実行ワークロードのチューニング

Dataproc Serverless ネイティブ クエリ実行は、次のプロパティを使用してさらに調整できます。

バッチ プロパティ 使用する状況
spark.driver.memory
spark.driver.memoryOverhead
To tune the memory provided to spark driver process
spark.executor.memory
spark.executor.memoryOverhead
spark.memory.offHeap.size
To tune the memory provided to onheap/offheap memory of executor process
spark.dataproc.driver.disk.tier
spark.dataproc.driver.disk.size
To configure premium disk tier and size for driver
spark.dataproc.executor.disk.tier
spark.dataproc.executor.disk.size
To configure premium disk tier and size for executor

ネイティブ クエリ実行バッチ ワークロードのプロパティ

  • spark.dataproc.runtimeEngine=native必須): デフォルトの spark ランタイム エンジンをオーバーライドするには、ワークロード ランタイム エンジンを native に設定する必要があります。

  • version必須): ワークロードで Spark ランタイム バージョン 1.2.26 以降、2.2.26 以降、またはそれ以降のメジャー ランタイム バージョンを使用する必要があります。

  • プレミアム コンピューティング階層(必須): spark.dataproc.spark.driver.compute.tier プロパティと spark.dataproc.executor.compute.tier プロパティは premium に設定する必要があります。

  • プレミアム ディスク階層(省略可): プレミアム ディスク階層では、行ベースのシャッフルではなく列ベースのシャッフルを使用して、パフォーマンスを向上させます。シャッフル I/O スループットを改善するには、シャッフル ファイルに対応できる十分なディスクサイズのドライバとエグゼキュータのプレミアム ディスク階層を使用します。

  • メモリ(省略可): オフヒープ メモリ(spark.memory.offHeap.size)とオンヒープ メモリ(spark.executor.memory)の両方を構成せずにネイティブ クエリ実行エンジンを構成した場合、ネイティブ クエリ実行エンジンはデフォルトの 4g 量のメモリを取り、オフヒープ メモリとオンヒープ メモリに 6:1 の比率で分割します。

    ネイティブ クエリ実行の使用時にメモリを構成する場合は、次のいずれかの方法で構成できます。

    • 指定された値でオフヒープ メモリのみ(spark.memory.offHeap.size)を構成します。ネイティブ クエリ実行では、指定された値がオフヒープ メモリとして使用され、オフヒープ メモリ値の 1/7th がオンヒープ メモリとして使用されます。

    • オンヒープメモリ(spark.executor.memory)とオフヒープメモリ(spark.memory.offHeap.size)の両方を構成します。オフヒープメモリに割り当てる量は、オンヒープメモリに割り当てる量よりも大きくする必要があります。推奨事項: オフヒープのメモリとオンヒープのメモリを 6:1 の比率で割り当てます。

    値の例:

    ネイティブ クエリ実行なしのメモリ設定 ネイティブ クエリ実行での推奨メモリ設定
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7g 6g 1g
    14g 12g 2g
    28g 24g 4g
    56g 48g 8g

ネイティブ クエリ実行の適格性確認ツール

Dataproc ネイティブ クエリ実行の適格性ツール run_qualification_tool.sh を実行して、ネイティブ クエリ実行で実行時間を短縮できるワークロードを特定できます。このツールは、バッチ ワークロード アプリケーションによって生成された Spark イベントファイルを分析し、ネイティブ クエリ実行で各ワークロード アプリケーションが得られる潜在的なランタイム削減を見積もります。

適格性ツールを実行する

次の手順で、Dataproc Serverless バッチ ワークロード イベント ファイルに対してツールを実行します。

1. run_qualification_tool.sh を、分析する Spark イベント ファイルを含むローカル ディレクトリにコピーします。

  1. 適格性ツールを実行して、スクリプト ディレクトリに含まれる 1 つのイベント ファイルまたはイベント ファイルのセットを分析します。

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    フラグと値:

    -f(必須): Spark イベント ファイルの場所で、Spark ワークロード イベント ファイルを見つけます。

    • EVENT_FILE_PATHEVENT_FILE_NAME が指定されていない場合必須): 分析するイベント ファイルのパス。指定しない場合、イベント ファイルのパスには現在のディレクトリが使用されます。

    • EVENT_FILE_NAMEEVENT_FILE_PATH が指定されていない場合必須): 分析するイベント ファイルの名前。指定しない場合、EVENT_FILE_PATH で再帰的に検出されたイベント ファイルが分析されます。

    -o(省略可): 指定しない場合、ツールは現在のディレクトリの下に既存の output ディレクトリを作成するか、既存の output ディレクトリを使用して出力ファイルを配置します。

    • CUSTOM_OUTPUT_DIRECTORY_PATH: 出力ファイルの出力ディレクトリ パス。

    -k(省略可):

    -x(省略可):

    • MEMORY_ALLOCATED: ツールに割り当てるメモリ(ギガバイト単位)。デフォルトでは、このツールはシステムで使用可能な空きメモリの 80% と、使用可能なすべてのマシンコアを使用します。

    -t(省略可):

    • PARALLEL_THREADS_TO_RUN: N=ツールが実行する並列スレッドの数。デフォルトでは、ツールはすべてのコアを実行します。

    コマンドの使用例:

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    この例では、適格性ツールが gs://dataproc-temp-us-east1-9779/spark-job-history ディレクトリを走査し、このディレクトリとそのサブディレクトリに含まれる Spark イベント ファイルを分析します。ディレクトリへのアクセスは /keys/event-file-key によって提供されます。このツールは、実行に 34 GB memory を使用し、5 並列スレッドを実行します。

適格性評価ツールの出力ファイル

分析が完了すると、適格性ツールは、現在のディレクトリの perfboost-output ディレクトリに次の出力ファイルを配置します。

  • AppsRecommendedForBoost.tsv: ネイティブ クエリ実行で使用することをおすすめするアプリケーションのタブ区切りリスト。

  • UnsupportedOperators.tsv: ネイティブ クエリ実行での使用が推奨されないアプリケーションのタブ区切りリスト。

AppsRecommendedForBoost.tsv 出力ファイル

次の表に、サンプルの AppsRecommendedForBoost.tsv 出力ファイルの内容を示します。分析されたアプリケーションごとに 1 行が含まれます。

AppsRecommendedForBoost.tsv 出力ファイルの例:

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0.00% 0.00% 548924253 548924253 100.00% TRUE 30.00%
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0.00% 0.00% 514401703 514401703 100.00% TRUE 30.00%

列の説明:

  • applicationId: Spark アプリケーションの ApplicationID。これを使用して、対応するバッチ ワークロードを特定します。

  • applicationName: Spark アプリケーションの名前。

  • rddPercentage: アプリケーション内の RDD オペレーションの割合。RDD オペレーションはネイティブ クエリ実行ではサポートされていません。

  • unsupportedSqlPercentage: ネイティブ クエリ実行でサポートされていない SQL オペレーションの割合。

  • totalTaskTime: アプリケーションの実行中に実行されたすべてのタスクの累積タスク時間。

  • supportedTaskTime: ネイティブ クエリ実行でサポートされるタスクの合計時間。

次の列には、ネイティブ クエリ実行がバッチ ワークロードにメリットがあるかどうかを判断するのに役立つ重要な情報が表示されます。

  • supportedSqlPercentage: ネイティブ クエリ実行でサポートされる SQL オペレーションの割合。この割合が高いほど、ネイティブ クエリ実行でアプリケーションを実行することで、ランタイムを短縮できます。

  • recommendedForBoost: TRUE の場合は、ネイティブ クエリ実行でアプリケーションを実行することをおすすめします。recommendedForBoostFALSE の場合、バッチ ワークロードでネイティブ クエリ実行を使用しないでください。

  • expectedRuntimeReduction: ネイティブ クエリ実行でアプリケーションを実行した場合に、アプリケーション ランタイムが削減される割合(%)。

UnsupportedOperators.tsv 出力ファイル。

UnsupportedOperators.tsv 出力ファイルには、ネイティブ クエリ実行でサポートされていないワークロード アプリケーションで使用される演算子のリストが含まれています。出力ファイルの各行には、サポートされていない演算子がリストされます。

列の説明:

  • unsupportedOperator: ネイティブ クエリ実行でサポートされていない演算子の名前。

  • cumulativeCpuMs: オペレータの実行中に消費された CPU ミリ秒数。この値は、アプリケーション内の演算子の相対的な重要度を反映しています。

  • count: アプリケーションでオペレーターが使用された回数。

プロジェクト間で適格性ツールを実行する

このセクションでは、スクリプトを実行して適格性ツールを実行し、複数のプロジェクトのバッチ ワークロードの Spark イベント ファイルを分析する手順について説明します。

スクリプトの要件と制限事項:

  • Linux マシンでスクリプトを実行します。
    • Java バージョン >=11 がデフォルトの Java バージョンとしてインストールされている必要があります。
  • Cloud Logging のログの TTL は 30 日間であるため、30 日以上前に実行されたバッチ ワークロードの Spark イベント ファイルは分析できません。

プロジェクト間で適格性ツールを実行する手順は次のとおりです。

  1. list-batches-and-run-qt.sh スクリプトをダウンロードして、ローカルマシンにコピーします。

  2. スクリプトの権限を変更する。

    chmod +x list-batches-and-run-qt.sh
    
  3. 分析用にスクリプトに渡すプロジェクト入力ファイルのリストを準備します。分析するバッチ ワークロードの Spark イベント ファイルを含むプロジェクトとリージョンごとに、次の形式で 1 行ずつ追加してテキスト ファイルを作成します。

    -r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
    

    注:

    -r(必須):

    • REGION: プロジェクトのバッチが送信されるリージョン。

    -s(必須): 形式: yyyy-mm-dd。必要に応じて、00:00:00 時間セグメントを追加できます。

    • START_DATE: 開始日以降に作成されたバッチ ワークロードのみが分析されます。バッチは、バッチの作成時間の降順で分析されます。最新のバッチが最初に分析されます。

    -e(省略可): 形式は yyyy-mm-dd です。必要に応じて、00:00:00 時間セグメントを追加できます。

    • END_DATE: 指定すると、終了日より前または終了日に作成されたバッチ ワークロードのみが分析されます。指定しない場合、START_DATE の後に作成されたすべてのバッチが分析されます。バッチは、バッチの作成時間の降順で分析されます。最新のバッチが最初に分析されます。

    -l(省略可):

    • LIMIT_MAX_BATCHES: 分析するバッチの最大数。このオプションは、START-DATEEND-DATE と組み合わせて使用すると、指定した日付の間に作成されたバッチの数を制限して分析できます。

      -l が指定されていない場合、デフォルトの最大 100 バッチが分析されます。

    -k(省略可):

    • KEY_PATH: ワークロード Spark イベント ファイルの Cloud Storage アクセスキーを含むローカルパス。

    入力ファイルの例:

    -r us-central1 -s 2024-08-21 -p project1 -k key1
    -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
    

    注:

    • 行 1: us-central1 リージョンの project1 にある、2024-08-21 00:00:00 AM より後の作成時間の Spark イベント ファイルのうち、最新の 100 個(デフォルト)が分析されます。key1 により、Cloud Storage 内のファイルにアクセスできます。

    • 行 2: us-eastl1 リージョンの project2 にある、2024-08-21 00:00:00 AM より後の作成時間で、2024 年 8 月 23 日午後 11 時 59 分 59 秒より前またはその日の Spark イベント ファイルのうち、最新の 50 件が分析されます。key2 により、Cloud Storage 内のイベント ファイルにアクセスできます。

  4. list-batches-and-run-qt.sh スクリプトを実行します。分析結果は、AppsRecommendedForBoost.tsv ファイルと UnsupportedOperators.tsv ファイルに出力されます。

    ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
        -x MEMORY_ALLOCATED \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -t PARALLEL_THREADS_TO_RUN
    

    注:

Spark イベント ファイルの場所

Dataproc サーバーレスのバッチ ワークロードの Spark イベント ファイルを検索するには、次のいずれかの手順を実施します。

  1. Cloud Storage でワークロードの spark.eventLog.dir を見つけてダウンロードします。

    1. spark.eventLog.dir が見つからない場合は、spark.eventLog.dir を Cloud Storage のロケーションに設定し、ワークロードを再実行して spark.eventLog.dir をダウンロードします。
  2. バッチジョブに Spark History Server を構成している場合:

    1. Spark History Server に移動し、ワークロードを選択します。
    2. [イベントログ] 列の [ダウンロード] をクリックします。