ネイティブ クエリ実行で Dataproc Serverless バッチ ワークロードとインタラクティブ セッションを高速化する

ネイティブ クエリ実行の一般提供(GA)を発表します。ネイティブ クエリ実行は、Spark Dataframe API、Parquet ファイルと ORC ファイルからデータを読み取る Spark SQL クエリ、ネイティブ クエリ実行の適格性評価ツールで推奨されるワークロードをサポートしています。その他のユースケースについてご不明な点がございましたら、dataproc-pms@google.com までお問い合わせください。

このドキュメントでは、プレミアム料金階層で実行されている Dataproc サーバーレス バッチ ワークロードとインタラクティブ セッションを有効にして、ネイティブ クエリ実行を使用する方法について説明します。

プレミアム ティア料金でネイティブ クエリ実行を使用する方法

Dataproc Serverless ネイティブ クエリ実行は、Dataproc Serverless プレミアム料金階層で実行されるバッチ ワークロードとインタラクティブ セッションでのみ使用できます。プレミアム階層の料金はスタンダード階層の料金よりも高くなりますが、ネイティブ クエリ実行に追加料金はかかりません。詳細については、Dataproc Serverless の料金をご覧ください。

バッチ セッション リソースとインタラクティブ セッション リソースのプレミアム ティアのリソース割り当てと料金を有効にするには、Spark バッチ ワークロードまたはインタラクティブ セッションを送信するときに、次のリソース割り当て階層プロパティpremium に設定します。

ネイティブ クエリ実行を構成するには、バッチ ワークロード、インタラクティブ セッション、またはセッション テンプレートにネイティブ クエリ実行プロパティを設定し、ワークロードを送信するか、ノートブックでインタラクティブ セッションを実行します。

コンソール

  1. Google Cloud コンソールの場合:

    1. [Dataproc バッチ] に移動します。
    2. [作成] をクリックして [バッチ作成] ページを開きます。
  2. 次のフィールドを選択して入力し、ネイティブ クエリ実行のバッチを構成します。

    • コンテナ:
    • エグゼキュータとドライバのティア構成:
      • すべてのティア(ドライバのコンピューティング ティア実行コンピューティング ティア)で Premium を選択します。
    • プロパティ: 次の ネイティブ クエリ実行プロパティに、次の Key(プロパティ名)と Value のペアを入力します。
      キー
      spark.dataproc.runtimeEngine 先住民
  3. 他のバッチ ワークロードの設定を入力、選択、確認します。Spark バッチ ワークロードの送信をご覧ください。

  4. [送信] をクリックして、Spark バッチ ワークロードを実行します。

gcloud

ネイティブ クエリ実行のバッチ ワークロードを構成するには、次の gcloud CLI gcloud dataproc batches submit spark コマンドフラグを設定します。

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

注:

API

ネイティブ クエリ実行のバッチ ワークロードを構成するには、次の Dataproc API フィールドを設定します。

ネイティブ クエリ実行ワークロードのチューニング

Dataproc Serverless ネイティブ クエリ実行は、次のプロパティを使用してさらに調整できます。

バッチ プロパティ 使用する状況
spark.driver.memory
spark.driver.memoryOverhead
To tune the memory provided to spark driver process
spark.executor.memory
spark.executor.memoryOverhead
spark.memory.offHeap.size
To tune the memory provided to onheap/offheap memory of executor process
spark.dataproc.driver.disk.tier
spark.dataproc.driver.disk.size
To configure premium disk tier and size for driver
spark.dataproc.executor.disk.tier
spark.dataproc.executor.disk.size
To configure premium disk tier and size for executor

ネイティブ クエリ実行のプロパティ

  • spark.dataproc.runtimeEngine=native必須): デフォルトの spark ランタイム エンジンをオーバーライドするには、ワークロード ランタイム エンジンを native に設定する必要があります。

  • version必須): ワークロードで Spark ランタイム バージョン 1.2.26 以降、2.2.26 以降、またはそれ以降のメジャー ランタイム バージョンを使用する必要があります。

  • プレミアム コンピューティング階層(必須): spark.dataproc.spark.driver.compute.tier プロパティと spark.dataproc.executor.compute.tier プロパティは premium に設定する必要があります。

  • プレミアム ディスク階層(省略可、推奨): プレミアム ディスク階層では、行ベースのシャッフルではなく列ベースのシャッフルを使用してパフォーマンスを向上させます。シャッフル I/O スループットを改善するには、シャッフル ファイルに対応できる十分なディスクサイズのドライバとエグゼキュータのプレミアム ディスク階層を使用します。

  • メモリ(省略可): オフヒープ メモリ(spark.memory.offHeap.size)とオンヒープ メモリ(spark.executor.memory)の両方を構成せずにネイティブ クエリ実行エンジンを構成した場合、ネイティブ クエリ実行エンジンはデフォルトの 4g 量のメモリを取り、オフヒープ メモリとオンヒープ メモリに 6:1 の比率で分割します。

    ネイティブ クエリ実行の使用時にメモリを構成する場合は、次のいずれかの方法で構成できます。

    • 指定された値でオフヒープ メモリのみ(spark.memory.offHeap.size)を構成します。ネイティブ クエリ実行では、指定された値がオフヒープ メモリとして使用され、オフヒープ メモリ値の 1/7th がオンヒープ メモリとして追加で割り振られます。

    • オンヒープメモリ(spark.executor.memory)とオフヒープメモリ(spark.memory.offHeap.size)の両方を構成します。オフヒープメモリに割り当てる量は、オンヒープメモリに割り当てる量よりも大きくする必要があります。推奨事項: オフヒープのメモリとオンヒープのメモリを 6:1 の比率で割り当てます。

    値の例:

    ネイティブ クエリ実行なしのメモリ設定 ネイティブ クエリ実行での推奨メモリ設定
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7g 6g 1g
    14g 12g 2g
    28g 24g 4g
    56g 48g 8g

