クラスタの自動スケーリング

は、

ワークロードに対するクラスタ ワーカー(ノード)の「適正な」数を見積もることは困難であり、パイプライン全体の単一の構成は理想的ではないことがよくあります。Cloud Dataproc は、この課題を以下の 2 つの方法で解決するのに役立ちます。

  1. ユーザーが開始するクラスタのスケーリング
  2. クラスタの自動スケーリング。この方法では、Stackdriver Monitoring の Apache Hadoop YARN クラスタ指標が評価された後、ユーザーが設定した最小と最大の境界内でクラスタが動的にスケールアップ / スケールダウンされます。

自動スケーリングの推奨事項

以下のクラスタで自動スケーリングを使用します。

Cloud StorageBigQuery などの外部サービスにデータを格納するクラスタ

多くのジョブを処理するクラスタ

自動スケーリングを使用して、次のことを行います。

単一ジョブのクラスタをスケールアップする

以下の自動スケーリングは推奨されません

  • HDFS: 自動スケーリングは、クラスタ内の HDFS のスケーリングを目的としたものではありません。HDFS で自動スケーリングを使用する場合は、すべての HDFS データを処理するのに十分な数のプライマリ ワーカーがあることを確認してください。また、HDFS データノードをデコミッションすると、ワーカーの削除が遅れる可能性があります。

  • Spark Structured Streaming: 自動スケーリングでは Spark Structured Streaming がサポートされません(自動スケーリングと Spark Structured Streaming をご覧ください)。

  • アイドル状態のクラスタ: クラスタがアイドル状態のときにクラスタを最小サイズまで縮小するための自動スケーリングはおすすめしません。新しいクラスタの作成にかかる時間はサイズを変更するのと同程度ですので、アイドル状態のクラスタを削除して再作成することを検討してください。この「エフェメラル」モデルをサポートするツールは次のとおりです。

    Cloud Dataproc ワークフローを使用して、専用のクラスタ上の一連のジョブをスケジュールし、ジョブが終了したらクラスタを削除します。高度なオーケストレーションを行うには、Apache Airflow に基づく Cloud Composer を使用します。

    アドホック クエリまたは外部でスケジュールされたワークロードを処理するクラスタの場合、指定されたアイドル時間の後、または特定の時刻にクラスタを削除するには、クラスタのスケジュール設定された削除を使用します。

自動スケーリング クラスタの作成

必要なすべての自動スケーリング プロパティを設定することで、自動スケーリング クラスタを作成できます。 プロパティは「dataproc:alpha.autoscaling.」接頭辞で指定します。自動スケーリング プロパティの接尾辞は、自動スケーリングのプロパティ表にリストされています。

次の例では、必要な自動スケーリング プロパティを設定して、自動スケーリング クラスタを作成します。

gcloud beta dataproc clusters create cluster-name --properties "\
dataproc:alpha.autoscaling.enabled=true,\
dataproc:alpha.autoscaling.primary.max_workers=100,\
dataproc:alpha.autoscaling.secondary.max_workers=100,\
dataproc:alpha.autoscaling.cooldown_period=1h,\
dataproc:alpha.autoscaling.scale_up.factor=0.05,\
dataproc:alpha.autoscaling.graceful_decommission_timeout=1h"

初期クラスタサイズ

自動スケーリング クラスタを作成するとき、プライマリ ワーカーとセカンダリ ワーカーの初期数を設定する必要はありません。自動スケーリングはデフォルトのクラスタサイズ(2 個のプライマリ ワーカーと 0 個のセカンダリ ワーカー)から開始し、必要に応じてスケールアップします。ただし、プライマリ ワーカーの初期数を設定するとパフォーマンスが向上する場合があります。たとえば、大規模なクラスタが必要な場合は、クラスタに大きな初期サイズを設定することで、クラスタがスケールアップ サイズに達するまでの間、何度も自動スケーリング処理の完了を待たなくて済みます。

自動スケーリングの仕組み

