Dataproc の高度な柔軟性モード

Dataproc の高度な柔軟性モード(EFM)は、シャッフル データを管理して、動作中のクラスタからのノードの削除に起因するジョブ進行の遅延を最小限に抑えます。EFM は、ユーザーが選択できる 2 つのモードのいずれかでシャッフル データの負荷を軽減します。

  1. プライマリ ワーカー シャッフル。マッパーがプライマリ ワーカーにデータを書き込みます。ワーカーは、削減フェーズ中にこれらのリモートノードからプル操作を実行します。このモードは、Spark ジョブに対してのみ使用でき、Spark ジョブに使用するよう推奨されています。

  2. HCFS(Hadoop 互換ファイル システム)シャッフル。Mapper は HCFS 実装にデータを書き込みます(デフォルトは HDFS)。プライマリ ワーカー モードと同様に、プライマリ ワーカーのみが HDFS と HCFS の実装に参加します(HCFS シャッフルが Cloud Storage コネクタを使用する場合、データはクラスタ外に保存されます)。このモードは、データ量の少ないジョブにはメリットがありますが、スケーリングの制限により、大規模なジョブにはおすすめできません。

どちらの EFM モードでもセカンダリ ワーカーに中間シャッフル データが保存されることはないため、EFM はプリエンプティブル VM を使用するクラスタ、またはセカンダリ ワーカー グループの自動スケーリングのみを行うクラスタに適しています。

制限事項:

  • AppMaster の再配置に対応していない Apache Hadoop YARN ジョブは、高度な柔軟性モードで失敗する可能性があります(AppMaster の終了を待つタイミングをご覧ください)。
  • 以下に対しては、高度な柔軟性モードはおすすめしません
    • プライマリ ワーカーのみのクラスタ。
  • 以下の場合は、高度な柔軟性モードはサポートされません
    • プライマリ ワーカーの自動スケーリングが有効な場合。ほとんどの場合、プライマリ ワーカーは、自動的に移行されないシャッフル データを保持します。プライマリ ワーカー グループをダウンスケーリングすると EFM の利点がなくなります。
    • 正常なデコミッションを有効にしてクラスタ上で Spark ジョブを実行する場合(SPARK-20628 を参照)。

高度な柔軟性モードの使用

高度な柔軟性モードは実行エンジンごとに構成され、クラスタの作成時に構成する必要があります。

  • Spark EFM の実装は、dataproc:efm.spark.shuffle クラスタ プロパティを使用して構成されています。有効なプロパティ値は以下のとおりです。

    • プライマリ ワーカーのシャッフルの場合は primary-worker(推奨)
    • HCFS ベースのシャッフルの場合は hcfsこのモードは非推奨であり、イメージ バージョン 1.5 を実行しているクラスタでのみ使用できます。新しいワークフローにはおすすめしません。
  • Hadoop MapReduce の実装は、dataproc:efm.mapreduce.shuffle クラスタ プロパティを使用して構成されています。有効なプロパティ値は以下のとおりです。

    • hcfs

例: Spark 用にプライマリ ワーカー シャッフルを使用し、MapReduce 用に HCFS シャッフルを使用してクラスタを作成します。

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Apache Spark の例

  1. EFM クラスタで Spark サンプル jar を使用して、一般公開のシェイクスピア テキストに対して WordCount ジョブを実行します。
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Apache Hadoop MapReduce の例

  1. EFM クラスタの mapreduce サンプルを使用して小規模な teragen ジョブを実行し、後の terasort ジョブに備えて Cloud Storage に入力データを生成します。

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. データに対して Terasort ジョブを実行します。

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

プライマリ ワーカー シャッフル用のローカル SSD の構成

プライマリ ワーカー シャッフルと HDFS の実装は、VM に接続されたディスクに中間シャッフル データを書き込み、ローカル SSD が提供する追加スループットと IOPS の恩恵を受けます。リソース割り当てを容易に行うため、プライマリ ワーカー マシンを構成するときに、4 つの vCPU あたり約 1 個のローカル SSD パーティションに目標を定めます。

ローカル SSD を接続するには、--num-worker-local-ssds フラグを gcloud dataproc clusters create コマンドに渡します。

セカンダリ ワーカーの比率

セカンダリ ワーカーはシャッフル データをプライマリ ワーカーに書き込むため、ジョブのシャッフル負荷に対応できる十分な CPU、メモリ、ディスク リソースを持つ十分な数のプライマリ ワーカーがクラスタに含まれている必要があります。クラスタの自動スケーリングを目的としてプライマリ グループのスケーリングと望ましくない挙動を防ぐため、minInstances を プライマリ ワーカー グループの自動スケーリング ポリシーmaxInstances 値に設定します。