ネイティブ クエリ実行の適格性確認ツール

Dataproc ネイティブ クエリ実行の適格性ツール run_qualification_tool.sh を実行して、ネイティブ クエリ実行で実行時間を短縮できるワークロードを特定できます。このツールは、バッチ ワークロード アプリケーションによって生成された Spark イベントファイルを分析し、ネイティブ クエリ実行によって各ワークロード アプリケーションが得られる可能性があるランタイムの削減を見積もります。

適格性ツールを実行する

次の手順で、Dataproc Serverless バッチ ワークロード イベント ファイルに対してツールを実行します。

1. run_qualification_tool.sh を、分析する Spark イベント ファイルを含むローカル ディレクトリにコピーします。

  1. 適格性ツールを実行して、スクリプト ディレクトリに含まれる 1 つのイベント ファイルまたはイベント ファイルのセットを分析します。

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    フラグと値:

    -f(必須): Spark イベント ファイルの場所で、Spark ワークロード イベント ファイルを見つけます。

    • EVENT_FILE_PATHEVENT_FILE_NAME が指定されていない場合必須): 分析するイベント ファイルのパス。指定しない場合、イベント ファイルのパスには現在のディレクトリが使用されます。

    • EVENT_FILE_NAMEEVENT_FILE_PATH が指定されていない場合必須): 分析するイベント ファイルの名前。指定しない場合、EVENT_FILE_PATH で再帰的に検出されたイベント ファイルが分析されます。

    -o(省略可): 指定しない場合、ツールは現在のディレクトリの下に既存の output ディレクトリを作成するか、既存の output ディレクトリを使用して出力ファイルを配置します。

    • CUSTOM_OUTPUT_DIRECTORY_PATH: 出力ファイルの出力ディレクトリ パス。

    -k(省略可):

    -x(省略可):

    • MEMORY_ALLOCATED: ツールに割り当てるメモリ(ギガバイト単位)。デフォルトでは、このツールはシステムで使用可能な空きメモリの 80% と、使用可能なすべてのマシンコアを使用します。

    -t(省略可):

    • PARALLEL_THREADS_TO_RUN: N=ツールが実行する並列スレッドの数。デフォルトでは、ツールはすべてのコアを実行します。

    コマンドの使用例:

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    この例では、適格性ツールは gs://dataproc-temp-us-east1-9779/spark-job-history ディレクトリを走査し、このディレクトリとそのサブディレクトリに含まれる Spark イベント ファイルを分析します。ディレクトリへのアクセスは /keys/event-file-key によって提供されます。このツールは、実行に 34 GB memory を使用し、5 並列スレッドを実行します。

適格性評価ツールの出力ファイル

分析が完了すると、適格性ツールは、現在のディレクトリの perfboost-output ディレクトリに次の出力ファイルを配置します。

  • AppsRecommendedForBoost.tsv: ネイティブ クエリ実行で使用することをおすすめするアプリケーションのタブ区切りリスト。

  • UnsupportedOperators.tsv: ネイティブ クエリ実行で使用しないことをおすすめするアプリケーションのタブ区切りリスト。

AppsRecommendedForBoost.tsv 出力ファイル

次の表に、サンプルの AppsRecommendedForBoost.tsv 出力ファイルの内容を示します。分析されたアプリケーションごとに 1 行が含まれます。

AppsRecommendedForBoost.tsv 出力ファイルの例:

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0.00% 0.00% 548924253 548924253 100.00% TRUE 30.00%
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0.00% 0.00% 514401703 514401703 100.00% TRUE 30.00%

