Spark ジョブ調整のヒント

Spark SQL の使用

Spark SQL DataFrame API は、RDD API を大幅に最適化したものです。RDD を使用するコードを操作する場合は、RDD をコードに渡す前に DataFrame としてデータを読み取ることを検討してください。Java や Scala のコードでは、RDD とデータフレームのスーパーセットとして Spark SQL Dataset API の使用を検討してください。

Apache Spark 3 の使用

Dataproc 2.0 で Spark 3 をインストールすると、次の機能とパフォーマンスが改善されます。

  • GPU のサポート
  • バイナリ ファイルの読み取り機能
  • パフォーマンスの改善
  • 動的パーティション プルーニング
  • 適応型クエリ実行。Spark ジョブをリアルタイムで最適化します。

ダイナミック アロケーションの使用

Apache Spark には、クラスタ内のワーカーで Spark エグゼキュータの数をスケーリングする動的割り当て機能が含まれています。この機能を使用すると、クラスタがスケールアップされても、ジョブが Dataproc クラスタ全体を使用できます。この機能は、デフォルトで Dataproc で有効になっています(spark.dynamicAllocation.enabledtrue に設定されています)。詳細は、Spark ダイナミック アロケーションをご覧ください。

Dataproc の自動スケーリングを使用する

Dataproc の自動スケーリングでは、クラスタの Dataproc ワーカーの数が動的に増減されるため、Spark ジョブで迅速に完了するために必要なリソースを確保できます。

セカンダリ ワーカーのみをスケーリングするように自動スケーリング ポリシーを構成することがベスト プラクティスとなります。

Dataproc の高度な柔軟性モードの使用

プリエンプティブル VM または自動スケーリング ポリシーを持つクラスタは、ワーカーをプリエンプトまたは削除する場合、削減指定子へのシャッフル データの提供が完了する前に、FetchFailed の例外を受け取ることがあります。この例外が発生すると、タスクの再試行が発生したり、ジョブの完了時間が長くなったりする可能性があります。

推奨事項: Dataproc の高度な柔軟性モードを使用してください。このモードを使用すると、セカンダリ ワーカーに中間シャッフル データが保存されないため、セカンダリ ワーカーを安全にプリエンプトまたはスケールダウンできます。

パーティショニングとシャッフルの構成

Spark は、クラスタの一時パーティションにデータを保存します。アプリケーションが DataFrame をグループ化または結合すると、グループと低レベル構成に従って新しいパーティション内にデータがシャッフルされます。

データ パーティショニングは、アプリケーションのパフォーマンスに大きな影響を与えます。パーティションが少なすぎると、ジョブの並列処理とクラスタ リソースの使用率が制限されます。パーティションが多すぎると、追加のパーティション処理とシャッフルのためにジョブの速度が低下します。

パーティションの構成

パーティションの数とサイズは、次のプロパティで決まります。

  • spark.sqlfiles.maxPartitionBytes: Cloud Storage からデータを読み取るときのパーティションの最大サイズ。デフォルトは 128 MB です。これは、100 TB 未満の処理を行うほとんどのアプリケーションには十分な大きさです。

  • spark.sql.shuffle.partitions: シャッフル実行後のパーティションの数。デフォルトは 200 です。これは、vCPU の合計数が 100 未満のクラスタで最適な値です。推奨: この値を、クラスタ内の vCPU 数の 3 倍に設定します。

  • spark.default.parallelism: シャッフルが必要な RDD 変換(joinreduceByKeyparallelize など)を実行した後に返されるパーティションの数。デフォルトは、クラスタ内の vCPU の合計数です。Spark ジョブで RDD を使用する場合、この数は vCPU の 3 倍に設定します。

ファイル数を制限する

Spark で多数の小さなファイルを読み取ると、パフォーマンスが低下します。100 MB を超えるファイルサイズでデータを保存してください。同様に、出力ファイルの数を制限してください(シャッフルを強制するには、不要なシャッフルの回避をご覧ください)。

アダプティブ クエリ実行の構成(Spark 3)

適応型クエリ実行(Dataproc イメージ バージョン 2.0 でデフォルトで有効になっています)を行うと、次のような Spark ジョブのパフォーマンスが向上します。

ほとんどのユースケースにはデフォルトの構成設定で問題ありませんが、spark.sql.adaptive.advisoryPartitionSizeInBytesspark.sqlfiles.maxPartitionBytes(デフォルトは 128 MB)に設定すると便利です。

不要なシャッフルの回避

Spark を使用すると、シャッフルを手動でトリガーし、repartition 関数を使用してデータを再調整できます。シャッフルは負荷が大きいため、データの再シャッフルは慎重に使用される必要があります。Spark でデータを自動的にパーティショニングできるようにするには、パーティション構成を適切に設定するだけで十分です。

