Spark ジョブ調整のヒント

以降のセクションでは、Dataproc Spark アプリケーションを微調整するためのヒントを紹介します。

エフェメラル クラスタを使用する

Dataproc の「エフェメラル」クラスタモデルを使用する場合、ジョブごとに専用クラスタを作成し、ジョブが終了したらクラスタを削除します。 エフェメラル モデルを使用すると、ストレージとコンピューティングを別々に処理し、ジョブの入力データと出力データを Cloud Storage または BigQuery に保存し、コンピューティングと一時データ ストレージのみにクラスタを使用できます。

永続的なクラスタの注意点

1 つのジョブのエフェメラル クラスタを使用すると、共有クラスタおよび長時間実行される「永続」クラスタの使用に伴う次のような注意点と潜在的な問題を回避できます。

  • 単一障害点: 共有クラスタのエラー状態が原因で、すべてのジョブが失敗し、データ パイプライン全体がブロックされることがあります。エラーの調査と復旧には数時間かかる場合があります。エフェメラル クラスタはクラスタ内の一時的な状態のみを保持するため、エラーが発生した場合は、すばやく削除して再作成できます。
  • HDFS、MySQL、またはローカル ファイル システムでクラスタの状態の維持および移行が困難
  • SLO に悪影響を及ぼすジョブ間のリソース競合
  • メモリ負荷によりサービス デーモンが応答しない
  • ディスク容量を超える可能性のあるログと一時ファイルの蓄積
  • クラスタゾーンの在庫切れによるアップスケーリングの失敗
  • 古いクラスタ イメージ バージョンのサポートがない。

エフェメラル クラスタのメリット

メリットとしては、エフェメラル クラスタでは、次のことが可能です。

  • 異なる Dataproc VM サービス アカウントで、ジョブごとに異なる IAM 権限を構成します。
  • 必要に応じてクラスタ構成を変更し、ジョブごとにクラスタのハードウェアとソフトウェアの構成を最適化します。
  • 新しいクラスタのイメージ バージョンをアップグレードして、最新のセキュリティ パッチ、バグの修正、最適化を適用します。
  • 分離された単一ジョブクラスタで問題のトラブルシューティングをより迅速に行います。
  • 共有クラスタ上のジョブ間のアイドル時間ではなく、エフェメラル クラスタの実行時間に対してのみ料金を支払うことで、コストを削減できます。

Spark SQL の使用

Spark SQL DataFrame API は、RDD API を大幅に最適化したものです。RDD を使用するコードを操作する場合は、RDD をコードに渡す前に DataFrame としてデータを読み取ることを検討してください。Java や Scala のコードでは、RDD とデータフレームのスーパーセットとして Spark SQL Dataset API の使用を検討してください。

Apache Spark 3 の使用

Dataproc 2.0 で Spark 3 をインストールすると、次の機能とパフォーマンスが改善されます。

  • GPU のサポート
  • バイナリ ファイルの読み取り機能
  • パフォーマンスの改善
  • 動的パーティション プルーニング
  • 適応型クエリ実行。Spark ジョブをリアルタイムで最適化します。

ダイナミック アロケーションの使用

Apache Spark には、クラスタ内のワーカーで Spark エグゼキュータの数をスケーリングする動的割り当て機能が含まれています。この機能を使用すると、クラスタがスケールアップされても、ジョブが Dataproc クラスタ全体を使用できます。この機能は、デフォルトで Dataproc で有効になっています(spark.dynamicAllocation.enabledtrue に設定されています)。詳細は、Spark ダイナミック アロケーションをご覧ください。

Dataproc の自動スケーリングを使用する

Dataproc の自動スケーリングでは、クラスタの Dataproc ワーカーの数が動的に増減されるため、Spark ジョブで迅速に完了するために必要なリソースを確保できます。

セカンダリ ワーカーのみをスケーリングするように自動スケーリング ポリシーを構成することがベスト プラクティスとなります。

Dataproc の高度な柔軟性モードの使用

プリエンプティブル VM または自動スケーリング ポリシーを持つクラスタは、ワーカーをプリエンプトまたは削除する場合、削減指定子へのシャッフル データの提供が完了する前に、FetchFailed の例外を受け取ることがあります。この例外が発生すると、タスクの再試行が発生したり、ジョブの完了時間が長くなったりする可能性があります。

