クラスタの自動スケーリング

自動スケーリングとは

ワークロードに対するクラスタ ワーカー(ノード)の「適正な」数を見積もることは困難であり、パイプライン全体で単一のクラスタサイズを使用することは多くの場合、理想的ではありません。ユーザーが開始するクラスタのスケーリングは、この課題に部分的に対処しますが、クラスタの使用状況のモニタリングと手動による操作が必要になります。

Dataproc AutoscalingPolicies API を使用すると、クラスタ リソース管理を自動化するメカニズムが提供され、クラスタの自動スケーリングが可能になります。Autoscaling Policy は、クラスタが自動スケーリング ポリシーを使用する方法を記述する再利用可能な構成です。スケーリングの境界や頻度、積極性を定義し、クラスタ存続期間中のクラスタ リソースをきめ細かく制御します。

自動スケーリングを使用する場合

次の場合、自動スケーリングを使用します。

Cloud StorageBigQuery などの外部サービスにデータを格納するクラスタ

多数のジョブを処理するクラスタ

単一ジョブをスケールアップするクラスタ

以下の自動スケーリングは推奨されません

  • 高可用性クラスタ: 代わりに標準クラスタを使用します。標準クラスタは、断続的なサイズ変更操作の後も、より安定しています。

  • HDFS: 自動スケーリングは、クラスタ内の HDFS のスケーリングを目的としたものではありません。HDFS で自動スケーリングを使用する場合は、プライマリ ワーカーの最小数が、全 HDFS データの処理に十分なことを確認してください。また、HDFS データノードをデコミッションすると、ワーカーの削除が遅れる可能性があります。

  • YARN ノードラベル: 自動スケーリングでは、YARN ノードラベルや、YARN-9088 のプロパティ dataproc:am.primary_only はサポートされません。ノードラベルが使用された場合、YARN が誤ってクラスタ指標を報告します。

  • Spark Structured Streaming: 自動スケーリングでは Spark Structured Streaming がサポートされません(自動スケーリングと Spark Structured Streaming をご覧ください)。

  • アイドル状態のクラスタ: クラスタがアイドル状態のときにクラスタを最小サイズまで縮小するための自動スケーリングはおすすめしません。新しいクラスタの作成にかかる時間はサイズ変更の場合と同程度のため、アイドル状態のクラスタを削除して再作成することを検討してください。この「エフェメラル」モデルをサポートするツールは次のとおりです。

    Dataproc ワークフローを使用して、専用クラスタで一連のジョブをスケジュールし、ジョブが完了したらクラスタを削除します。高度なオーケストレーションには、Apache Airflow に基づく Cloud Composer を使用します。

    アドホック クエリまたは外部でスケジュールされたワークロードを処理するクラスタの場合、指定されたアイドル時間(期間)の後、または特定の時刻にクラスタを削除するには、クラスタのスケジュール設定された削除を使用します。

自動スケーリングを有効にする

クラスタで自動スケーリングを有効にするには、次の手順を行います。

  1. 自動スケーリング ポリシーを作成します

  2. 次のいずれかを実行します。

    1. 自動スケーリング クラスタを作成する
    2. 既存のクラスタで自動スケーリングを有効にする

自動スケーリング ポリシーを作成する

gcloud コマンド

gcloud dataproc autoscaling-policies import コマンドを使用すると、自動スケーリング ポリシーを作成できます。このコマンドは、自動スケーリング ポリシーを定義するローカルの YAML ファイルを読み取ります。ファイルの形式と内容は、autoscalingPolicies REST API で定義された構成オブジェクトやフィールドと一致している必要があります。

次の YAML の例は、すべての必須フィールドを指定するポリシーを定義しています。また、プライマリ ワーカーとセカンダリ(プリエンプティブル)ワーカーの両方の maxInstances 値を提供し、4 分の cooldownPeriod を指定します(デフォルトは 2 分)。

workerConfig:
  maxInstances: 100
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

