Spark プロパティをチューニングして Dataproc Serverless ジョブを最適化する
Google Cloud Japan Team
※この投稿は米国時間 2023 年 9 月 7 日に、Google Cloud blog に投稿されたものの抄訳です。
データ サイエンティストやエンジニアは大規模なコンピューティング リソースを必要としていますが、基盤インフラストラクチャを作成および管理するためのノウハウやエンジニアリング チームを求めている場合も少なくありません。Dataproc Serverless は、業界初の Spark 用自動スケーリング サーバーレス プロダクトです。これを使用すると、Spark ワークロード用のインフラストラクチャについて考えなくて済むため、Spark エクスペリエンスが簡素化されます。
Dataproc Serverless は、Spark プロパティを使用して、バッチ ワークロードに割り当てるコンピューティング リソース、メモリリソース、ディスク リソースを決定します。ワークロードの割り当て消費量と費用は、これらのプロパティ設定に左右される可能性があります(詳しくは、Dataproc Serverless の割り当てと Dataproc Serverless の料金をご覧ください)。
このブログ投稿では、これらの Spark プロパティを使用してどのように Spark ジョブの実行時間と費用のサービスレベル契約を満たすことができるかについて説明します。
はじめに
このブログ投稿は、以下のことを前提としています。
読者の手元に Dataproc Serverless で使用可能な Apache Spark jar がある。
読者のプロジェクトに許可されている割り当てが、テストを行うのに十分な量である。
読者が、ジョブに必要な CPU / メモリ / ディスクの量を知らない。
コード最適化の機会はすでにやり尽くした。
GCS ファイル構造 / ファイルタイプの最適化などの他の方法をすでに試した。
Dataproc Serverless と Apache Spark のチューニングは反復的なプロセスです。このガイドでは、Spark プロパティを微調整して費用とパフォーマンスの要件を満たす手順を概説します。この手順の間、ジョブを数回送信する必要があります。
Dataproc Serverless の料金
Dataproc Serverless for Spark の料金は、データ コンピューティング ユニット(DCU)の数と使用されたシャッフル ストレージの量に基づきます。DCU とシャッフル ストレージは秒単位で課金され、最低課金時間は 1 分です。
詳しくは、Dataproc Serverless の料金をご覧ください。
Spark 永続履歴サーバーと GCS のログ出力
実行するジョブのために Spark 永続履歴サーバー(PHS)をセットアップします。これはデバッグやトラブルシューティングに役立ちます。Spark ジョブの実行中に、すべてのエグゼキュータとドライバのログを Cloud Logging で使用できます。また、実行中の Spark アプリケーションを PHS で見ることもできます(PHS > [Incomplete Applications])。エグゼキュータとドライバのログは Cloud Logging で使用できるのに対して、PHS には Spark イベントログがあります。このログは、Spark アプリケーションの実行に関する有益な情報(DAG、エグゼキュータ イベントなど)を提供します。これらのログは実際には GCS に保存されていますが、ログにアクセスするための UI は PHS から使用できます(PHS が実行されていない場合、Spark から GCS にログは送信されません)。
さらに、Google Cloud Storage バケットをセットアップし、それらのプロパティを Spark ジョブに渡します。
ジョブのパフォーマンス / 費用の経過の追跡
反復テスト全体を通して、各ジョブのプロパティとそれらが実行時間と費用に及ぼす影響を追跡します。


ジョブの送信
ジョブの実行時間 / DCU 使用量が満足の行くものになったら、そこでテストを終了します。これは、Dataproc Serverless バッチ コンソールの [Details] タブで確認できます。


Dataproc Serverless バッチ コンソールの [Monitoring] タブで、ジョブ実行時の maximum-needed 値をモニタリングします。


上記の例では、maximum-needed = 36.5 ですが、このジョブにはエグゼキュータが 1 つしか割り当てられていませんでした。
maximum-needed 値は、(実行中のタスクの数 + 保留中のタスクの数) ÷ エグゼキュータあたりのタスク数で算出されます。Dataproc Serverless は、ジョブ送信時に渡された自動スケーリング関連のプロパティに基づいてこの需要を満たそうとします。これらのプロパティについて、次のセクションで説明します。
Spark プロパティのファインチューニング
割り当て済みジョブリソース
わかりやすくするために、各種ワークロード用に以下の 3 通りの構成例を用意しました。各構成でテストを行い、どの構成がジョブに最も適しているかを判断します。各 Spark プロパティの詳細とデフォルト値については、Spark プロパティ | Dataproc Serverless のドキュメント | Google Cloud をご覧ください。