セカンダリ ワーカーの比率がプライマリ ワーカーに対して高い(10:1 など)場合は、プライマリ ワーカーの CPU 使用率、ネットワーク、ディスク使用量をモニタリングして、過負荷状態かどうかを判断します。手順は次のとおりです。

  1. Google Cloud Console の [VM インスタンス] ページに移動します。

  2. プライマリ ワーカーの左側にあるチェックボックスをオンにします。

  3. [モニタリング] タブをクリックして、プライマリ ワーカーの CPU 使用率、ディスク IOPS、ネットワーク バイト数などの指標を表示します。

プライマリ ワーカーが過負荷になっている場合は、プライマリ ワーカーの手動でのスケールアップを検討してください。

プライマリ ワーカー グループのサイズを変更する

プライマリ ワーカー グループは安全にスケールアップできますが、プライマリ ワーカー グループをダウンスケーリングすると、ジョブの進行状況に悪影響が及ぶ可能性があります。プライマリ ワーカー グループをスケールダウンするオペレーションでは、正常なデコミッションを適用する必要があります。これは --graceful-decommission-timeout フラグを設定することで有効になります。

自動スケーリングされたクラスタ: 自動スケーリング ポリシーが設定されている EFM クラスタで、プライマリ ワーカー グループのスケーリングが無効になります。自動スケーリングされたクラスタでプライマリ ワーカー グループのサイズを変更するには、以下の手順を実施します。

  1. 自動スケーリングを無効にします。

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. プライマリ グループをスケーリングします。

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. 自動スケーリングを再度有効にします。

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

プライマリ ワーカー ディスクの使用状況をモニタリングする

プライマリ ワーカーには、クラスタのシャッフル データ用の十分なディスク容量が必要です。remaining HDFS capacity 指標を使用して、この機能をモニタリングできます。ローカル ディスクがいっぱいになると、HDFS のスペースが利用できなくなり、残りの容量が減少します。

デフォルトでは、プライマリ ワーカーのローカル ディスクの使用量が容量の 90% を超過すると、YARN ノード UI でノードが UNHEALTHY に指定されます。ディスク容量に関する問題が発生した場合は、HDFS から未使用のデータを削除するか、プライマリ ワーカープールをスケールアップします。

通常、中間シャッフル データはジョブの終了後までクリーンアップされません。Spark でプライマリ ワーカー シャッフルを使用する場合は、ジョブが完了してから最大 30 分かかることがあります。

詳細構成

パーティショニングと並列処理

MapReduce または Spark ジョブを送信するときは、適切なレベルのパーティショニングを構成します。シャッフル ステージの入力パーティションと出力パーティション数を決定するには、さまざまなパフォーマンス特性のトレードオフが必要です。ジョブ形態に適した値を試すことをおすすめします。

入力パーティション

MapReduce と Spark の入力パーティショニングは、入力データセットによって決定されます。Cloud Storage からファイルを読み取る際に、各タスクは 1 つの「ブロックサイズ」のデータを処理します。

  • Spark SQL ジョブの場合、パーティションの最大サイズは spark.sql.files.maxPartitionBytes で制御されます。1 GB に増やすことを検討してください。spark.sql.files.maxPartitionBytes=1073741824

  • MapReduce ジョブと Spark RDD では、通常、パーティション サイズは fs.gs.block.size で制御され、デフォルトは 128 MB です。1 GB に増やすことを検討してください。InputFormat 固有のプロパティ(mapreduce.input.fileinputformat.split.minsizemapreduce.input.fileinputformat.split.maxsize など)を設定することもできます。

    • MapReduce ジョブの場合: --properties fs.gs.block.size=1073741824
    • Spark RDD の場合: --properties spark.hadoop.fs.gs.block.size=1073741824

出力パーティション

後続のステージのタスク数は、複数のプロパティによって制御されます。1 TB 以上を処理する大きなジョブの場合は、パーティションごとに少なくとも 1 GB を用意することを検討してください。

  • MapReduce ジョブの場合、出力パーティションの数は mapreduce.job.reduces によって制御されます。

  • Spark SQL の場合、出力パーティションの数は spark.sql.shuffle.partitions によって制御されます。

  • RDD API を使用する Spark ジョブの場合、出力パーティションの数を指定するか、spark.default.parallelism を設定します。