推奨事項: Dataproc の高度な柔軟性モードを使用してください。このモードを使用すると、セカンダリ ワーカーに中間シャッフル データが保存されないため、セカンダリ ワーカーを安全にプリエンプトまたはスケールダウンできます。

パーティショニングとシャッフルの構成

Spark は、クラスタの一時パーティションにデータを保存します。アプリケーションが DataFrame をグループ化または結合すると、グループと低レベル構成に従って新しいパーティション内にデータがシャッフルされます。

データ パーティショニングは、アプリケーションのパフォーマンスに大きな影響を与えます。パーティションが少なすぎると、ジョブの並列処理とクラスタ リソースの使用率が制限されます。パーティションが多すぎると、追加のパーティション処理とシャッフルのためにジョブの速度が低下します。

パーティションの構成

パーティションの数とサイズは、次のプロパティで決まります。

  • spark.sql.files.maxPartitionBytes: Cloud Storage からデータを読み取るときのパーティションの最大サイズ。デフォルトは 128 MB です。これは、100 TB 未満の処理を行うほとんどのアプリケーションには十分な大きさです。

  • spark.sql.shuffle.partitions: シャッフル実行後のパーティションの数。デフォルトは 200 です。これは、vCPU の合計数が 100 未満のクラスタで最適な値です。推奨: この値を、クラスタ内の vCPU 数の 3 倍に設定します。

  • spark.default.parallelism: シャッフルが必要な RDD 変換(joinreduceByKeyparallelize など)を実行した後に返されるパーティションの数。デフォルトは、クラスタ内の vCPU の合計数です。Spark ジョブで RDD を使用する場合、この数は vCPU の 3 倍に設定します。

ファイル数を制限する

Spark で多数の小さなファイルを読み取ると、パフォーマンスが低下します。256 MB~512 MB の範囲のファイルサイズなど、より大きなファイルサイズでデータを保存します。 同様に、出力ファイルの数を制限してください(シャッフルを強制するには、不要なシャッフルの回避をご覧ください)。

アダプティブ クエリ実行の構成(Spark 3)

適応型クエリ実行(Dataproc イメージ バージョン 2.0 でデフォルトで有効になっています)を行うと、次のような Spark ジョブのパフォーマンスが向上します。

ほとんどのユースケースにはデフォルトの構成設定で問題ありませんが、spark.sql.adaptive.advisoryPartitionSizeInBytesspark.sqlfiles.maxPartitionBytes(デフォルトは 128 MB)に設定すると便利です。

不要なシャッフルの回避

Spark を使用すると、シャッフルを手動でトリガーし、repartition 関数を使用してデータを再調整できます。シャッフルは負荷が大きいため、データの再シャッフルは慎重に使用される必要があります。Spark でデータを自動的にパーティショニングできるようにするには、パーティション構成を適切に設定するだけで十分です。

例外: 列パーティション分割データを Cloud Storage に書き込むときに、特定の列を再パーティショニングすることで、多数の小さなファイルの書き込みが回避され、書き込み時間を短縮できます。

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Parquet または Avro にデータを保存する

Spark SQL は、デフォルトで Snappy 圧縮 Parquet ファイルのデータの読み取りと書き込みを行います。Parquet は効率的なカラム型ファイル形式であり、Spark でアプリケーションの実行に必要なデータのみを読み取ることができます。これは、大規模なデータセットを扱う場合に非常に重要になります。Apache ORC など、他のカラム型形式も効果的です。

カラム型ではないデータの場合、Apache Avro が効率的なバイナリ行ファイル形式を提供します。通常は Parquet より時間がかかりますが、Avro のパフォーマンスはテキストベースの形式(CSV や JSON など)より優れています。

ディスクサイズの最適化

永続ディスクのスループットはディスクサイズに応じてスケーリングされます。ジョブはメタデータを書き込み、データをディスクにシャッフルするため、Spark ジョブのパフォーマンスに影響を与える可能性があります。標準の永続ディスクを使用する場合、ディスクサイズはワーカーあたり少なくとも 1 テラバイトである必要があります(永続ディスクサイズによるパフォーマンスを参照)。