列の説明:

  • applicationId: Spark アプリケーションの ApplicationID。これを使用して、対応するバッチ ワークロードを特定します。

  • applicationName: Spark アプリケーションの名前。

  • rddPercentage: アプリケーション内の RDD オペレーションの割合。RDD オペレーションはネイティブ クエリ実行ではサポートされていません。

  • unsupportedSqlPercentage: ネイティブ クエリ実行でサポートされていない SQL オペレーションの割合。

  • totalTaskTime: アプリケーションの実行中に実行されたすべてのタスクの累積タスク時間。

  • supportedTaskTime: ネイティブ クエリ実行でサポートされるタスクの合計時間。

次の列には、ネイティブ クエリ実行がバッチ ワークロードにメリットがあるかどうかを判断するのに役立つ重要な情報が表示されます。

  • supportedSqlPercentage: ネイティブ クエリ実行でサポートされる SQL オペレーションの割合。この割合が高いほど、ネイティブ クエリ実行でアプリケーションを実行することで、ランタイムを短縮できます。

  • recommendedForBoost: TRUE の場合は、ネイティブ クエリ実行でアプリケーションを実行することをおすすめします。recommendedForBoostFALSE の場合、バッチ ワークロードでネイティブ クエリ実行を使用しないでください。

  • expectedRuntimeReduction: ネイティブ クエリ実行でアプリケーションを実行した場合に、アプリケーション ランタイムが短縮される割合(%)。

UnsupportedOperators.tsv 出力ファイル。

UnsupportedOperators.tsv 出力ファイルには、ネイティブ クエリ実行でサポートされていないワークロード アプリケーションで使用される演算子のリストが含まれています。出力ファイルの各行には、サポートされていない演算子がリストされます。

列の説明:

  • unsupportedOperator: ネイティブ クエリ実行でサポートされていない演算子の名前。

  • cumulativeCpuMs: オペレータの実行中に消費された CPU ミリ秒数。この値は、アプリケーション内の演算子の相対的な重要度を反映しています。

  • count: アプリケーションでオペレーターが使用された回数。

プロジェクト間で適格性ツールを実行する

このセクションでは、スクリプトを実行して適格性ツールを実行し、複数のプロジェクトのバッチ ワークロードの Spark イベントファイルを分析する手順について説明します。

スクリプトの要件と制限事項:

  • Linux マシンでスクリプトを実行します。
    • Java バージョン >=11 がデフォルトの Java バージョンとしてインストールされている必要があります。
  • Cloud Logging のログの TTL は 30 日間であるため、30 日以上前に実行されたバッチ ワークロードの Spark イベント ファイルは分析できません。

プロジェクト間で適格性ツールを実行する手順は次のとおりです。

  1. list-batches-and-run-qt.sh スクリプトをダウンロードして、ローカルマシンにコピーします。

  2. スクリプトの権限を変更する。

    chmod +x list-batches-and-run-qt.sh
    
  3. 分析用にスクリプトに渡すプロジェクト入力ファイルのリストを準備します。分析するバッチ ワークロードの Spark イベント ファイルを含むプロジェクトとリージョンごとに、次の形式で 1 行ずつ追加してテキスト ファイルを作成します。

    -r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
    

    注:

    -r(必須):

    • REGION: プロジェクトのバッチが送信されるリージョン。

    -s(必須): 形式: yyyy-mm-dd。必要に応じて、00:00:00 時間セグメントを追加できます。

    • START_DATE: 開始日以降に作成されたバッチ ワークロードのみが分析されます。バッチは、バッチの作成時間の降順で分析されます。最新のバッチが最初に分析されます。

    -e(省略可): 形式は yyyy-mm-dd です。必要に応じて、00:00:00 時間セグメントを追加できます。

    • END_DATE: これを指定すると、終了日より前または終了日に作成されたバッチ ワークロードのみが分析されます。指定しない場合、START_DATE の後に作成されたすべてのバッチが分析されます。バッチは、バッチの作成時間の降順で分析されます。最新のバッチが最初に分析されます。

    -l(省略可):

    • LIMIT_MAX_BATCHES: 分析するバッチの最大数。このオプションは、START-DATEEND-DATE と組み合わせて使用すると、指定した日付の間に作成されたバッチの数を制限して分析できます。

      -l が指定されていない場合、デフォルトの最大 100 バッチが分析されます。

    -k(省略可):

    • KEY_PATH: ワークロード Spark イベント ファイルの Cloud Storage アクセスキーを含むローカルパス。

    入力ファイルの例:

    -r us-central1 -s 2024-08-21 -p project1 -k key1
    -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
    

    注:

    • 行 1: us-central1 リージョンの project1 にある、2024-08-21 00:00:00 AM より後の作成時間の Spark イベント ファイルのうち、最新の 100 個(デフォルト)が分析されます。key1 により、Cloud Storage 内のファイルにアクセスできます。

    • 行 2: us-eastl1 リージョンの project2 にある、2024-08-21 00:00:00 AM より後の作成時間で、2024 年 8 月 23 日午後 11 時 59 分 59 秒より前またはその日の Spark イベント ファイルが 50 個まで分析されます。key2 により、Cloud Storage 内のイベント ファイルにアクセスできます。

  4. list-batches-and-run-qt.sh スクリプトを実行します。分析結果は、AppsRecommendedForBoost.tsv ファイルと UnsupportedOperators.tsv ファイルに出力されます。

    ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
        -x MEMORY_ALLOCATED \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -t PARALLEL_THREADS_TO_RUN
    

    注:

Spark イベント ファイルの場所

Dataproc サーバーレスのバッチ ワークロードの Spark イベント ファイルを検索するには、次のいずれかの手順を実施します。

  1. Cloud Storage でワークロードの spark.eventLog.dir を見つけてダウンロードします。

    1. spark.eventLog.dir が見つからない場合は、spark.eventLog.dir を Cloud Storage のロケーションに設定し、ワークロードを再実行して spark.eventLog.dir をダウンロードします。
  2. バッチジョブに Spark History Server を構成している場合:

    1. Spark History Server に移動し、ワークロードを選択します。
    2. [イベントログ] 列の [ダウンロード] をクリックします。

ネイティブ クエリ実行を使用する場合

次のようなシナリオでは、ネイティブ クエリ実行を使用します。

Parquet ファイルと ORC ファイルからデータを読み取る Spark Dataframe API と Spark SQL クエリ。
ネイティブ クエリ実行の適格性評価ツールで推奨されるワークロード。

ネイティブ クエリ実行を使用すべきでない場合

次のシナリオでは、ネイティブ クエリ実行を使用しないでください。ワークロードのランタイムの短縮が達成されず、障害やリグレッションが発生する可能性があります。

  • ネイティブ クエリ実行の適格性ツールで推奨されていないワークロード。
  • Spark RDD、UDF、ML ワークロード
  • 書き込みワークロード
  • Parquet と ORC 以外のファイル形式
  • 次のデータ型の入力:
    • Timestamp, TinyInt, Byte, Struct, Array, Map: ORC と Parquet
    • VarChar, Char: ORC
  • 正規表現を含むクエリ
  • リクエスト元による支払いバケットを使用するワークロード
  • デフォルト以外の Cloud Storage 構成設定。ネイティブ クエリ実行では、オーバーライドされている場合でも多くのデフォルトが使用されます。

制限事項

次のシナリオでネイティブ クエリ実行を有効にすると、例外がスローされたり、Spark の互換性が失われたり、ワークロードがデフォルトの Spark エンジンにフォールバックする可能性があります。

フォールバック

ネイティブ クエリ実行では、実行時にワークロードが Spark 実行エンジンにフォールバックし、リグレッションまたは失敗につながる可能性があります。

  • ANSI: ANSI モードが有効になっている場合、実行は Spark にフォールバックします。

  • 大文字と小文字を区別するモード: ネイティブ クエリ実行は、Spark のデフォルトの大文字と小文字を区別しないモードのみをサポートします。大文字と小文字を区別するモードが有効になっていると、誤った結果が返される可能性があります。

  • パーティション分割テーブル スキャン: ネイティブ クエリ実行は、パスにパーティション情報が含まれている場合にのみ、パーティション分割テーブル スキャンをサポートします。それ以外の場合、ワークロードは Spark 実行エンジンにフォールバックします。

互換性のない動作

次の場合にネイティブ クエリ実行を使用すると、互換性のない動作や誤った結果が発生する可能性があります。

  • JSON 関数: ネイティブ クエリ実行では、一重引用符ではなく二重引用符で囲まれた文字列がサポートされます。単一引用符を使用すると、正しくない結果が返されます。get_json_object 関数でパスに「*」を使用すると、NULL が返されます。

  • Parquet 読み取り構成:

    • ネイティブ クエリ実行では、true に設定されている場合でも、spark.files.ignoreCorruptFiles はデフォルトの false 値に設定されているものとして扱われます。
    • ネイティブ クエリ実行は spark.sql.parquet.datetimeRebaseModeInRead を無視し、Parquet ファイルの内容のみを返します。従来の混合(ユリウス グレゴリオ)暦とプロレプティック グレゴリオ暦の違いは考慮されません。Spark の結果は異なる場合があります。
  • NaN: サポートされていません。数値比較で NaN を使用する場合など、予期しない結果が生じる可能性があります。

  • Spark 列形式の読み取り: Spark 列形式ベクトルがネイティブ クエリ実行と互換性がないため、致命的なエラーが発生する可能性があります。

  • スピンアウト: シャッフル パーティションが大量に設定されている場合、ディスクへのスピンアウト機能によって OutOfMemoryException がトリガーされることがあります。この場合は、パーティションの数を減らすことで、この例外を排除できます。