次の YAML の例は、自動スケーリング ポリシーのすべてのオプションと必須のフィールドを指定しています。

workerConfig:
  minInstances: 2
  maxInstances: 100
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

ローカルのターミナルから、または Cloud Shell で次の gcloud コマンドを実行して、自動スケーリング ポリシーを作成します。ポリシーの名前を指定します。この名前はポリシー id となり、後ほど gcloud コマンドで使用してポリシーを参照できます。--source フラグを使用して、インポートする自動スケーリング ポリシー YAML ファイルのローカル ファイルパスとファイル名を指定します。

gcloud dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml \
    --region=region

REST API

autoscalingPolicies.create リクエストの一部として AutoscalingPolicy を定義して、自動スケーリング ポリシーを作成します。

コンソール

現在、Google Cloud Console での自動スケーリング ポリシーの作成はサポートされていません。

自動スケーリング クラスタを作成する

自動スケーリング ポリシーを作成した後、自動スケーリング ポリシーを使用するクラスタを作成します。クラスタは、自動スケーリング ポリシーと同じリージョンに存在する必要があります。

gcloud コマンド

ローカルのターミナルや Cloud Shell で、次の gcloud コマンドを実行し、自動スケーリング クラスタを作成します。クラスタ名を指定し、--autoscaling-policy フラグを使用して policy idポリシーを作成したときに指定したポリシー名)を指定するか、ポリシー resource URI (resource name) を指定してください(AutoscalingPolicy の idname フィールドをご覧ください)。

gcloud dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

clusters.create リクエストの一部として AutoscalingConfig を含めて、自動スケーリング クラスタを作成します。

コンソール

Cloud Console の [クラスタの作成] ページの [自動スケーリング ポリシー] セクションから、新しいクラスタに適用する既存の自動スケーリング ポリシーを選択します。

既存のクラスタで自動スケーリングを有効にする

自動スケーリング ポリシーを作成した後、同じリージョンの既存のクラスタで作成したポリシーを有効にできます。

gcloud コマンド

既存のクラスタで自動スケーリング ポリシーを有効にするには、ローカルのターミナルや Cloud Shell で、次の gcloud コマンドを実行します。クラスタ名を指定し、--autoscaling-policy フラグを使用して policy idポリシーを作成したときに指定したポリシー名)を指定するか、ポリシー resource URI (resource name) を指定してください(AutoscalingPolicy の idname フィールドをご覧ください)。

gcloud dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

既存のクラスタで自動スケーリング ポリシーを有効にするには、clusters.patch リクエストの updateMask にあるポリシーの AutoscalingConfig.policyUri を設定します。

コンソール

現在のところ、Google Cloud Console で、既存のクラスタで自動スケーリング ポリシーを有効にすることはサポートされていません。

自動スケーリングの仕組み