Google Cloud Console でワーカーのスループットをモニタリングするには、次のようにします。

  1. [クラスタ] ページでクラスタ名をクリックします。
  2. [VM インスタンス] タブをクリックします。
  3. ワーカー名をクリックします。
  4. [モニタリング] タブをクリックして、[ディスク スループット] までスクロールし、ワーカー スループットを表示します。

ディスクに関する考慮事項

永続ストレージのメリットがないエフェメラル Dataproc クラスタでは、ローカル SSD を使用できます。ローカル SSD は、クラスタに物理的に接続され、永続ディスクよりもスループットが高くなります(パフォーマンス表をご覧ください)。ローカル SSD は、375 GB の固定サイズで利用できますが、複数の SSD を追加してパフォーマンスを改善することもできます。

ローカル SSD は、クラスタのシャットダウン後にデータを保持しません。永続ストレージが必要な場合は、SSD 永続ディスクを使用して、標準永続ディスクよりもサイズに対するスループットを高めることができます。パーティション サイズが 8 KB 未満の場合は、SSD 永続ディスクも適しています(ただし、小さいパーティションは避けてください)。

クラスタに GPU を接続する

Spark 3 は GPU をサポートしています。RAPIDS 初期化アクションで GPU を使用して、RAPIDS SQL Accelerator を使用して Spark ジョブを高速化します。GPU を使用するクラスタを構成するには、GPU ドライバの初期化アクションを使用します。

一般的なジョブのエラーと解決策

メモリ不足

例:

  • 「Lost executor」(エグゼキュータがなくなりました)
  • 「java.lang.OutOfMemoryError: GC overhead limit exceeded」(GC のオーバーヘッド上限を超えました)
  • 「Container killed by YARN for exceeding memory limits」(コンテナの上限超過のため YARN によって強制終了されました)

考えられる解決策

シャッフル フェッチのエラー

例:

  • 「FetchFailedException」(Spark エラー)
  • 「Failed to connect to...」(Spark エラー)
  • 「Failed to fetch」(取得できませんでした)(MapReduce エラー)

通常、まだ処理するシャッフル データがあるワーカーの早期削除が原因です。

考えられる原因と修正点:

  • プリエンプティブル ワーカー VM が再要求された、または非プリエンプティブル ワーカー VM がオートスケーラーによって削除されました。解決策: 高度な柔軟性モードを使用して、セカンダリ ワーカーを安全にプリエンプティブルまたはスケーラブルにします。
  • OutOfMemory エラーによりエグゼキュータまたはマッパーがクラッシュしました。解決策: エグゼキュータまたはマッパーのメモリを増やします。
  • Spark シャッフル サービスが過負荷状態になっている可能性があります。解決策: ジョブ パーティションの数を減らします。

YARN ノードが UNHEALTHY

YARN ログからの例:

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

多くの場合、データをシャッフルするためのディスク容量の不足に関連しています。ログファイルを表示して診断します。

  • Google Cloud コンソールでプロジェクトの [クラスタ] ページを開き、クラスタの名前をクリックします。
  • [ログを表示] をクリックします。
  • hadoop-yarn-nodemanager でログをフィルタします。
  • 「UNHEALTHY」で検索します。

考えられる解決策:

  • ユーザー キャッシュは、yarn-site.xml fileyarn.nodemanager.local-dirs プロパティで指定したディレクトリに保存されます。このファイルは /etc/hadoop/conf/yarn-site.xml にあります。/hadoop/yarn/nm-local-dir パスの空き容量を確認し、/hadoop/yarn/nm-local-dir/usercache ユーザー キャッシュ フォルダを削除して空き容量を確保します。
  • ログに「UNHEALTHY」というステータスが報告される場合は、より大きなディスク スペースを使用してクラスタを再作成し、スループットの上限を引き上げます。

ドライバのメモリ不足によるジョブの失敗

クラスタモードでジョブを実行する場合、マスターノードのメモリサイズがワーカーノードのメモリサイズより大幅に大きくなると、ジョブが失敗します。

ドライバログの例:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

考えられる解決策:

  • spark:spark.driver.memoryyarn:yarn.scheduler.maximum-allocation-mb よりも小さく設定します。
  • マスターノードとワーカーノードに同じマシンタイプを使用します。

追加情報