Dataproc Serverless for Spark の自動スケーリング

Spark ワークロードを送信すると、Dataproc Serverless for Spark は、エグゼキュータの数などのワークロード リソースを動的にスケーリングして、ワークロードを効率的に実行できます。Dataproc サーバーレス自動スケーリングはデフォルトの動作であり、Spark 動的リソース割り当てを使用して、ワークロードをスケーリングするかどうか、またその方法とタイミングを決定します。

Dataproc Serverless 自動スケーリング V2

Dataproc Serverless 自動スケーリング バージョン 2(V2)では、デフォルト バージョン 1(V1)に機能と改善を追加して、Dataproc Serverless ワークロードの管理、ワークロードのパフォーマンスの改善、費用の削減を行います。

  • 非同期ノードのダウンスケーリング: 自動スケーリング V2 は、V1 の同期ダウンスケーリングを非同期ダウンスケーリングに置き換えます。非同期ダウンスケーリングを使用すると、Dataproc Serverless は、すべてのノードがシャッフル移行を完了するのを待たずに、ワークロード リソースをダウンスケールします。つまり、ゆっくりとスケールダウンするロングテール ノードは、アップスケーリングをブロックしません。
  • インテリジェントなスケールダウンノード選択: 自動スケーリング V2 は、V1 のランダムなノード選択を、最初にスケールダウンするのに最適なノードを特定するインテリジェントなアルゴリズムで置き換えます。このアルゴリズムでは、ノードのシャッフル データサイズやアイドル時間などの要素が考慮されます。
  • 構成可能な Spark の正常なデコミッションとシャッフル移行の動作: 自動スケーリング V2 では、標準の Spark プロパティを使用して、Spark の正常なデコミッションとシャッフル移行を構成できます。この機能は、カスタマイズした Spark プロパティで移行の互換性を維持するのに役立ちます。

Dataproc Serverless の自動スケーリング機能

特徴 Dataproc Serverless 自動スケーリング V1 Dataproc Serverless 自動スケーリング V2
ノードのダウンスケーリング 同期 非同期
ダウンスケーリングのノード選択 ランダム インテリジェント
Spark の正常なデコミッションおよびシャッフル移行 構成不可 構成可能

Spark の動的割り当てのプロパティ

次の表は、バッチ ワークロードを送信して自動スケーリングを制御する際に設定できる Spark の動的割り当てプロパティの一覧です(Spark プロパティの設定方法をご覧ください)。

プロパティ 説明 デフォルト
spark.dataproc.scaling.version Dataproc Serverless for Spark の自動スケーリング バージョン。バージョン 1 または 2 を指定します(Dataproc Serverless 自動スケーリング V2 をご覧ください)。 1
spark.dynamicAllocation.enabled ワークロードに基づいてエグゼキュータの数をスケールアップまたはスケールダウンする動的リソース割り当てを使用するかどうか。 値を false に設定すると、ワークロードの自動スケーリングが無効になります。デフォルト: true true
spark.dynamicAllocation.initialExecutors ワークロードに割り当てられる初期のエグゼキュータ数。ワークロードが開始されると、自動スケーリングによってアクティブなエグゼキュータの数が変化することがあります。最小値は 2 で、最大値は 500 です。 2
spark.dynamicAllocation.minExecutors ワークロードをスケールダウンするエグゼキュータの最小数。最小値は 2 です。 2
spark.dynamicAllocation.maxExecutors ワークロードをスケールアップするエグゼキュータの最大数。最大値は 2000 です。 1000
spark.dynamicAllocation.executorAllocationRatio Spark ワークロードのスケールアップをカスタマイズします。01 の値を受け入れます。値 1.0 は最大スケールアップ機能を提供し、最大の並列処理を達成するのに役立ちます。値 0.5 では、スケールアップ機能と並列処理が最大値の半分に設定されます。 0.3
spark.reducer.fetchMigratedShuffle.enabled true に設定すると、Spark 動的割り当てのために廃止されたエグゼキュータからフェッチが失敗した後、Spark ドライバからシャッフル出力のロケーションを取得できるようになります。これにより、廃止されたエグゼキュータからライブ エグゼキュータへのシャッフル ブロックの移行によって発生する ExecutorDeadException エラーが減り、FetchFailedException エラーによって発生するステージの再試行が少なくなります(ExecutorDeadException による FetchFailedException をご覧ください)。このプロパティは、Dataproc サーバーレスの Spark ランタイム バージョン 1.1.12 以降と 2.0.20 以降で使用できます。 false

Spark 動的割り当ての指標

Spark バッチ ワークロードは、Spark 動的リソース割り当てに関連する以下の指標を生成します(Spark 指標の詳細については、モニタリングとインストゥルメンテーションをご覧ください)。

指標 説明
maximum-needed 実行中のタスクと保留中のタスクをすべて達成するために現在の負荷で必要なエグゼキュータの最大数。
running タスクを実行している稼働中のエグゼキュータ数。

Spark 動的割り当ての問題と解決策

  • ExecutorDeadException による FetchFailedException

    原因: Spark 動的割り当てがエグゼキュータをスケールダウンすると、シャッフル ファイルがライブ エグゼキュータに移行します。ただし、エグゼキュータの Spark レデューサ タスクは、レデューサ タスクの開始時に Spark ドライバによって設定されたロケーションからシャッフル出力をフェッチするため、シャッフル ファイルが移行されても、レデューサは引き続き廃止されたエグゼキュータからのシャッフル出力のフェッチを試行できます。これにより、ExecutorDeadExceptionFetchFailedException エラーが発生します。

    解決策: Spark バッチ ワークロードの Dataproc サーバーレスを実行するときに、spark.reducer.fetchMigratedShuffle.enabledtrue に設定して、シャッフル ロケーションの再取得を有効にします(Spark バッチ ワークロードのプロパティの設定をご覧ください)。このプロパティを有効にすると、廃止されたエグゼキュータからのフェッチに失敗した後、レデューサ タスクはドライバからシャッフル出力のロケーションを再取得します。