このドキュメントでは、 Google Cloud Apache Spark 用サーバーレスの自動スケーリングについて説明します。Spark ワークロードを送信すると、Apache Spark 向け Serverless は、エグゼキュータの数などのワークロード リソースを動的にスケーリングして、ワークロードを効率的に実行できます。Serverless for Apache Spark の自動スケーリングはデフォルトの動作であり、Spark 動的リソース割り当てを使用して、ワークロードをスケーリングするかどうか、またその方法とタイミングを決定します。
Apache Spark 向け Serverless 自動スケーリング V2
Apache Spark 用 Serverless 自動スケーリング バージョン 2(V2)では、デフォルト バージョン 1(V1)に機能と改善を追加して、Apache Spark 用 Serverless ワークロードの管理、ワークロードのパフォーマンスの改善、費用の削減を行います。
- 非同期ノードのスケールダウン: 自動スケーリング V2 は、V1 の同期スケールダウンを非同期スケールダウンに置き換えます。非同期のスケールダウンを使用すると、Apache Spark 用 Serverless は、すべてのノードがシャッフル移行を完了するのを待たずに、ワークロード リソースをスケールダウンします。つまり、スケールダウンが遅いロングテール ノードがアップスケーリングをブロックすることはありません。
 - インテリジェントなスケールダウンノード選択: 自動スケーリング V2 は、V1 のランダムなノード選択を、最初にスケールダウンするのに最適なノードを特定するインテリジェントなアルゴリズムで置き換えます。このアルゴリズムでは、ノードのシャッフル データサイズやアイドル時間などの要素が考慮されます。
 - 構成可能な Spark の正常なデコミッションとシャッフル移行の動作: 自動スケーリング V2 では、標準の Spark プロパティを使用して、Spark の正常なデコミッションとシャッフル移行を構成できます。この機能は、カスタマイズされた Spark プロパティとの移行の互換性を維持するのに役立ちます。
 
Apache Spark 用サーバーレスの自動スケーリング機能
| 機能 | Serverless for Apache Spark Autoscaling V1 | Serverless for Apache Spark 自動スケーリング V2 | 
| ノードのダウン スケーリング | 同期 | 非同期 | 
| ダウン スケーリングのノード選択 | ランダム | インテリジェント | 
| Spark の正常なデコミッションとシャッフル移行 | 構成不可 | 構成可能 | 
Spark の動的割り当てのプロパティ
次の表は、バッチ ワークロードを送信して自動スケーリングを制御する際に設定できる Spark の動的割り当てプロパティの一覧です(Spark プロパティの設定方法をご覧ください)。
| プロパティ | 説明 | デフォルト | 
|---|---|---|
spark.dataproc.scaling.version | 
    Serverless for Apache Spark の Spark 自動スケーリング バージョン。バージョン 1 または 2 を指定します(Apache Spark 用サーバーレスの自動スケーリング V2 をご覧ください)。 | 
    1 | 
  
spark.dynamicAllocation.enabled | 
    ワークロードに基づいてエグゼキュータの数をスケールアップまたはスケールダウンする動的リソース割り当てを使用するかどうか。
      値を false に設定すると、ワークロードの自動スケーリングが無効になります。デフォルト: true。 | 
  true | 
  
spark.dynamicAllocation.initialExecutors | 
    ワークロードに割り当てられるエグゼキュータの初期数。ワークロードが開始されると、自動スケーリングによってアクティブなエグゼキュータの数が変化することがあります。最小値は 2 で、最大値は 2000 です。 | 
      2 | 
  
spark.dynamicAllocation.minExecutors | 
    ワークロードをスケールダウンするエグゼキュータの最小数。最小値は 2 です。 | 
    2 | 
  
spark.dynamicAllocation.maxExecutors | 
    ワークロードをスケールアップするエグゼキュータの最大数。最大値は 2000 です。 | 
  1000 | 
  
spark.dynamicAllocation.executorAllocationRatio | 
    Spark ワークロードのスケールアップをカスタマイズします。0~1 の値を受け付けます。値 1.0 は、スケールアップ機能を最大化し、並列処理を最大化するのに役立ちます。値 0.5 では、スケールアップ機能と並列処理が最大値の半分に設定されます。 | 
  0.3 | 
  
spark.reducer.fetchMigratedShuffle.enabled | 
    true に設定すると、Spark 動的割り当てによって廃止されたエグゼキュータからのフェッチが失敗した後、Spark ドライバからシャッフル出力のロケーションをフェッチできるようになります。これにより、廃止されたエグゼキュータからライブ エグゼキュータへのシャッフル ブロックの移行によって発生する ExecutorDeadException エラーが減り、FetchFailedException エラーによって発生するステージの再試行が少なくなります(ExecutorDeadException による FetchFailedException をご覧ください)。このプロパティは、Apache Spark 用サーバーレスの Spark ランタイム バージョン
    1.1.12 以降と 2.0.20 以降で使用できます。 | 
  false | 
  
Spark 動的割り当ての指標
Spark バッチ ワークロードは、Spark の動的リソース割り当てに関連する次の指標を生成します(Spark 指標の詳細については、モニタリングと計測をご覧ください)。
| 指標 | 説明 | 
|---|---|
maximum-needed | 
    実行中のタスクと保留中のタスクをすべて達成するために現在の負荷で必要なエグゼキュータの最大数。 | 
running | 
    タスクを実行している稼働中のエグゼキュータ数。 | 
Spark 動的割り当ての問題と解決策
ExecutorDeadException による FetchFailedException
原因: Spark 動的割り当てがエグゼキュータをスケールダウンすると、シャッフル ファイルがライブ エグゼキュータに移行します。ただし、エグゼキュータの Spark レデューサ タスクは、レデューサ タスクの開始時に Spark ドライバによって設定されたロケーションからシャッフル出力をフェッチするため、シャッフル ファイルが移行されても、レデューサは引き続き廃止されたエグゼキュータからのシャッフル出力のフェッチを試行できます。これにより、
ExecutorDeadExceptionとFetchFailedExceptionエラーが発生します。解決策: Apache Spark バッチ ワークロードのサーバーレスを実行するときに、
spark.reducer.fetchMigratedShuffle.enabledをtrueに設定して、シャッフル ロケーションの再取得を有効にします(Spark バッチ ワークロードのプロパティの設定をご覧ください)。このプロパティを有効にすると、廃止されたエグゼキュータからのフェッチに失敗した後、レデューサ タスクはドライバからシャッフル出力のロケーションを再取得します。