自動スケーリング
必要なリソースのみを使用する自動スケーリングは費用削減に役立ちますが、ジョブの実行時間と安定性に影響を与える場合があります。自動スケーリングは、実行時間またはジョブの安定性がニーズを満たしていないと確信できるまでデフォルトのままにしておくことをおすすめします。
以下に説明するいくつかのプロパティは、スケールの範囲と比率に影響します。自動スケーリングに関連するプロパティの詳細とデフォルト値については、Dataproc Serverless for Spark の自動スケーリングをご覧ください。


範囲
Dataproc Serverless ジョブに使用するエグゼキュータの最小数と最大数を構成できます。初期数を設定することもできます(デフォルトは 2)。エグゼキュータ インスタンスの初期数を適切に設定すると、実行時間が短縮されます(自動スケーリングに費やす時間が少なくなります)。エグゼキュータの最大数を設定すると、費用が抑えられます。


比率
自動スケーリングする際、Dataproc Serverless ジョブは現在のエグゼキュータの数と maximum-needed((実行中のタスクの数 + 保留中のタスクの数) ÷ エグゼキュータあたりのタスク数)を比較し、平衡状態に達するために現在のエグゼキュータの数を増減します。増減の比率は executorAllocationRatio と decommission.maxRatio によって決まります。
executorAllocationRatio プロパティは、ジョブがどれだけ速くスケールアップするかを決定します。decommission.maxRatio プロパティは、ジョブがどれだけ速くスケールダウンするかを決定します。
シャッフルまたは RDD の移行と同時にあまりにも多くのエグゼキュータをデコミッションすると、シャッフル フェッチのパフォーマンスが著しく低下する可能性があります。したがって、このプロパティは 0.1~0.3 程度の低い値に設定することをおすすめします。これにより、デコミッションによるパフォーマンスの低下は軽減されます。
わかりやすくするために、以下の 3 通りの自動スケーリング比率の構成例を用意しました。






自動スケーリングなし
ジョブに必要なエグゼキュータの数は maximum-needed の値を見るとわかります。また、自動スケーリングによるスケールアップには時間がかかることから、自動スケーリングをオフにするという選択肢もあります。そのためには、エグゼキュータの最小数と最大数を同一にします。こうすると、ジョブの開始から実行中にかけてエグゼキュータ インスタンスの数は変わりません。dynamicAllocation は引き続き有効なので、Dataproc Serverless ジョブ コンソールの [Monitoring] タブを見ることができます。


コンピューティング ティアとディスクティア
Dataproc Serverless ジョブのパフォーマンスを増強するもう一つの方法は、コンピューティング ティアとディスクティアを利用することです。デフォルトでは、ジョブはスタンダード ティアに設定されます。コアあたりのパフォーマンス(コンピューティング)または IOPS とスループット(ディスク)を向上させたい場合は、これらのプロパティをプレミアム ティアに設定することを検討します。ただし、プレミアム ティアに設定するとジョブの課金レートが高くなるので注意してください。


すべてを組み合わせる
ジョブを送信するときに上記のオプションをどのように使用するかを検討します。以下にサンプル事例を示します。
1. ジョブに中程度の割り当て済みジョブリソースと中程度の自動スケーリングが必要な場合
2. ジョブに最大の割り当て済みジョブリソースと高速な自動スケーリングが必要で、初期エグゼキュータの数が 50 と判明している場合
3. エグゼキュータ 20 個という予測可能で一貫したリソース要件を持つ小規模なジョブで、高い IOPS が必要な場合
よくあるエラーの説明
プロジェクトの割り当てを監視してください。自動スケーリングが有効な場合にプロジェクトの割り当て上限に達すると、ジョブが必要なリソースを取得できず、このエラーでジョブが失敗する可能性があります。
ディスク使用量 = number_instances × number_cores × disk_size
CPU 使用量 = number_instances × number_cores
CourseGrainedSchedulerError
このエラーは通常、不十分なリソースに関連します。ドライバコア / メモリまたはエグゼキュータ インスタンスの数を増やしてみてください。
FetchFailedException
spark.reducer.fetchMigratedShuffle.enabled=true
に設定してみてください。詳しくは、Spark 動的割り当ての問題と解決策をご覧ください。
次のステップ
このブログ投稿で説明したように、Dataproc Serverless は Spark プロパティを使用して、バッチ ワークロードに割り当てるコンピューティング リソース、メモリリソース、ディスク リソースを決定します。ワークロードの割り当て消費量と費用は、これらのプロパティ設定に左右される可能性があります(詳しくは、Dataproc Serverless の割り当てと Dataproc Serverless の料金をご覧ください。したがって、これらのプロパティがどのように機能し、互いにどのように相互作用するかを理解することが不可欠です。
Dataproc Serverless を使ってみるには、Serverless Spark ワークショップまたは Dataproc Serverless クイックスタートをご覧ください。
さらに詳しく知りたい場合は、以下の関連するブログ投稿をお読みください。