このドキュメントでは、プレミアム料金階層で実行されている Dataproc サーバーレス バッチ ワークロードでネイティブ クエリ実行を使用する方法について説明します。ネイティブ クエリ実行は、Spark ワークロードを高速化し、費用を削減できる機能です。この機能は Apache Spark API と互換性があり、追加のユーザーコードは必要ありません。
ネイティブ クエリ実行を使用する場合
次のようなシナリオでは、ネイティブ クエリ実行を使用します。
Parquet ファイルと ORC ファイルからデータを読み取る Spark DataFrame API と Spark SQL クエリ。
ネイティブ クエリ実行の適格性評価ツールで推奨されるワークロード。
ネイティブ クエリ実行を使用すべきでない場合
次のシナリオでは、ネイティブ クエリ実行を使用しないでください。バッチ ワークロードのランタイムの短縮が達成されず、ワークロードの失敗やリグレッションが発生する可能性があります。
ネイティブ クエリ実行の適格性評価ツールで推奨されていないワークロード。
Spark RDD、UDF、ML ワークロード
書き込みワークロード
Parquet と ORC 以外のファイル形式
次のデータ型の入力:
Timestamp
TinyInt
Byte
Struct
Array
Map
: ORC と ParquetVarChar
Char
: ORC
リクエスト元による支払いバケットを使用するワークロード
デフォルト以外の Cloud Storage 構成設定。ネイティブ クエリ実行では、オーバーライドされている場合でも多くのデフォルトが使用されます。
プレミアム ティア料金でネイティブ クエリ実行を使用する方法
Dataproc Serverless ネイティブ クエリ実行は、Dataproc Serverless プレミアム料金ティアで実行されるバッチ ワークロードでのみ使用できます。プレミアム ティアの料金はスタンダード ティアの料金よりも高くなりますが、ネイティブ クエリの実行に追加料金はかかりません。詳細については、Dataproc Serverless の料金をご覧ください。
バッチ リソースのプレミアム ティアのリソース割り当てと料金を有効にするには、Spark バッチ ワークロードを送信するときに、次のリソース割り当て階層プロパティを premium
に設定します。
Dataproc サーバーレスのバッチ ワークロードは、Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して送信できます。
Console
Google Cloud コンソールの場合:
- [Dataproc バッチ] に移動します。
- [作成] をクリックして [バッチ作成] ページを開きます。
次のフィールドを選択して入力し、ネイティブ クエリ実行用のバッチを構成します。
- コンテナ:
- ランタイム バージョン:
1.2
、2.2
、または利用可能な場合は、より高いmajor.minor
バージョン番号を選択します。サポートされている Dataproc Serverless for Spark ランタイム バージョンをご覧ください。
- ランタイム バージョン:
- エグゼキュータとドライバのティア構成:
- すべてのティア(ドライバのコンピューティング ティア、実行コンピューティング ティア)で
Premium
を選択します。
- すべてのティア(ドライバのコンピューティング ティア、実行コンピューティング ティア)で
- プロパティ: 次の ネイティブ クエリ実行プロパティに、次の
Key
(プロパティ名)とValue
のペアを入力します。キー 値 spark.dataproc.runtimeEngine
先住民
- コンテナ:
他のバッチ ワークロードの設定を入力、選択、確認します。Spark バッチ ワークロードの送信をご覧ください。
[送信] をクリックして、Spark バッチ ワークロードを実行します。
gcloud
ネイティブ クエリ実行のバッチ ワークロードを構成するには、次の gcloud CLI gcloud dataproc batches submit spark
コマンドフラグを設定します。
gcloud dataproc batches submit spark \ --project=PROJECT_ID \ --region=REGION \ --version=VERSION \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.SparkPi \ --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \ OTHER_FLAGS_AS_NEEDED
注:
- PROJECT_ID: Google Cloud プロジェクト ID。 プロジェクト ID は、Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- REGION: ワークロードを実行できる利用可能な Compute Engine リージョン。
- VERSION:
1.2
、2.2
、または利用可能な場合は、より高いmajor.minor
バージョン番号を指定します。サポートされている Dataproc Serverless for Spark ランタイム バージョンをご覧ください。 - OTHER_FLAGS_AS_NEEDED: Spark バッチ ワークロードを送信するをご覧ください。
API
ネイティブ クエリ実行のバッチ ワークロードを構成するには、次の Dataproc API フィールドを設定します。
- RuntimeConfig.version:
1.2
、2.2
、または利用可能な場合は、より高いmajor.minor
バージョン番号を指定します。サポートされている Dataproc Serverless for Spark ランタイム バージョンをご覧ください。 RuntimeConfig.properties:次のネイティブ クエリ実行プロパティを設定します。
"spark.dataproc.runtimeEngine":"native" "spark.dataproc.driver.compute.tier":"premium" "spark.dataproc.executor.compute".tier:"premium"
注:
- 他のバッチ ワークロード API フィールドを設定する方法については、Spark バッチ ワークロードを送信するをご覧ください。
制限事項
次のシナリオでネイティブ クエリ実行を有効にすると、例外がスローされたり、Spark の互換性の問題が発生したり、ワークロードがデフォルトの Spark エンジンにフォールバックする可能性があります。
フォールバック
ネイティブ クエリ実行では、実行時にワークロードが Spark 実行エンジンにフォールバックし、リグレッションまたは障害が発生する可能性があります。
ANSI: ANSI モードが有効になっている場合、実行は Spark にフォールバックします。
大文字と小文字を区別するモード: ネイティブ クエリ実行は、Spark のデフォルトの大文字と小文字を区別しないモードのみをサポートしています。大文字と小文字を区別するモードが有効になっていると、誤った結果が返される可能性があります。
RegExp functions
: ネイティブ クエリ実行は、RE2
に基づいてrlike
、regexp_extract
などのregexp
関数を実装します。Spark では、java.util.regex
に基づいています。- ルックアップ: ネイティブ クエリ実行は、RE2 のルックアップまたはルックバック パターンをサポートしていません。
java.util.regex
とは異なり、RE2 では、パターン「\s」で空白文字を照合する場合、\v(\x0b)を空白文字として扱いません。
パーティション分割テーブルのスキャン: ネイティブ クエリ実行は、パスにパーティション情報が含まれている場合にのみ、パーティション分割テーブルのスキャンをサポートします。それ以外の場合、ワークロードは Spark 実行エンジンにフォールバックします。
互換性のない動作
次の場合にネイティブ クエリ実行を使用すると、互換性のない動作や誤った結果が発生する可能性があります。
JSON 関数: ネイティブ クエリ実行は、一重引用符ではなく二重引用符で囲まれた文字列をサポートしています。単一引用符を使用すると、正しくない結果が返されます。
get_json_object
関数でパスに「*」を使用すると、NULL
が返されます。Parquet 読み取り構成:
- ネイティブ クエリ実行では、
true
に設定されている場合でも、spark.files.ignoreCorruptFiles
はデフォルトのfalse
値に設定されているものとして扱われます。 - ネイティブ クエリ実行は
spark.sql.parquet.datetimeRebaseModeInRead
を無視し、Parquet ファイルの内容のみを返します。従来のハイブリッド(ユリウス グレゴリオ)暦とプロレプティック グレゴリオ暦の違いは考慮されません。Spark の結果は異なる場合があります。
- ネイティブ クエリ実行では、
NaN
: サポートされていません。数値比較でNaN
を使用する場合など、予期しない結果が生じる可能性があります。Spark 列形式の読み取り: Spark 列形式ベクトルがネイティブ クエリ実行と互換性がないため、致命的なエラーが発生する可能性があります。
スピンアウト: シャッフル パーティションが大量に設定されている場合、ディスクへのスピンアウト機能によって
OutOfMemoryException
がトリガーされることがあります。この場合は、パーティションの数を減らすことで、この例外を排除できます。
ネイティブ クエリ実行ワークロードのチューニング
Dataproc Serverless ネイティブ クエリ実行は、次のプロパティを使用してさらに調整できます。
バッチ プロパティ | 使用する状況 |
---|---|
spark.driver.memory spark.driver.memoryOverhead |
To tune the memory provided to spark driver process |
spark.executor.memory spark.executor.memoryOverhead spark.memory.offHeap.size |
To tune the memory provided to onheap/offheap memory of executor process |
spark.dataproc.driver.disk.tier spark.dataproc.driver.disk.size |
To configure premium disk tier and size for driver |
spark.dataproc.executor.disk.tier spark.dataproc.executor.disk.size |
To configure premium disk tier and size for executor |
ネイティブ クエリ実行バッチ ワークロードのプロパティ
spark.dataproc.runtimeEngine=native
(必須): デフォルトのspark
ランタイム エンジンをオーバーライドするには、ワークロード ランタイム エンジンをnative
に設定する必要があります。version
(必須): ワークロードで Spark ランタイム バージョン 1.2.26 以降、2.2.26 以降、またはそれ以降のメジャー ランタイム バージョンを使用する必要があります。プレミアム コンピューティング階層(必須):
spark.dataproc.spark.driver.compute.tier
プロパティとspark.dataproc.executor.compute.tier
プロパティはpremium
に設定する必要があります。プレミアム ディスク階層(省略可): プレミアム ディスク階層では、行ベースのシャッフルではなく列ベースのシャッフルを使用して、パフォーマンスを向上させます。シャッフル I/O スループットを改善するには、シャッフル ファイルに対応できる十分なディスクサイズのドライバとエグゼキュータのプレミアム ディスク階層を使用します。
メモリ(省略可): オフヒープ メモリ(
spark.memory.offHeap.size
)とオンヒープ メモリ(spark.executor.memory
)の両方を構成せずにネイティブ クエリ実行エンジンを構成した場合、ネイティブ クエリ実行エンジンはデフォルトの4g
量のメモリを取り、オフヒープ メモリとオンヒープ メモリに6:1
の比率で分割します。ネイティブ クエリ実行の使用時にメモリを構成する場合は、次のいずれかの方法で構成できます。
指定された値でオフヒープ メモリのみ(
spark.memory.offHeap.size
)を構成します。ネイティブ クエリ実行では、指定された値がオフヒープ メモリとして使用され、オフヒープ メモリ値の1/7th
がオンヒープ メモリとして使用されます。オンヒープメモリ(
spark.executor.memory
)とオフヒープメモリ(spark.memory.offHeap.size
)の両方を構成します。オフヒープメモリに割り当てる量は、オンヒープメモリに割り当てる量よりも大きくする必要があります。推奨事項: オフヒープのメモリとオンヒープのメモリを 6:1 の比率で割り当てます。
値の例:
ネイティブ クエリ実行なしのメモリ設定 ネイティブ クエリ実行での推奨メモリ設定 spark.executor.memory
spark.memory.offHeap.size
spark.executor.memory
7g 6g 1g 14g 12g 2g 28g 24g 4g 56g 48g 8g
ネイティブ クエリ実行の適格性確認ツール
Dataproc ネイティブ クエリ実行の適格性ツール run_qualification_tool.sh
を実行して、ネイティブ クエリ実行で実行時間を短縮できるワークロードを特定できます。このツールは、バッチ ワークロード アプリケーションによって生成された Spark イベントファイルを分析し、ネイティブ クエリ実行で各ワークロード アプリケーションが得られる潜在的なランタイム削減を見積もります。
適格性ツールを実行する
次の手順で、Dataproc Serverless バッチ ワークロード イベント ファイルに対してツールを実行します。
1. run_qualification_tool.sh
を、分析する Spark イベント ファイルを含むローカル ディレクトリにコピーします。
適格性ツールを実行して、スクリプト ディレクトリに含まれる 1 つのイベント ファイルまたはイベント ファイルのセットを分析します。
./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \ -o CUSTOM_OUTPUT_DIRECTORY_PATH \ -k SERVICE_ACCOUNT_KEY \ -x MEMORY_ALLOCATEDg \ -t PARALLEL_THREADS_TO_RUN
フラグと値:
-f
(必須): Spark イベント ファイルの場所で、Spark ワークロード イベント ファイルを見つけます。EVENT_FILE_PATH(EVENT_FILE_NAME が指定されていない場合必須): 分析するイベント ファイルのパス。指定しない場合、イベント ファイルのパスには現在のディレクトリが使用されます。
EVENT_FILE_NAME(EVENT_FILE_PATH が指定されていない場合必須): 分析するイベント ファイルの名前。指定しない場合、
EVENT_FILE_PATH
で再帰的に検出されたイベント ファイルが分析されます。
-o
(省略可): 指定しない場合、ツールは現在のディレクトリの下に既存のoutput
ディレクトリを作成するか、既存のoutput
ディレクトリを使用して出力ファイルを配置します。- CUSTOM_OUTPUT_DIRECTORY_PATH: 出力ファイルの出力ディレクトリ パス。
-k
(省略可):- SERVICE_ACCOUNT_KEY: EVENT_FILE_PATH にアクセスする必要がある場合の、JSON 形式のサービス アカウント キー。
-x
(省略可):- MEMORY_ALLOCATED: ツールに割り当てるメモリ(ギガバイト単位)。デフォルトでは、このツールはシステムで使用可能な空きメモリの 80% と、使用可能なすべてのマシンコアを使用します。
-t
(省略可):- PARALLEL_THREADS_TO_RUN: N=ツールが実行する並列スレッドの数。デフォルトでは、ツールはすべてのコアを実行します。
コマンドの使用例:
./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \ -o perfboost-output -k /keys/event-file-key -x 34g -t 5
この例では、適格性ツールが
gs://dataproc-temp-us-east1-9779/spark-job-history
ディレクトリを走査し、このディレクトリとそのサブディレクトリに含まれる Spark イベント ファイルを分析します。ディレクトリへのアクセスは/keys/event-file-key
によって提供されます。このツールは、実行に34 GB memory
を使用し、5
並列スレッドを実行します。
適格性評価ツールの出力ファイル
分析が完了すると、適格性ツールは、現在のディレクトリの perfboost-output
ディレクトリに次の出力ファイルを配置します。
AppsRecommendedForBoost.tsv
: ネイティブ クエリ実行で使用することをおすすめするアプリケーションのタブ区切りリスト。UnsupportedOperators.tsv
: ネイティブ クエリ実行での使用が推奨されないアプリケーションのタブ区切りリスト。
AppsRecommendedForBoost.tsv
出力ファイル
次の表に、サンプルの AppsRecommendedForBoost.tsv
出力ファイルの内容を示します。分析されたアプリケーションごとに 1 行が含まれます。
AppsRecommendedForBoost.tsv
出力ファイルの例:
applicationId | applicationName | rddPercentage | unsupportedSqlPercentage | totalTaskTime | supportedTaskTime | supportedSqlPercentage | recommendedForBoost | expectedRuntimeReduction |
---|---|---|---|---|---|---|---|---|
app-2024081/batches/083f6196248043938-000 | projects/example.com:dev/locations/us-central1 6b4d6cae140f883c0 11c8e |
0.00% | 0.00% | 548924253 | 548924253 | 100.00% | TRUE | 30.00% |
app-2024081/batches/60381cab738021457-000 | projects/example.com:dev/locations/us-central1 474113a1462b426bf b3aeb |
0.00% | 0.00% | 514401703 | 514401703 | 100.00% | TRUE | 30.00% |
列の説明:
applicationId
: Spark アプリケーションのApplicationID
。これを使用して、対応するバッチ ワークロードを特定します。applicationName
: Spark アプリケーションの名前。rddPercentage
: アプリケーション内の RDD オペレーションの割合。RDD オペレーションはネイティブ クエリ実行ではサポートされていません。unsupportedSqlPercentage:
ネイティブ クエリ実行でサポートされていない SQL オペレーションの割合。totalTaskTime
: アプリケーションの実行中に実行されたすべてのタスクの累積タスク時間。supportedTaskTime
: ネイティブ クエリ実行でサポートされるタスクの合計時間。
次の列には、ネイティブ クエリ実行がバッチ ワークロードにメリットがあるかどうかを判断するのに役立つ重要な情報が表示されます。
supportedSqlPercentage
: ネイティブ クエリ実行でサポートされる SQL オペレーションの割合。この割合が高いほど、ネイティブ クエリ実行でアプリケーションを実行することで、ランタイムを短縮できます。recommendedForBoost
:TRUE
の場合は、ネイティブ クエリ実行でアプリケーションを実行することをおすすめします。recommendedForBoost
がFALSE
の場合、バッチ ワークロードでネイティブ クエリ実行を使用しないでください。expectedRuntimeReduction
: ネイティブ クエリ実行でアプリケーションを実行した場合に、アプリケーション ランタイムが削減される割合(%)。
UnsupportedOperators.tsv
出力ファイル。
UnsupportedOperators.tsv
出力ファイルには、ネイティブ クエリ実行でサポートされていないワークロード アプリケーションで使用される演算子のリストが含まれています。出力ファイルの各行には、サポートされていない演算子がリストされます。
列の説明:
unsupportedOperator
: ネイティブ クエリ実行でサポートされていない演算子の名前。cumulativeCpuMs
: オペレータの実行中に消費された CPU ミリ秒数。この値は、アプリケーション内の演算子の相対的な重要度を反映しています。count
: アプリケーションでオペレーターが使用された回数。
プロジェクト間で適格性ツールを実行する
このセクションでは、スクリプトを実行して適格性ツールを実行し、複数のプロジェクトのバッチ ワークロードの Spark イベント ファイルを分析する手順について説明します。
スクリプトの要件と制限事項:
- Linux マシンでスクリプトを実行します。
- Java バージョン
>=11
がデフォルトの Java バージョンとしてインストールされている必要があります。
- Java バージョン
- Cloud Logging のログの TTL は 30 日間であるため、30 日以上前に実行されたバッチ ワークロードの Spark イベント ファイルは分析できません。
プロジェクト間で適格性ツールを実行する手順は次のとおりです。
list-batches-and-run-qt.sh
スクリプトをダウンロードして、ローカルマシンにコピーします。スクリプトの権限を変更する。
chmod +x list-batches-and-run-qt.sh
分析用にスクリプトに渡すプロジェクト入力ファイルのリストを準備します。分析するバッチ ワークロードの Spark イベント ファイルを含むプロジェクトとリージョンごとに、次の形式で 1 行ずつ追加してテキスト ファイルを作成します。
-r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
注:
-r
(必須):- REGION: プロジェクトのバッチが送信されるリージョン。
-s
(必須): 形式:yyyy-mm-dd
。必要に応じて、00:00:00
時間セグメントを追加できます。- START_DATE: 開始日以降に作成されたバッチ ワークロードのみが分析されます。バッチは、バッチの作成時間の降順で分析されます。最新のバッチが最初に分析されます。
-e
(省略可): 形式はyyyy-mm-dd
です。必要に応じて、00:00:00
時間セグメントを追加できます。- END_DATE: 指定すると、終了日より前または終了日に作成されたバッチ ワークロードのみが分析されます。指定しない場合、START_DATE の後に作成されたすべてのバッチが分析されます。バッチは、バッチの作成時間の降順で分析されます。最新のバッチが最初に分析されます。
-l
(省略可):LIMIT_MAX_BATCHES: 分析するバッチの最大数。このオプションは、START-DATE と END-DATE と組み合わせて使用すると、指定した日付の間に作成されたバッチの数を制限して分析できます。
-l が指定されていない場合、デフォルトの最大
100
バッチが分析されます。
-k
(省略可):- KEY_PATH: ワークロード Spark イベント ファイルの Cloud Storage アクセスキーを含むローカルパス。
入力ファイルの例:
-r us-central1 -s 2024-08-21 -p project1 -k key1 -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
注:
行 1:
us-central1
リージョンのproject1
にある、2024-08-21 00:00:00 AM
より後の作成時間の Spark イベント ファイルのうち、最新の 100 個(デフォルト)が分析されます。key1
により、Cloud Storage 内のファイルにアクセスできます。行 2:
us-eastl1
リージョンのproject2
にある、2024-08-21 00:00:00 AM
より後の作成時間で、2024 年 8 月 23 日午後 11 時 59 分 59 秒より前またはその日の Spark イベント ファイルのうち、最新の 50 件が分析されます。key2
により、Cloud Storage 内のイベント ファイルにアクセスできます。
list-batches-and-run-qt.sh
スクリプトを実行します。分析結果は、AppsRecommendedForBoost.tsv
ファイルとUnsupportedOperators.tsv
ファイルに出力されます。./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \ -x MEMORY_ALLOCATED \ -o CUSTOM_OUTPUT_DIRECTORY_PATH \ -t PARALLEL_THREADS_TO_RUN
注:
PROJECT_INPUT_FILE_LIST: このセクションの 3 の手順をご覧ください。
-x
、-o
、-t
: 適格性確認ツールを実行するをご覧ください。
Spark イベント ファイルの場所
Dataproc サーバーレスのバッチ ワークロードの Spark イベント ファイルを検索するには、次のいずれかの手順を実施します。
Cloud Storage でワークロードの
spark.eventLog.dir
を見つけてダウンロードします。spark.eventLog.dir
が見つからない場合は、spark.eventLog.dir
を Cloud Storage のロケーションに設定し、ワークロードを再実行してspark.eventLog.dir
をダウンロードします。
バッチジョブに Spark History Server を構成している場合:
- Spark History Server に移動し、ワークロードを選択します。
- [イベントログ] 列の [ダウンロード] をクリックします。