自動スケーリングでは、クラスタをスケーリングするかどうかを決定するために、各クールダウン期間が経過するたびにクラスタの Hadoop YARN 指標をチェックし、スケーリングする場合は、更新の規模を決定します。

  1. 自動スケーリングでは、ワーカー数に対して必要とされる厳密な変更を判断するために、評価ごとに前回の cooldown_period における保留中のクラスタメモリと使用可能なクラスタメモリの平均値を調べます。

    exact Δworkers = avg(pending memory - available memory) / memory per worker

    • pending memory は、クラスタにキューイングされているものの、まだ実行されていないタスクがあることを示すシグナルであり、ワークロードの処理を改善するためにスケールアップする必要があります。
    • available memory は、クラスタに余分な帯域幅があることを示すシグナルであり、リソースを節約するために縮小する必要があります。
    • これらの Apache Hadoop YARN 指標の詳細については、Hadoop と Spark による自動スケーリングをご覧ください。
  2. ワーカー数の厳密な変更が必要な場合、scale_up.factor または scale_down.factor を使用して、ワーカー数の実際の変更を計算します。

    actual Δworkers = exact Δworkers * scale_factor

    scale_factor が 1.0 の場合は、保留中のメモリや使用可能なメモリが 0 になる(完全に使用されている)ように自動スケーリングされます。

  3. ワーカー数の実際の変更が計算されると、min_worker_fraction がしきい値として機能し、自動スケーリングによってクラスタがスケーリングされるかどうかが判断されます。min_worker_fraction の値が小さい場合、Δworkers の値が小さい場合でも、スケーリングする必要があることを示します。min_worker_fraction の値が大きい場合、Δworkers の値が大きい場合にのみスケーリングを行う必要があることを意味します。

    if (Δworkers > min_worker_fraction * cluster size) then scale

  4. スケーリングするワーカー数がスケーリングをトリガーするのに十分な数である場合、自動スケーリングではワーカー グループの最小と最大の範囲と構成済みの secondary_worker_fraction を使用して、プライマリ インスタンス グループとセカンダリ インスタンス グループの全体でワーカー数を分割する方法を決定します。こうした計算の結果は、スケーリング期間のクラスタに対する最終的な自動スケーリングの変更になります。

自動スケーリングのプロパティ

自動スケーリングはプロパティを設定することで構成します。以下の表にプロパティを示します。

  • 標準のワーカー(ノード)は「プライマリ」ワーカーと呼びます
  • プリエンプティブ ワーカー(ノード)は「セカンダリ」ワーカーと呼びます
  • 必須プロパティにはデフォルトがありません
  • 以下の表にプロパティの接尾辞がリストされています。接尾辞は dataproc:alpha.autoscaling. 接頭辞の末尾に追加され、プロパティ名を構成します(たとえば、自動スケーリングを有効にするために使用されるプロパティの接尾辞は enabled で、完全なプロパティ名は dataproc:alpha.autoscaling.enabled です)。