プライマリ ワーカーのシャッフル調整

最も重要なプロパティは --properties yarn:spark.shuffle.io.serverThreads=<num-threads> です。Spark のシャッフル サーバーはノード マネージャーの一部として実行されるため、これはクラスタレベルの YARN プロパティである点に注意してください。マシンではデフォルトでコアが 2 倍になります(たとえば、n1-highmem-8 の場合は 16 スレッド)。[Shuffle Read Blocked Time] が 1 秒を超え、プライマリ ワーカーがネットワーク、CPU、ディスクの上限に達していない場合は、シャッフル サーバーのスレッド数を増やすことを検討してください。

大規模なマシンタイプでは、spark.shuffle.io.numConnectionsPerPeer (デフォルトは 1)を増やすことを検討してください。(たとえば、ホストのペアごとに 5 つの接続を設定します)。

再試行回数の引き上げ

アプリマスター、タスク、ステージに対して許可される最大試行回数は、次のプロパティを設定することによって構成できます。

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

正常にデコミッションせずに多くのプリエンプティブル VM または自動スケーリングを使用するクラスタでは、アプリマスターとタスクがより頻繁に終了するため、それらのクラスタ内で上記の値を引き上げることをおすすめします(なお、Spark と正常なデコミッションは EFM とは併用できません)。

HCFS シャッフル用に HDFS を構成する

大規模なシャッフルのパフォーマンスを向上させるには、dfs.namenode.fslock.fair=false を設定して NameNode のロックの競合を減らします。なお、この方法では、個々のリクエストが不足するおそれがありますが、クラスタ全体のスループットが向上します。NameNode のパフォーマンスをさらに向上させるには、--num-master-local-ssds を設定して、ローカル SSD をマスターノードに接続します。--num-worker-local-ssds を設定することで、プライマリ ワーカーにローカル SSD を追加して DataNode のパフォーマンスを向上させることもできます。

HCFS シャッフル用のその他の Hadoop 対応ファイル システム

デフォルトでは、EFM HCFS シャッフル データは HDFS に書き込まれますが、任意の Hadoop 対応ファイル システム(HCFS)を使用できます。たとえば、Cloud Storage または別のクラスタの HDFS にシャッフルを書き込むことができます。ファイル システムを指定するには、クラスタにジョブを送信するときに fs.defaultFS をターゲット ファイル システムに向けます。

EFM クラスタでの YARN の正常なデコミッション

YARN の正常なデコミッションを使用すると、実行中のアプリケーションへの影響を最小限に抑えながらノードを迅速に削除できます。自動スケーリング クラスタの場合、正常なデコミッションのタイムアウトは、EFM クラスタに接続されている AutoscalingPolicy で設定できます。

MapReduce EFM による正常なデコミッションの強化

  1. 中間データは分散ファイル システムに保存されるため、ノード上で実行中のすべてのコンテナが終了するとすぐに、EFM クラスタからノードを削除できます。これに対し、アプリケーションが完了するまでは、標準の Dataproc クラスタ上のノードは削除されません。

  2. ノードの削除は、ノードで実行されているアプリマスターの終了を待機しません。アプリマスターのコンテナが終了されると、デコミッションされていない別のノードで再度スケジュール設定されます。ジョブの進捗率は失われません。新しいアプリマスターは、ジョブ履歴を読み取ることで以前のアプリマスターから状態を迅速に回復します。

MapReduce を使用した EFM クラスタでの正常なデコミッションの実施

  1. プライマリ ワーカーとセカンダリ ワーカーの数が同じ EFM クラスタを作成します。

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. クラスタ上の mapreduce サンプル jar を使用して pi の値を計算する mapreduce ジョブを実行します。

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. ジョブの実行中に、正常なデコミッションを使用してクラスタをスケールダウンします。

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    ジョブの進捗率の低下を最小限に抑えながら、ジョブが終了する前にノードがクラスタから迅速に削除されます。ジョブの進捗率の一時的な停止は、次の原因で発生する可能性があります。

    • アプリマスターのフェイルオーバー。ジョブの進捗率が 0%に低下し、直後に低下前の値に急激に回復した場合は、アプリマスターが終了され、新しいアプリマスターが状態を復元したと考えられます。フェイルオーバーは迅速に行われるため、このことはジョブの進行にはあまり影響しません。
    • VM プリエンプション。HDFS ではマップタスクの出力の一部ではなく全体が保持されるため、マップタスクの処理中に VM がプリエンプトされると、ジョブの進捗率が一時停止する場合があります。