Spark ワークロードを送信すると、Dataproc Serverless for Spark は、エグゼキュータの数などのワークロード リソースを動的にスケーリングして、ワークロードを効率的に実行できます。Dataproc サーバーレス自動スケーリングはデフォルトの動作であり、Spark 動的リソース割り当てを使用して、ワークロードをスケーリングするかどうか、またその方法とタイミングを決定します。
Dataproc Serverless 自動スケーリング V2
Dataproc Serverless 自動スケーリング バージョン 2(V2)では、デフォルト バージョン 1(V1)に機能と改善を追加して、Dataproc Serverless ワークロードの管理、ワークロードのパフォーマンスの改善、費用の削減を行います。
- 非同期ノードのダウンスケーリング: 自動スケーリング V2 は、V1 の同期ダウンスケーリングを非同期ダウンスケーリングに置き換えます。非同期ダウンスケーリングを使用すると、Dataproc Serverless は、すべてのノードがシャッフル移行を完了するのを待たずに、ワークロード リソースをダウンスケールします。つまり、ゆっくりとスケールダウンするロングテール ノードは、アップスケーリングをブロックしません。
- インテリジェントなスケールダウンノード選択: 自動スケーリング V2 は、V1 のランダムなノード選択を、最初にスケールダウンするのに最適なノードを特定するインテリジェントなアルゴリズムで置き換えます。このアルゴリズムでは、ノードのシャッフル データサイズやアイドル時間などの要素が考慮されます。
- 構成可能な Spark の正常なデコミッションとシャッフル移行の動作: 自動スケーリング V2 では、標準の Spark プロパティを使用して、Spark の正常なデコミッションとシャッフル移行を構成できます。この機能は、カスタマイズした Spark プロパティで移行の互換性を維持するのに役立ちます。
Dataproc Serverless の自動スケーリング機能
特徴 | Dataproc Serverless 自動スケーリング V1 | Dataproc Serverless 自動スケーリング V2 |
ノードのダウンスケーリング | 同期 | 非同期 |
ダウンスケーリングのノード選択 | ランダム | インテリジェント |
Spark の正常なデコミッションおよびシャッフル移行 | 構成不可 | 構成可能 |
Spark の動的割り当てのプロパティ
次の表は、バッチ ワークロードを送信して自動スケーリングを制御する際に設定できる Spark の動的割り当てプロパティの一覧です(Spark プロパティの設定方法をご覧ください)。
プロパティ | 説明 | デフォルト |
---|---|---|
spark.dataproc.scaling.version |
Dataproc Serverless for Spark の自動スケーリング バージョン。バージョン 1 または 2 を指定します(Dataproc Serverless 自動スケーリング V2 をご覧ください)。 |
1 |
spark.dynamicAllocation.enabled |
ワークロードに基づいてエグゼキュータの数をスケールアップまたはスケールダウンする動的リソース割り当てを使用するかどうか。
値を false に設定すると、ワークロードの自動スケーリングが無効になります。デフォルト: true 。 |
true |
spark.dynamicAllocation.initialExecutors |
ワークロードに割り当てられる初期のエグゼキュータ数。ワークロードが開始されると、自動スケーリングによってアクティブなエグゼキュータの数が変化することがあります。最小値は 2 で、最大値は 500 です。 |
2 |
spark.dynamicAllocation.minExecutors |
ワークロードをスケールダウンするエグゼキュータの最小数。最小値は 2 です。 |
2 |
spark.dynamicAllocation.maxExecutors |
ワークロードをスケールアップするエグゼキュータの最大数。最大値は 2000 です。 |
1000 |
spark.dynamicAllocation.executorAllocationRatio |
Spark ワークロードのスケールアップをカスタマイズします。0 ~1 の値を受け入れます。値 1.0 は最大スケールアップ機能を提供し、最大の並列処理を達成するのに役立ちます。値 0.5 では、スケールアップ機能と並列処理が最大値の半分に設定されます。 |
0.3 |
spark.reducer.fetchMigratedShuffle.enabled |
true に設定すると、Spark 動的割り当てのために廃止されたエグゼキュータからフェッチが失敗した後、Spark ドライバからシャッフル出力のロケーションを取得できるようになります。これにより、廃止されたエグゼキュータからライブ エグゼキュータへのシャッフル ブロックの移行によって発生する ExecutorDeadException エラーが減り、FetchFailedException エラーによって発生するステージの再試行が少なくなります(ExecutorDeadException による FetchFailedException をご覧ください)。このプロパティは、Dataproc サーバーレスの Spark ランタイム バージョン 1.1.12 以降と 2.0.20 以降で使用できます。 |
false |
Spark 動的割り当ての指標
Spark バッチ ワークロードは、Spark 動的リソース割り当てに関連する以下の指標を生成します(Spark 指標の詳細については、モニタリングとインストゥルメンテーションをご覧ください)。
指標 | 説明 |
---|---|
maximum-needed |
実行中のタスクと保留中のタスクをすべて達成するために現在の負荷で必要なエグゼキュータの最大数。 |
running |
タスクを実行している稼働中のエグゼキュータ数。 |
Spark 動的割り当ての問題と解決策
ExecutorDeadException による FetchFailedException
原因: Spark 動的割り当てがエグゼキュータをスケールダウンすると、シャッフル ファイルがライブ エグゼキュータに移行します。ただし、エグゼキュータの Spark レデューサ タスクは、レデューサ タスクの開始時に Spark ドライバによって設定されたロケーションからシャッフル出力をフェッチするため、シャッフル ファイルが移行されても、レデューサは引き続き廃止されたエグゼキュータからのシャッフル出力のフェッチを試行できます。これにより、
ExecutorDeadException
とFetchFailedException
エラーが発生します。解決策: Spark バッチ ワークロードの Dataproc サーバーレスを実行するときに、
spark.reducer.fetchMigratedShuffle.enabled
をtrue
に設定して、シャッフル ロケーションの再取得を有効にします(Spark バッチ ワークロードのプロパティの設定をご覧ください)。このプロパティを有効にすると、廃止されたエグゼキュータからのフェッチに失敗した後、レデューサ タスクはドライバからシャッフル出力のロケーションを再取得します。