プロパティの接尾辞 説明
enabled 自動スケーリングを有効にする
必須: ブール値: true または false
primary.min_workers プライマリ ワーカーの最小数
省略可: int 境界: [2, primary.max_workers] デフォルト: 2
primary.max_workers プライマリ ワーカーの最大数
必須: int 境界: [primary.min_workers, )
min_workers セカンダリ ワーカーの最小数
省略可: int 境界: [0, secondary.max_workers] デフォルト: 0
secondary.max_workers セカンダリ ワーカーの最大数。セカンダリ VM は少なくとも 24 時間ごとに終了するため、短期間のクリティカルでないワークロードに最適です。セカンダリ ワーカーの使用を避けるには、0 に設定します。
必須: int 境界: [secondary.max_workers, )
secondary_worker_fraction セカンダリ ワーカーの目標の小数。ワーカー数の境界で許可されていない場合、クラスタ数がこの小数に達することはありません。たとえば、secondary.max_workers=0 の場合、プライマリ ワーカーのみが追加されます。また、作成時にクラスタ数のバランスが取れていない可能性もあります。
省略可: double 境界: [0.0, 1.0] デフォルト: 0.5
cooldown_period スケーリング イベント間の期間。スケーリング期間は、前のイベントでの更新操作が完了した後に開始されます。
必須: duration (s,m,h,d) 境界: [10m, )
scale_up.factor ワーカーを追加する最後のクールダウン期間中における保留中のメモリの平均値の小数。スケールアップ係数が 1.0 の場合、更新後に保留中のメモリが残らないようにスケールアップされます(積極的なスケーリング)。スケールアップ係数が 0 に近いほど、スケールアップの程度は小さくなります(積極的でないスケーリング)。
必須: double 境界: [0.0, 1.0]
scale_up.min_worker_fraction スケーリングが発生する前のクラスタの合計サイズの小数としての最小スケールアップしきい値。たとえば、20 個のワーカーがあるクラスタでは、しきい値が 0.1 に設定されている場合、オートスケーラーはスケーリングするクラスタに対して少なくとも 2 個のワーカーをスケールアップするように推奨するはずです。しきい値が 0 の場合、オートスケーラーは推奨された任意の変更に基づいてスケールアップすることを意味します。
省略可: double 境界: [0.0, 1.0] デフォルト: 0.0
scale_down.factor ワーカーを削除する最後のクールダウン期間中における保留中のメモリの平均値の小数。スケールダウン係数が 1 の場合、更新後に使用可能なメモリが残らないようにスケールダウンされます(積極的なスケーリング)。スケールダウン係数が 0 の場合、ワーカーの削除が無効になります。これは、単一のジョブの自動スケーリングに役立つことがあります。
省略可: double 境界: [0.0, 1.0] デフォルト: 1.0
min_worker_fraction スケーリングが発生する前のクラスタの合計サイズの小数としての最小スケールダウンしきい値。たとえば、20 個のワーカーがあるクラスタでは、しきい値が 0.1 に設定されていると、オートスケーラーはスケーリングするクラスタに対して少なくとも 2 個のワーカーのスケールダウンを推奨するはずです。しきい値が 0 の場合、オートスケーラーは推奨された任意の変更に基づいてスケールダウンすることを意味します。
省略可: double 境界: [0.0, 1.0] デフォルト: 0.0
graceful_decommission_timeout ノード マネージャによる YARN の正常なデコミッションのタイムアウト。ワーカーを強制的に削除する(この場合、ジョブが中断される可能性がある)前にジョブが完了するのを待機する時間を指定します。ダウンスケーリング オペレーションにのみ適用されます。
必須: duration (s,m,h,d) 境界: [0s, 1d]

Apache Hadoop と Apache Spark による自動スケーリング

以下のセクションでは、自動スケーリングの Hadoop YARN、Hadoop Mapreduce、Apache Spark、Spark Streaming、Spark Structured Streaming との相互運用方法について説明します。

Hadoop YARN 指標

自動スケーリングでは、YARN コアリクエストではなく、YARN メモリ リクエストに基づいてジョブをスケジュールするように Hadoop YARN を構成します。

自動スケーリングは、次の Hadoop YARN 指標に基づいて実行されます。

  1. Allocated memory は、クラスタ全体でコンテナを実行することによって使用される合計 YARN メモリを示します。最大 1GB まで使用できる 6 つの実行中のコンテナがある場合、6GB のメモリが割り当てられます。

  2. Available memory は、割り当てられたコンテナで使用されていないクラスタ内の YARN メモリです。すべてのノード マネージャに 10GB のメモリがあり、6GB のメモリが割り当てられている場合は、4GB の利用可能なメモリがあります。クラスタに利用可能な(使用されていない)メモリがある場合、自動スケーリングでクラスタからワーカーが削除されることがあります。

  3. Pending memory は、保留中のコンテナに対する YARN メモリ リクエストの合計です。保留中のコンテナは、YARN で実行するスペースを待機しています。利用可能なメモリが 0 であるか、または少なすぎて次のコンテナに割り当てることができない場合にのみ、保留中のメモリが 0 以外になります。保留中のコンテナがある場合、自動スケーリングでクラスタにワーカーが追加されることがあります。

Stackdriver Monitoring でこうした指標を表示できます。YARN メモリは、クラスタ上の合計メモリに 0.8 を乗じたサイズになります。残りのメモリは、他のデーモンやオペレーティング システムでのページ キャッシュなどの他の用途のために予約されています。

自動スケーリングと Hadoop MapReduce

MapReduce は各マップを実行し、独立した YARN コンテナとしてタスクを削減します。ジョブが開始されると、MapReduce は各マップタスクに対するコンテナ リクエストを送信するため、保留中の YARN メモリが急増します。マップタスクが完了すると、保留中のメモリは減少します。