例外: 列パーティション分割データを Cloud Storage に書き込むときに、特定の列を再パーティショニングすることで、多数の小さなファイルの書き込みが回避され、書き込み時間を短縮できます。

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Parquet または Avro にデータを保存する

Spark SQL は、デフォルトで Snappy 圧縮 Parquet ファイルのデータの読み取りと書き込みを行います。Parquet は効率的なカラム型ファイル形式であり、Spark でアプリケーションの実行に必要なデータのみを読み取ることができます。これは、大規模なデータセットを扱う場合に非常に重要になります。Apache ORC など、他のカラム型形式も効果的です。

カラム型ではないデータの場合、Apache Avro が効率的なバイナリ行ファイル形式を提供します。通常は Parquet より時間がかかりますが、Avro のパフォーマンスはテキストベースの形式(CSV や JSON など)より優れています。

ディスクサイズの最適化

永続ディスクのスループットはディスクサイズに応じてスケーリングされます。ジョブはメタデータを書き込み、データをディスクにシャッフルするため、Spark ジョブのパフォーマンスに影響を与える可能性があります。標準の永続ディスクを使用する場合、ディスクサイズはワーカーあたり少なくとも 1 テラバイトである必要があります(永続ディスクサイズによるパフォーマンスを参照)。

Google Cloud Console でワーカーのスループットをモニタリングするには、次のようにします。

  1. [クラスタ] ページでクラスタ名をクリックします。
  2. [VM インスタンス] タブをクリックします。
  3. ワーカー名をクリックします。
  4. [モニタリング] タブをクリックして、[ディスク スループット] までスクロールし、ワーカー スループットを表示します。

ディスクに関する考慮事項

エフェメラル ストレージのメリットがないエフェメラル Dataproc クラスタでは、ローカル SSD を使用できます。ローカル SSD は、クラスタに物理的に接続され、永続ディスクよりもスループットが高くなります(パフォーマンス表をご覧ください)。ローカル SSD は、375 GB の固定サイズで利用できますが、複数の SSD を追加してパフォーマンスを改善することもできます。

ローカル SSD は、クラスタのシャットダウン後にデータを保持しません。永続ストレージが必要な場合は、SSD 永続ディスクを使用して、標準永続ディスクよりもサイズに対するスループットを高めることができます。パーティション サイズが 8 KB 未満の場合は、SSD 永続ディスクも適しています(ただし、小さいパーティションは避けてください)。

クラスタに GPU を接続する

Spark 3 は GPU をサポートしています。RAPIDS 初期化アクションで GPU を使用して、RAPIDS SQL Accelerator を使用して Spark ジョブを高速化します。GPU を使用するクラスタを構成するには、GPU ドライバの初期化アクションを使用します。

一般的なジョブのエラーと解決策

メモリ不足

例:

  • 「Lost executor」(エグゼキュータがなくなりました)
  • 「java.lang.OutOfMemoryError: GC overhead limit exceeded」(GC のオーバーヘッド上限を超えました)
  • 「Container killed by YARN for exceeding memory limits」(コンテナの上限超過のため YARN によって強制終了されました)

考えられる解決策

シャッフル フェッチのエラー

例:

  • 「FetchFailedException」(Spark エラー)
  • 「Failed to connect to...」(Spark エラー)
  • 「Failed to fetch」(取得できませんでした)(MapReduce エラー)

通常、まだ処理するシャッフル データがあるワーカーの早期削除が原因です。

考えられる原因と修正点:

  • プリエンプティブル ワーカー VM が再要求された、または非プリエンプティブル ワーカー VM がオートスケーラーによって削除されました。解決策: 高度な柔軟性モードを使用して、セカンダリ ワーカーを安全にプリエンプティブルまたはスケーラブルにします。
  • OutOfMemory エラーが原因でエグゼキュータまたはマッパーがクラッシュしました。解決策: エグゼキュータまたはマッパーのメモリを増やしてください。
  • Spark シャッフル サービスが過負荷状態になっている可能性があります。解決策: ジョブ パーティションの数を減らします。

YARN ノードが UNHEALTHY

YARN ログからの例:

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

多くの場合、データをシャッフルするためのディスク容量の不足に関連しています。ログファイルを表示して診断します。

  • Cloud Console でプロジェクトの [クラスタ] ページを開き、クラスタの名前をクリックします。
  • [ログを表示] をクリックします。
  • hadoop-yarn-nodemanager でログをフィルタします。
  • 「UNHEALTHY」で検索します。

解決策: ログに「UNHEALTHY」というステータスが報告される場合は、より大きなディスク スペースを使用してクラスタを再作成し、スループットの上限を引き上げます。

詳細情報