Dataproc の高度な柔軟性モード(EFM)は、シャッフル データを管理して、動作中のクラスタからのノードの削除に起因するジョブ進行の遅延を最小限に抑えます。EFM は、Spark シャッフル データをプライマリ ワーカーに書き込みます。ワーカーは、削減フェーズ中にこれらのリモートノードからプル操作を実行します。
EFM はセカンダリ ワーカーに中間シャッフル データを保存しないため、プリエンプティブル VM を使用するクラスタ、またはセカンダリ ワーカー グループのみを自動スケーリングするクラスタでの使用に適しています。
- AppMaster の再配置に対応していない Apache Hadoop YARN ジョブは、高度な柔軟性モードで失敗する可能性があります(AppMaster の終了を待つタイミングをご覧ください)。
- 以下に対しては、高度な柔軟性モードはおすすめしません。
- プライマリ ワーカーのみのクラスタ。
- 以下の場合は、高度な柔軟性モードはサポートされません。
- プライマリ ワーカーの自動スケーリングが有効な場合。ほとんどの場合、プライマリ ワーカーは、自動的に移行されないシャッフル データを保持します。プライマリ ワーカー グループをダウンスケーリングすると EFM の利点がなくなります。
- 正常なデコミッションを有効にしてクラスタ上で Spark ジョブを実行する場合。YARN の正常なデコミッション メカニズムは、関連するすべてのアプリケーションが完了するまでデコミッション ノードを保持しているため、正常なデコミッションと EFM を複数の目的に使用できます。
高度な柔軟性モードの使用
高度な柔軟性モードは実行エンジンごとに構成され、クラスタの作成時に構成する必要があります。Spark EFM の実装は、dataproc:efm.spark.shuffle=primary-worker
クラスタ プロパティを使用して構成されています。
例: Spark 用にプライマリ ワーカー シャッフルを使用してクラスタを作成します。
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ --worker-machine-type=n1-highmem-8 \ --num-workers=25 \ --num-worker-local-ssds=2 \ --secondary-worker-type=preemptible \ --secondary-worker-boot-disk-size=500GB \ --num-secondary-workers=25
Apache Spark の例
- EFM クラスタで Spark サンプル jar を使用して、一般公開のシェイクスピア テキストに対して WordCount ジョブを実行します。
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
プライマリ ワーカー シャッフル用のローカル SSD の構成
プライマリ ワーカー シャッフルと HDFS の実装は、VM に接続されたディスクに中間シャッフル データを書き込み、ローカル SSD が提供する追加スループットと IOPS の恩恵を受けます。リソース割り当てを容易に行うため、プライマリ ワーカー マシンを構成するときに、4 つの vCPU あたり約 1 個のローカル SSD パーティションに目標を定めます。
ローカル SSD を接続するには、--num-worker-local-ssds
フラグを gcloud dataproc clusters create コマンドに渡します。
セカンダリ ワーカーの比率
セカンダリ ワーカーはシャッフル データをプライマリ ワーカーに書き込むため、ジョブのシャッフル負荷に対応できる十分な CPU、メモリ、ディスク リソースを持つ十分な数のプライマリ ワーカーがクラスタに含まれている必要があります。クラスタの自動スケーリングを目的としてプライマリ グループのスケーリングと望ましくない挙動を防ぐため、minInstances
を プライマリ ワーカー グループの自動スケーリング ポリシーの maxInstances
値に設定します。
セカンダリ ワーカーの比率がプライマリ ワーカーに対して高い(10:1 など)場合は、プライマリ ワーカーの CPU 使用率、ネットワーク、ディスク使用量をモニタリングして、過負荷状態かどうかを判断します。手順は次のとおりです。
Google Cloud Console の [VM インスタンス] ページに移動します。
プライマリ ワーカーの左側にあるチェックボックスをオンにします。
[モニタリング] タブをクリックして、プライマリ ワーカーの CPU 使用率、ディスク IOPS、ネットワーク バイト数などの指標を表示します。
プライマリ ワーカーが過負荷になっている場合は、プライマリ ワーカーの手動でのスケールアップを検討してください。
プライマリ ワーカー グループのサイズを変更する
プライマリ ワーカー グループは安全にスケールアップできますが、プライマリ ワーカー グループをダウンスケーリングすると、ジョブの進行状況に悪影響が及ぶ可能性があります。プライマリ ワーカー グループをスケールダウンするオペレーションでは、正常なデコミッションを適用する必要があります。これは --graceful-decommission-timeout
フラグを設定することで有効になります。
自動スケーリングされたクラスタ: 自動スケーリング ポリシーが設定されている EFM クラスタで、プライマリ ワーカー グループのスケーリングが無効になります。自動スケーリングされたクラスタでプライマリ ワーカー グループのサイズを変更するには、以下の手順を実施します。
自動スケーリングを無効にします。
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
プライマリ グループをスケーリングします。
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
自動スケーリングを再度有効にします。
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
プライマリ ワーカー ディスクの使用状況をモニタリングする
プライマリ ワーカーには、クラスタのシャッフル データ用の十分なディスク容量が必要です。remaining HDFS capacity
指標を使用して、この機能をモニタリングできます。ローカル ディスクがいっぱいになると、HDFS のスペースが利用できなくなり、残りの容量が減少します。
デフォルトでは、プライマリ ワーカーのローカル ディスクの使用量が容量の 90% を超過すると、YARN ノード UI でノードが UNHEALTHY に指定されます。ディスク容量に関する問題が発生した場合は、HDFS から未使用のデータを削除するか、プライマリ ワーカープールをスケールアップします。
通常、中間シャッフル データはジョブの終了後までクリーンアップされません。Spark でプライマリ ワーカー シャッフルを使用する場合は、ジョブが完了してから最大 30 分かかることがあります。
詳細構成
パーティショニングと並列処理
MapReduce または Spark ジョブを送信するときは、適切なレベルのパーティショニングを構成します。シャッフル ステージの入力パーティションと出力パーティション数を決定するには、さまざまなパフォーマンス特性のトレードオフが必要です。ジョブ形態に適した値を試すことをおすすめします。
入力パーティション
MapReduce と Spark の入力パーティショニングは、入力データセットによって決定されます。Cloud Storage からファイルを読み取る際に、各タスクは 1 つの「ブロックサイズ」のデータを処理します。
Spark SQL ジョブの場合、パーティションの最大サイズは
spark.sql.files.maxPartitionBytes
で制御されます。1 GB に増やすことを検討してください。spark.sql.files.maxPartitionBytes=1073741824
MapReduce ジョブと Spark RDD では、通常、パーティション サイズは
fs.gs.block.size
で制御され、デフォルトは 128 MB です。1 GB に増やすことを検討してください。InputFormat
固有のプロパティ(mapreduce.input.fileinputformat.split.minsize
、mapreduce.input.fileinputformat.split.maxsize
など)を設定することもできます。- MapReduce ジョブの場合:
--properties fs.gs.block.size=1073741824
- Spark RDD の場合:
--properties spark.hadoop.fs.gs.block.size=1073741824
- MapReduce ジョブの場合:
出力パーティション
後続のステージのタスク数は、複数のプロパティによって制御されます。1 TB 以上を処理する大きなジョブの場合は、パーティションごとに少なくとも 1 GB を用意することを検討してください。
MapReduce ジョブの場合、出力パーティションの数は
mapreduce.job.reduces
によって制御されます。Spark SQL の場合、出力パーティションの数は
spark.sql.shuffle.partitions
によって制御されます。RDD API を使用する Spark ジョブの場合、出力パーティションの数を指定するか、
spark.default.parallelism
を設定します。
プライマリ ワーカーのシャッフル調整
最も重要なプロパティは --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
です。Spark のシャッフル サーバーはノード マネージャーの一部として実行されるため、これはクラスタレベルの YARN プロパティである点に注意してください。マシンではデフォルトでコアが 2 倍になります(たとえば、n1-highmem-8 の場合は 16 スレッド)。[Shuffle Read Blocked Time] が 1 秒を超え、プライマリ ワーカーがネットワーク、CPU、ディスクの上限に達していない場合は、シャッフル サーバーのスレッド数を増やすことを検討してください。
大規模なマシンタイプでは、spark.shuffle.io.numConnectionsPerPeer
(デフォルトは 1)を増やすことを検討してください。(たとえば、ホストのペアごとに 5 つの接続を設定します)。
再試行回数の引き上げ
アプリマスター、タスク、ステージに対して許可される最大試行回数は、次のプロパティを設定することによって構成できます。
yarn:yarn.resourcemanager.am.max-attempts mapred:mapreduce.map.maxattempts mapred:mapreduce.reduce.maxattempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
正常にデコミッションせずに多くのプリエンプティブル VM または自動スケーリングを使用するクラスタでは、アプリマスターとタスクがより頻繁に終了するため、それらのクラスタ内で上記の値を引き上げることをおすすめします(なお、Spark と正常なデコミッションは EFM とは併用できません)。
EFM クラスタでの YARN の正常なデコミッション
YARN の正常なデコミッションを使用すると、実行中のアプリケーションへの影響を最小限に抑えながらノードを迅速に削除できます。自動スケーリング クラスタの場合、正常なデコミッションのタイムアウトは、EFM クラスタに接続されている AutoscalingPolicy で設定できます。
正常なデコミッションの EFM の強化
中間データは分散ファイル システムに保存されるため、ノード上で実行中のすべてのコンテナが終了するとすぐに、EFM クラスタからノードを削除できます。これに対し、アプリケーションが完了するまでは、標準の Dataproc クラスタ上のノードは削除されません。
ノードの削除は、ノードで実行されているアプリマスターの終了を待機しません。アプリマスターのコンテナが終了されると、デコミッションされていない別のノードで再度スケジュール設定されます。ジョブの進捗率は失われません。新しいアプリマスターは、ジョブ履歴を読み取ることで以前のアプリマスターから状態を迅速に回復します。