mapreduce.job.reduce.slowstart.completedmaps が完了すると(Cloud Dataproc でのデフォルトは 95%)、MapReduce はすべてのレデューサに対するコンテナのリクエストをキュに入れ、その結果、保留メモリが急増します。

マップやタスクの削減に数分以上かかる場合を除いて、dataproc:alpha.autoscaling.scale_up.factor に対して大きな値を設定しないでください。クラスタにワーカーを追加するには少なくとも 1.5 分かかるので、新しいワーカーを利用するための十分な保留作業を行えるように数分間を確保してください。最初に dataproc:alpha.autoscaling.scale_up.factor を保留中のメモリの 0.05(5%)または 0.1(10%)に設定するとよいでしょう。

自動スケーリングと Spark

Spark は、YARN の上にスケジューリングのためのレイヤを追加します。具体的には、Spark Core の動的割り当てにより、コンテナで Spark エグゼキュータを実行するように YARN にリクエストし、それらのエグゼキュータのスレッドで Spark タスクをスケジュールします。Cloud Dataproc クラスタではデフォルトで動的割り当てが有効になっているため、エグゼキュータは必要に応じて追加されたり削除されたりします。

Spark は常に YARN にコンテナを要求しますが、動的割り当てが行われない場合は、ジョブの開始時にのみコンテナを要求します。動的割り当てが行われる場合、必要に応じてコンテナを削除したり、新しいコンテナをリクエストしたりします。

Spark は少数(自動スケーリング クラスタ上で 2 つ)のエグゼキュータから開始されます。未処理のタスクがある間は、エグゼキュータの数を倍増し続けます。これにより、保留中のメモリが少なくなります(保留中のメモリが急増することが少なくなります)。Spark ジョブに対して、dataproc:alpha.autoscaling.scale_up.factor に 1.0(100%)などの大きな数値を設定することをおすすめします。

Spark のデフォルト値では、キャッシュに保存されたデータがあるエグゼキュータは終了せず(spark.dynamicAllocation.cachedExecutorIdleTimeout=0)、キャッシュに保存されたデータのないエグゼキュータは 60 秒後に終了します(spark.dynamicAllocation.executorIdleTimeout=60)。Cloud Dataproc は外部の YARN ベースのシャッフル サービスを構成するため、エグゼキュータはシャッフル データが提供される前に終了することがあります。したがって、キャッシュに保存されたデータがあるエグゼキュータが最終的に終了するように、cachedExecutorIdleTimeout に 0 以外を設定します。また、dataproc:alpha.autoscaling.graceful_decommission_timeout に 0 以外を設定すると、自動スケーリングで削除される前に、ワーカー上のシャッフル データがドレインされます。

Spark の動的割り当てを無効にする

Spark の動的割り当ての恩恵を受けない個別の Spark ジョブを実行している場合は、spark.dynamicAllocation.enabled=falsespark.executor.instances を設定することで、Spark の動的割り当てを無効にできます。別の Spark ジョブの実行中でも、自動スケーリングを使用してクラスタをスケーリングできます。

自動スケーリングと Spark Streaming

自動スケーリングを Spark Streaming と一緒に使用するには:

  1. Spark Streaming には、ストリーミングに固有のシグナルを使用してエグゼキュータの追加や削除を行う独自のバージョンの動的割り当てがあるため、spark.streaming.dynamicAllocation.enabled=true を設定した後、spark.dynamicAllocation.enabled=false を設定して Spark Core の動的割り当てを無効にします。

  2. spark.executor.instances を考慮する必要がある Spark Streaming の動的割り当ての問題が修正されるまで、初期化アクションを使用して /etc/spark/conf/spark-defaults.conf から spark.executor.instances を削除します。

  3. 正常なデコミッションdataproc:alpha.autoscaling.graceful_decommission_timeout)は、Spark Streaming ジョブには適用されません。代わりに、自動スケーリングでワーカーを安全に削除するには、フォールト トレランスのためにチェックポイントを構成します。

または、自動スケーリングなしで Spark Streaming を使用するには:

  1. Spark Core の動的割り当てを無効にします(spark.dynamicAllocation.enabled=false)。
  2. ジョブのエグゼキュータの数を設定します(spark.executor.instances)。クラスタ プロパティをご覧ください。