自動スケーリングでは、クラスタをスケールするかどうかを決定するために、各「クールダウン」期間が経過するたびにクラスタの Hadoop YARN 指標をチェックし、スケールする場合は、更新の規模を決定します。

  1. 自動スケーリングは、ワーカー数に対して必要とされる厳密な変更を判断するために、評価ごとに前回の cooldown_period の保留中および使用可能なクラスタメモリの平均を調べます。

    exact Δworkers = avg(pending memory - available memory) / memory per node manager

    • pending memory は、キューに登録されているがまだ実行されていないタスクがクラスタにあることを占めるシグナルで、ワークロードの処理を改善するためにスケールアップする必要があります。
    • available memory は、正常なノードでクラスタに余分な帯域幅があることを示すシグナルであり、リソースを節約するためにスケールダウンすることが必要な場合があります。
    • これらの Apache Hadoop YARN 指標の詳細については、Hadoop と Spark による自動スケーリングをご覧ください。
  2. ワーカー数の厳密な変更が必要な場合、自動スケーリングでは、scaleUpFactorscaleDownFactor を使用してワーカー数の実際の変化を計算します。

    if exact Δworkers > 0:
      actual Δworkers = ROUND_UP(exact Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(exact Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(exact Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(exact Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(exact Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(exact Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(exact Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    scaleUpFactor や scaledownFactor が 1.0 の場合、保留中または使用可能なメモリが 0(完全に使用されている)になるように自動スケーリングが行われます。

  3. 実際のワーカー数の変化が計算されると、scaleUpMinWorkerFractionscaleDownMinWorkerFraction のどちらかがしきい値として機能し、自動スケーリングによってクラスタがスケールされるかどうか判断されます。Δworkers が小さい場合でも、割合が小さければ自動スケーリングが必要であることを意味します。Δworkers が大きい場合は、割合が大きい場合のみ自動スケーリングが行われます。

    if (Δworkers >  scaleUpMinWorkerFraction* cluster size) then scale up
    または
    if (abs(Δworkers) >  scaleDownMinWorkerFraction* cluster size) then scale down

  4. スケールするワーカーの数がスケーリングのトリガーに十分な数の場合、自動スケーリングは、workerConfigsecondaryWorkerConfigweight(プライマリ ワーカーとセカンダリ ワーカーの比率)の minInstances maxInstances の範囲を使用して、プライマリ ワーカー インスタンス グループとセカンダリ ワーカー インスタンス グループでワーカー数を分割する方法を決定します。こうした計算の結果は、スケーリング期間のクラスタに対する最終的な自動スケーリングの変更になります。

正常なデコミッション

自動スケーリングでは、クラスタからノードを削除するときの YARN 正常なデコミッションがサポートされます。正常なデコミッションにより、アプリケーションはジョブの進行を妨げないよう、ステージ間でシャッフル データを完了できます。自動スケーリング ポリシーで提供される「正常なデコミッション タイムアウト」は、YARN がノードを削除する前にアプリケーション(デコミッションの開始時に実行されていたアプリケーション)の実行を待機する時間の上限です。

マルチクラスタ ポリシーの使用

  • 自動スケーリング ポリシーは、複数のクラスタに適用できるスケーリングの動作を定義します。自動スケーリング ポリシーは、類似したワークロードを共有したり、類似したリソース使用パターンでジョブを実行したりする場合に最適です。

  • 複数のクラスタで使用されているポリシーを更新できます。更新は、ポリシーを使用するすべてのクラスタの自動スケーリングの動作に直ちに影響します(autoscalingPolicies.update を参照)。ポリシーを使用しているクラスタにポリシーの更新を適用しない場合は、ポリシーを更新する前にクラスタで自動スケーリングを無効にします。

gcloud コマンド

クラスタで自動スケーリングを無効にするには、ローカルのターミナルや Cloud Shell で、次の gcloud コマンドを実行します。

gcloud dataproc clusters update cluster-name --disable-autoscaling \
    --region=region

REST API

クラスタで自動スケーリングを無効にするには、AutoscalingConfig.policyUri に空の文字列を設定し、update_mask=config.autoscaling_config.policy_uriclusters.patch リクエストに設定します。

コンソール

現在、Google Cloud Console では、クラスタでの自動スケーリングの無効化はサポートされていません。

  • 1 つ以上のクラスタで使用されているポリシーは削除できません(autoscalingPolicies.delete を参照してください)。

自動スケーリングの推奨事項

  • クールダウン期間: cooldownPeriod の最小値とデフォルト値は 2 分です ポリシーにより短い cooldownPeriod を設定した場合、ワークロードの変化がクラスタのサイズにより早く影響を与えますが、クラスタが不必要にスケールアップやスケールダウンされる可能性があります。より短い cooldownPeriod を使う場合は、ポリシーでは、scale_upscale_downmin_worker_fractions をゼロ以外の値に設定することをおすすめします。この設定により、クラスタを更新する正当な理由に値するメモリ使用量の変化がある場合に限り、クラスタがスケールアップまたはスケールダウンされます。

  • スケールダウン: MapReduce と Spark は、中間シャッフル データをローカル ディスクに書き込みます。シャッフル データを持つワーカーを削除すると、ジョブの進行が遅れます。シャッフル データを再現するにはマップタスクを再実行する必要があります。Spark は、シャッフル ファイルがないことを検出すると、ステージ全体を再実行します。

    • クラスタがアイドル状態のときにのみスケールダウンするには、scale_down factorscale_down min_worker_fraction を 1.0 に設定します。

    • 連続負荷のあるクラスタの場合、YARN コンテナを実行していないクラスタのノードを削除するため、scale_down 係数を 1.0 に設定し、graceful_decommission_timeout をゼロ以外に設定して、ジョブ間のスケールダウンを構成します(正常なデコミッションをご覧ください)。graceful_decommission_timeout を最長実行のクラスタジョブよりも長く設定して、ジョブが完了する前にノードが強制的にデコミッションされないようにします。

    • ノードを削除するときにジョブの進行状況が失われないように、強化された柔軟性モードの使用を検討してください。

  • キャッシュに保存されたデータがある Spark ジョブ: spark.dynamicAllocation.cachedExecutorIdleTimeout や、データセットが不要になったときにキャッシュに保存されたデータセットを解放するように設定します。デフォルトでは、Spark はキャッシュされたデータをもつエグゼキュータは削除しません。

Apache Hadoop と Apache Spark による自動スケーリング

続くセクションでは、自動スケーリングと Hadoop YARN、Hadoop Mapreduce、Apache Spark、Spark Streaming、Spark Structured Streaming との連携について説明します。

Hadoop YARN 指標

自動スケーリングでは、YARN コアリクエストではなく、YARN メモリ リクエストに基づいてジョブをスケジュールするように Hadoop YARN を構成します。

自動スケーリングは、次の Hadoop YARN 指標に基づいて実行されます。

  1. Allocated memory は、コンテナを実行することによってクラスタ全体で使用される YARN メモリの合計を指します。6 つの実行中のコンテナがあり、それぞれ最大 1GB まで使用できる場合は、6GB のメモリが割り当てられます。

  2. Available memory は、割り当てられたコンテナで使用されない、クラスタ内の YARN メモリです。すべてのノード マネージャーに 10GB のメモリがあり、6GB のメモリが割り当てられている場合、利用可能なメモリは 4GB あります。クラスタに利用可能な(使用されていない)メモリがある場合、自動スケーリングでクラスタからワーカーが削除されることがあります。

  3. Pending memory は、保留中のコンテナに対する YARN メモリ リクエストの合計です。保留中のコンテナは、YARN で実行するスペースを待機しています。利用可能なメモリが 0、または少なすぎて次のコンテナに割り当てることができない場合にのみ、保留中のメモリが 0 以外になります。保留中のコンテナがある場合、自動スケーリングでクラスタにワーカーが追加されることがあります。

これらの指標は Cloud Monitoring で確認できます。 デフォルトでは、クラスタ上の YARN メモリは 0.8 * 合計メモリとなり、残りのメモリは他のデーモンや、ページ キャッシュなどオペレーティング システムでの使用に予約されます。このデフォルト値は、「yarn.nodemanager.resource.memory-mb」の YARN 構成設定でオーバーライドできます(Apache Hadoop YARN、HDFS、Spark、関連プロパティを参照)。

自動スケーリングと Hadoop MapReduce

MapReduce は各マップを実行し、独立した YARN コンテナとしてタスクを削減します。ジョブが開始されると、MapReduce は各マップタスクに対するコンテナ リクエストを送信するため、保留中の YARN メモリが急増します。マップタスクが完了すると、保留中のメモリは減少します。

mapreduce.job.reduce.slowstart.completedmaps が完了すると(Dataproc でのデフォルトは 95%)、MapReduce によりすべてのレデューサに対するコンテナ リクエストがキューに追加され、保留メモリがさらに急増します。

タスクのマッピングと縮小に数分以上かかる場合を除き、自動スケーリング scaleUpFactor に高い値を設定しないでください。クラスタにワーカーを追加するには少なくとも 1.5 分かかるので、新しいワーカーを利用するための十分な保留作業を行えるように数分間を確保してください。最初は scaleUpFactor を保留中のメモリの 0.05(5%)または 0.1(10%)に設定することをおすすめします。

自動スケーリングと Spark

Spark は、YARN の上にスケジューリングのためのレイヤを追加します。具体的には、Spark Core のダイナミック アロケーションにより、コンテナで Spark エグゼキュータを実行するように YARN にリクエストし、それらのエグゼキュータのスレッドで Spark タスクをスケジュールします。Dataproc クラスタではデフォルトでダイナミック アロケーションが有効になっているため、エグゼキュータは必要に応じて追加削除されます。

Spark は常に YARN にコンテナを要求しますが、動的割り当てが行われない場合は、ジョブの開始時にのみコンテナを要求します。動的割り当てが行われる場合、必要に応じてコンテナを削除したり、新しいコンテナをリクエストしたりします。

Spark は少数(自動スケーリング クラスタ上で 2 つ)のエグゼキュータから開始されます。未処理のタスクがある間は、エグゼキュータの数を倍増し続けます。これにより、保留中のメモリが少なくなります(保留中のメモリが急増することが少なくなります)。 Spark ジョブの場合は、自動スケーリング scaleUpFactor に 1.0(100%)などの大きい数を設定することをおすすめします。

Spark の動的割り当てを無効にする

Spark ダイナミック アロケーションのメリットがない別の Spark ジョブを実行する場合は、spark.dynamicAllocation.enabled=falsespark.executor.instances を設定して Spark ダイナミック アロケーションを無効にできます。別の Spark ジョブの実行中でも、自動スケーリングを使用してクラスタをスケールできます。

自動スケーリングと Spark Streaming

  1. Spark Streaming には、ストリーミング固有のシグナルを使用してエグゼキュータの追加や削除を行う独自のバージョンのダイナミック アロケーションがあるため spark.streaming.dynamicAllocation.enabled=true を設定し、Spark Core のダイナミック アロケーションを無効にする spark.dynamicAllocation.enabled=false を設定します。

  2. Spark Streaming ジョブと一緒に正常なデコミッション(自動スケーリング gracefulDecommissionTimeout)を使用しないでください。代わりに、自動スケーリングでワーカーを安全に削除するには、フォールト トレランスのためにチェックポイントを構成します。

または、自動スケーリングなしで Spark Streaming を使用するには、次の手順を行います。

  1. Spark Core のダイナミック アロケーション(spark.dynamicAllocation.enabled=false)を無効にする。
  2. ジョブのエグゼキュータの数(spark.executor.instances)を設定する。クラスタ プロパティをご覧ください。

自動スケーリングと Spark Structured Streaming

現在、Spark Structured Streaming では動的割り当てがサポートされていないため、自動スケーリングには Spark Structured Streaming との互換性がありません(SPARK-24815: Structured Streaming should support dynamic allocation をご覧ください)。

パーティショニングと並列処理による自動スケーリングの制御

通常、並列処理はクラスタ リソースによって設定または決定されます(たとえば、HDFS ブロック数はタスクの数に応じて制御されます)が、自動スケーリングでは反対になります。自動スケーリングでは、ジョブの並列性に従ってワーカーの数が設定されます。ジョブの並列処理の設定に役立つガイドラインは次のとおりです。

  • Dataproc では、初期のクラスタサイズに基づいて MapReduce のデフォルトの削減タスク数が設定されますが、mapreduce.job.reduces を設定して削減フェーズの並列処理を増やすことができます。
  • Spark SQL と Dataframe の並列処理は spark.sql.shuffle.partitions によって決まります。デフォルトは 200 です。
  • Spark の RDD 関数はデフォルトで spark.default.parallelism に設定されます。これは、ジョブの開始時にワーカーノードのコア数に設定されます。ただし、シャッフルを作成する RDD 関数はすべて、spark.default.parallelism をオーバーライドするパーティションの数のパラメータをとります。

データが均等に分割されていることを確認する必要があります。重大なキースキューがある場合、1 つ以上のタスクが他のタスクよりも大幅に時間がかかり、結果として使用率が低くなる可能性があります。

自動スケーリングのデフォルトの Spark / Hadoop プロパティ設定

自動スケーリング クラスタには、プライマリ ワーカーが削除されたときやセカンダリ ワーカーがプリエンプトされたときのジョブの失敗を回避するのに役立つデフォルトのクラスタ プロパティ値があります。これらのデフォルト値は、自動スケーリングを使用してクラスタを作成するときにオーバーライドできます(クラスタ プロパティ を参照してください)。

以下は、タスク、アプリケーション マスター、ステージの最大再試行回数を増やすデフォルト設定です。

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

以下は再試行カウンタをリセットするデフォルト設定です(長時間実行する Spark Streaming ジョブに役立ちます)。

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

以下は、Spark の小規模から開始する slow-start ダイナミック アロケーション メカニズムのデフォルト設定です。

spark:spark.executor.instances=2

自動スケーリング指標とログ

次のリソースとツールは、自動スケーリングの操作と、クラスタとそのジョブへの影響をモニタリングするのに役立ちます。

Cloud Monitoring

Cloud Monitoring を使用すると、次のことができます。

  • 自動スケーリングで使用される指標を表示する
  • クラスタ内のノード マネージャの数を表示する
  • 自動スケーリングでクラスタのスケールが行われた理由、または行われなかった理由を理解する autoscaling-stackdriver1autoscaling-stackdriver2autoscaling-stackdriver3

Cloud Logging

Cloud Logging を使用して、Cloud Dataproc Autoscaler からログを表示します。

1)クラスタのログを検索します。

autoscaling-logs-for-cluster

2)dataproc.googleapis.com/autoscaler を選択します。

autoscaling-log-file

3)ログメッセージを展開して status フィールドを表示します。ログは、機械で読み取り可能な JSON 形式です。

autoscaling-three-logs autoscaling-update-operation

4)ログメッセージを展開して、スケーリングの推奨値、スケーリングの決定に使用する指標、オリジナルのクラスタサイズ、新しいクラスタサイズを表示します。

autoscaling-recommendation-message

よくある質問(FAQ)

高可用性クラスタと単一ノード クラスタで自動スケーリングを有効にすることはできますか?

高可用性クラスタでは自動スケーリングを有効にできますが、単一ノードクラスタでは有効にできません(単一ノードクラスタではサイズ変更がサポートされません)。

自動スケーリング クラスタのサイズを手動で変更できますか?

○。自動スケーリング ポリシーを調整するときは、暫定措置として、クラスタのサイズを手動で変更できます。ただし、こうした変更には一時的な効果しかないため、最終的には自動スケーリングでクラスタが縮小されます。

自動スケーリング クラスタのサイズを手動で変更する代わりに、次のことを検討してください。

自動スケーリング ポリシーを更新する。自動スケーリング ポリシーに加えた変更は、現在そのポリシーを使用しているすべてのクラスタに影響しますマルチクラスタ ポリシーの使用を参照)。

ポリシーを切り離し、クラスタを適切なサイズに手動でスケールする。

Dataproc サポートの利用

どのイメージ バージョンで自動スケーリングがサポートされますか?どの API バージョンですか?

自動スケーリングは、クラスタ イメージのバージョン 1.0.99 以降、1.1.90 以降、1.2.22 以降、1.3.0 以降、1.4.0 以降の v1 APICloud Dataproc バージョン リスト)と gcloud dataproc autoscaling-policies コマンドをサポートしています。

Dataproc と Dataflow の自動スケーリングはどのように違いますか?

Cloud Dataflow の自動スケーリングを Spark や Hadoop と比較するをご覧ください。