自動スケーリングと Spark Structured Streaming

現在、Spark Structured Streaming では動的割り当てがサポートされていないため、自動スケーリングには Spark Structured Streaming との互換性がありません(SPARK-24815: Structured Streaming should support dynamic allocation をご覧ください)。

ダウンスケーリングの考慮事項と推奨事項

  • データのシャッフル: Hadoop MapReduce では、マップタスクでシャッフル データがローカル ディスクに書き込まれます。これは、ノード マネージャ上で実行されているサーバー経由でレデューサ タスクに使用されます。タスクが実行されていないときでも、シャッフル データがあるワーカーを削除すると、マップタスクを再実行する必要がある場合にジョブの進行状況が巻き戻される可能性があるため、問題が発生することがあります。また、Spark はステージ境界の間でデータをシャッフルします。シャッフル ファイルが足りないことを検出した場合、ステージ全体を再実行します。

    推奨事項: マルチジョブ クラスタでは、dataproc:alpha.autoscaling.graceful_decommission_timeout を設定して、自動スケーリングでワーカーが削除される前に実行中のジョブが完了するために十分な時間を確保します。一般的に、ワーカーが削除される前にすべてのジョブが完了するように、タイムアウトを最長のジョブ時間に設定します。

    代替戦略 1: 利用可能なメモリのサイズに関係なくワーカーが削除されることがないように、dataproc:alpha.autoscaling.scale_down.factor=0.0 を設定して、自動スケーリングのダウンスケーリングを無効にします。この戦略は、ダウンスケーリングによってジョブの進捗状況が巻き戻される可能性がある単一ジョブクラスタに役立つことがあります。

    代替戦略 2: dataproc:alpha.autoscaling.scale_down.factor=1.0dataproc:alpha.autoscaling.scale_down.min_worker_fraction=1.0 を設定して、クラスタがアイドル状態のときだけスケールダウンするようにクラスタを設定します。

  • キャッシュに保存されたデータ: Spark では、エグゼキュータのメモリまたはディスクにデータセットを保存できます。通常、Spark のエグゼキュータは処理する作業がないと終了しますが、データがキャッシュに保存されている場合、デフォルトでは終了することはありません。したがって、キャッシュに保存されたデータが存在するノートブックやその他のアプリケーションでは、アプリケーションが不要になった後もエグゼキュータは存続し、自動スケーリングでクラスタがスケールダウンされることはありません。

    推奨事項: キャッシュに保存されたデータがあるエグゼキュータが最終的に終了するようにするには、spark.dynamicAllocation.cachedExecutorIdleTimeout を設定するか、不要になったデータセットのキャッシュを解除することを検討してください。

パーティショニングと並列処理による自動スケーリングの制御

並列化は通常、クラスタ リソースによって設定または決定されます(たとえば、複数のタスクで制御される HDFS ブロック数)。自動スケーリングではその逆が適用され、ジョブの並列性に従ってクラスタ リソース(ワーカー)が設定されます。ジョブの並列処理の設定に役立つガイドラインは次のとおりです。

  • Cloud Dataproc ではクラスタの最初のクラスタサイズに基づいて MapReduce で減らすタスクのデフォルト数を設定する一方で、mapreduce.job.reduces を設定して、減らす段階の並列処理数を増加させることができます。
  • Spark SQL と Dataframe の並列処理数は spark.sql.shuffle.partitions によって指定され、デフォルトは 200 です。
  • Spark の RDD 関数のデフォルトは spark.default.parallelism で、ジョブの開始時のエグゼキュータの数に関連しています。ただし、シャッフルを作成するすべての RDD 関数が、spark.default.parallelism をオーバーライドするパーティション数のパラメータをとります。

データが均等に分割されていることを確認する必要があります。重大なキースキューがある場合、1 つ以上のタスクが他のタスクよりも大幅に時間がかかり、結果として使用率が低くなる可能性があります。

自動スケーリングのデフォルトの Spark / Hadoop プロパティ設定

自動スケーリング クラスタには、プライマリ ワーカーが削除されたときやセカンダリ ワーカーがプリエンプトされたときのジョブの失敗を回避するのに役立つデフォルトのクラスタ プロパティ値があります。自動スケーリングを使用してクラスタを作成するときに、デフォルト値をオーバーライドできます(クラスタ プロパティをご覧ください)。

以下は、タスク、アプリケーション マスター、ステージの最大再試行回数を増やすデフォルト設定です。

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

以下は再試行カウンタをリセットするデフォルト設定です(長時間実行する Spark Streaming ジョブに役立ちます)。

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

以下は、Spark の小規模から開始する slow-start 動的割り当てメカニズムのデフォルト設定です。

spark:spark.executor.instances=2

自動スケーリング指標とログ

次のリソースとツールは、自動スケーリングの操作と、クラスタとそのジョブへの影響をモニタリングするのに役立ちます。

Stackdriver Monitoring

Stackdriver Monitoring を使用すると、以下のことを行えます。

  • 自動スケーリングで使用される指標を表示する
  • クラスタ内のノード マネージャの数を表示する
  • 自動スケーリングでクラスタリングが行われた理由、または行われなかった理由を理解する autoscaling-stackdriver1 autoscaling-stackdriver2 autoscaling-stackdriver3

Stackdriver Audit Logging

Cloud Audit Logging を使用して、自動スケーリングでクラスタがどのようにサイズ変更されたかを確認します。

よくある質問(FAQ)

高可用性クラスタと単一ノードクラスタで自動スケーリングを有効にすることはできますか?

高可用性クラスタでは自動スケーリングを有効にできますが、単一ノードクラスタでは有効にできません(単一ノードクラスタではサイズ変更がサポートされません)。

クラスタの作成後、または実行中のクラスタに対して自動スケーリングのプロパティを変更できますか?

いいえ。現在、自動スケーリングのプロパティは、クラスタの作成後に変更することも、実行中のクラスタに追加することもできません。

自動スケーリング クラスタのサイズを手動で変更できますか?

はい。自動スケーリングで正しい判断が行われない場合や、ワークロードの構成設定を調整する場合は、一時しのぎの措置としてクラスタのサイズを手動で変更できます。ただし、こうした変更には一時的な効果しかないため、自動スケーリングで最終的にクラスタが縮小されます。

手動でクラスタのサイズを変更する代わりに、次の点を考慮してください。

改善されたワークロード構成でクラスタを削除して再作成する。

Cloud Dataproc のヘルプを取得する

どのイメージ バージョンで自動スケーリングがサポートされますか?どの API バージョンですか?

自動スケーリングではバージョン 1.2.22 以上と 1.3.0 以上でサポートされます。現在、自動スケーリングはバージョン 1.0 と 1.1 ではサポートされません(Cloud Dataproc のバージョン リストをご覧ください)。現在、自動スケーリングはアルファ版リリースで提供されていますが、Cloud Dataproc v1 または v1beta2 の API を使用して有効にできます。

Cloud Dataproc と Cloud Dataflow の自動スケーリングはどのように違いますか?

Cloud Dataflow の自動スケーリングを Spark や Hadoop と比較するをご覧ください。

自動スケーリングには多くの設定可能なプロパティがあります。自動スケーリング クラスタを作成する簡単な方法はありますか?

YAML ファイルからクラスタを作成できます。ファイル形式は REST API と直接一致します(自動スケーリングのプロパティクラスタ プロパティの文字列としてエンコードする必要があります)。

例:

gcloud beta dataproc clusters create-from-file --file=cluster.yaml
$ cat cluster.yaml
projectId: PROJECT
clusterName: NAME
config:
  gceClusterConfig:
    zoneUri: us-central1-a
  softwareConfig:
    properties:
      dataproc:alpha.autoscaling.enabled: 'true'
      dataproc:alpha.autoscaling.primary.max_workers: '100'
      dataproc:alpha.autoscaling.secondary.max_workers: '100'
      dataproc:alpha.autoscaling.cooldown_period: '1h'
      dataproc:alpha.autoscaling.scale_up.factor: '0.05'
      dataproc:alpha.autoscaling.graceful_decommission_timeout: '1h'
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Dataproc ドキュメント
ご不明な点がありましたら、Google のサポートページをご覧ください。