Dataproc クラスタを自動スケーリングする

自動スケーリングとは

ワークロードに対するクラスタ ワーカー(ノード)の「適正な」数を見積もることは困難であり、パイプライン全体で単一のクラスタサイズを使用することは多くの場合、理想的ではありません。ユーザーが開始するクラスタのスケーリングは、この課題に部分的に対処しますが、クラスタの使用状況のモニタリングと手動による操作が必要になります。

Dataproc AutoscalingPolicies API を使用すると、クラスタ リソースの管理を自動化するメカニズムが提供され、クラスタ ワーカー VM の自動スケーリングが可能になります。Autoscaling Policy は、クラスタ ワーカーが自動スケーリング ポリシーを使用する方法を記述する再利用可能な構成です。スケーリングの境界や頻度、積極性を定義し、クラスタ存続期間中のクラスタ リソースをきめ細かく制御します。

自動スケーリングを使用する場合

次の場合、自動スケーリングを使用します。

Cloud StorageBigQuery などの外部サービスにデータを格納するクラスタ

多数のジョブを処理するクラスタ

単一ジョブをスケールアップするクラスタ

Spark バッチジョブ用エンハンスト柔軟性モードを使用

以下の自動スケーリングは推奨されません

  • HDFS: 自動スケーリングは、クラスタ内の HDFS のスケーリングを目的としたものではありません。その理由は以下のとおりです。

    1. HDFS の使用率は、自動スケーリングのシグナルではありません。
    2. HDFS データはプライマリ ワーカーでのみホストされます。プライマリ ワーカーの数は、すべての HDFS データをホストするのに十分である必要があります。
    3. HDFS DataNode をデコミッションすると、ワーカーの削除が遅れる可能性があります。Datanodes は、ワーカーを削除する前に、HDFS ブロックを他の Datanode にコピーします。データサイズとレプリケーション係数によっては、このプロセスに数時間かかることがあります。
  • YARN ノードラベル: 自動スケーリングでは、YARN ノードラベルや、YARN-9088 のプロパティ dataproc:am.primary_only はサポートされません。ノードラベルが使用された場合、YARN が誤ってクラスタ指標を報告します。

  • Spark Structured Streaming: 自動スケーリングでは Spark Structured Streaming がサポートされません(自動スケーリングと Spark Structured Streaming をご覧ください)。

  • アイドル状態のクラスタ: クラスタがアイドル状態のときにクラスタを最小サイズまで縮小するための自動スケーリングはおすすめしません。新しいクラスタの作成にかかる時間はサイズ変更の場合と同程度のため、アイドル状態のクラスタを削除して再作成することを検討してください。この「エフェメラル」モデルをサポートするツールは次のとおりです。

    Dataproc ワークフローを使用して、専用クラスタで一連のジョブをスケジュールし、ジョブが完了したらクラスタを削除します。高度なオーケストレーションには、Apache Airflow に基づく Cloud Composer を使用します。

    アドホック クエリまたは外部でスケジュールされたワークロードを処理するクラスタの場合、指定されたアイドル時間(期間)の後、または特定の時刻にクラスタを削除するには、クラスタのスケジュール設定された削除を使用します。

  • サイズが異なるワークロード: クラスタで小さなジョブと大規模なジョブが実行されている場合、正常なデコミッションのスケールダウンでは、大規模なジョブが終了するまで待機します。その結果、長時間実行ジョブが完了するまで、クラスタで実行されている小規模なジョブのリソースの自動スケーリングが遅延します。この結果を回避するには、サイズが同程度の小さなジョブをクラスタにグループ化し、長時間実行ジョブを個別のクラスタに分離します。

自動スケーリングを有効にする

クラスタで自動スケーリングを有効にするには、次の手順を行います。

  1. 自動スケーリング ポリシーを作成します

  2. 次のいずれかを実行します。

    1. 自動スケーリング クラスタを作成する
    2. 既存のクラスタで自動スケーリングを有効にする

自動スケーリング ポリシーを作成する

gcloud コマンド

gcloud dataproc autoscaling-policies import コマンドを使用すると、自動スケーリング ポリシーを作成できます。このコマンドは、自動スケーリング ポリシーを定義するローカルの YAML ファイルを読み取ります。ファイルの形式と内容は、autoscalingPolicies REST API で定義された構成オブジェクトやフィールドと一致している必要があります。

次の YAML の例は、すべての必須フィールドを指定するポリシーを定義しています。また、プライマリ ワーカーの minInstances 値と maxInstances 値、セカンダリ(プリエンプティブル)ワーカーの maxInstances 値を提供し、4 分 cooldownPeriod (デフォルトは 2 分)を指定します。workerConfig は、プライマリ ワーカーを設定します。この例では、プライマリ ワーカーのスケーリングを回避するためにminInstancesmaxInstances に同じ値を設定します。

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

次の YAML の例は、自動スケーリング ポリシーのすべてのオプションと必須のフィールドを指定しています。

workerConfig:
  minInstances: 10
  maxInstances: 10
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

ローカル ターミナルまたは Cloud Shell で次の gcloud コマンドを実行して、自動スケーリング ポリシーを作成します。ポリシーの名前を指定します。この名前はポリシー id となり、後の gcloud コマンドでポリシーを参照するために使用できます。--source フラグを使用して、インポートする自動スケーリング ポリシー YAML ファイルのローカル ファイルパスとファイル名を指定します。

gcloud dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml \
    --region=region

REST API

autoscalingPolicies.create リクエストの一部として AutoscalingPolicy を定義して、自動スケーリング ポリシーを作成します。

Console

自動スケーリング ポリシーを作成するには、Google Cloud コンソールを使用して Dataproc の [自動スケーリング ポリシー] ページで [ポリシーを作成] を選択します。[ポリシーの作成] ページで、ポリシーに関するパネルを選択して、特定のジョブタイプまたはスケーリング オブジェクティブの自動スケーリング ポリシー フィールドに入力できます。

自動スケーリング クラスタを作成する

自動スケーリング ポリシーを作成した後、自動スケーリング ポリシーを使用するクラスタを作成します。クラスタは、自動スケーリング ポリシーと同じリージョンに存在する必要があります。

gcloud コマンド

ローカルのターミナルや Cloud Shell で、次の gcloud コマンドを実行し、自動スケーリング クラスタを作成します。クラスタ名を指定し、--autoscaling-policy フラグを使用して policy idポリシーを作成したときに指定したポリシー名)を指定するか、ポリシー resource URI (resource name) を指定してください(AutoscalingPolicy の idname フィールドをご覧ください)。

gcloud dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

clusters.create リクエストの一部として AutoscalingConfig を含めて、自動スケーリング クラスタを作成します。

Console

Google Cloud コンソールで、Dataproc の [クラスタを作成する] ページにある [クラスタの設定] パネルの [自動スケーリング ポリシー] セクションから、新しいクラスタに適用する既存の自動スケーリング ポリシーを選択できます。

既存のクラスタで自動スケーリングを有効にする

自動スケーリング ポリシーを作成した後、同じリージョンの既存のクラスタで作成したポリシーを有効にできます。

gcloud コマンド

既存のクラスタで自動スケーリング ポリシーを有効にするには、ローカルのターミナルや Cloud Shell で、次の gcloud コマンドを実行します。クラスタ名を指定し、--autoscaling-policy フラグを使用して policy idポリシーを作成したときに指定したポリシー名)を指定するか、ポリシー resource URI (resource name) を指定してください(AutoscalingPolicy の idname フィールドをご覧ください)。

gcloud dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

既存のクラスタで自動スケーリング ポリシーを有効にするには、clusters.patch リクエストの updateMask にあるポリシーの AutoscalingConfig.policyUri を設定します。

Console

現在のところ、Google Cloud Console で、既存のクラスタで自動スケーリング ポリシーを有効にすることはサポートされていません。

マルチクラスタ ポリシーの使用

  • 自動スケーリング ポリシーは、複数のクラスタに適用できるスケーリングの動作を定義します。自動スケーリング ポリシーは、類似したワークロードを共有したり、類似したリソース使用パターンでジョブを実行したりする場合に最適です。

  • 複数のクラスタにより使用されているポリシーを更新できます。この更新は、そのポリシーを使用するすべてのクラスタの自動スケーリングの動作にすぐに反映されます(autoscalingPolicies.update をご覧ください)。そのポリシーを使用しているクラスタにポリシーの更新が適用されないようにするには、ポリシーを更新する前に、クラスタの自動スケーリングを無効にします。

gcloud コマンド

クラスタで自動スケーリングを無効にするには、ローカルのターミナルや Cloud Shell で、次の gcloud コマンドを実行します。

gcloud dataproc clusters update cluster-name --disable-autoscaling \
    --region=region

REST API

クラスタで自動スケーリングを無効にするには、AutoscalingConfig.policyUri に空の文字列を設定し、update_mask=config.autoscaling_config.policy_uriclusters.patch リクエストに設定します。

Console

現在、Google Cloud Console では、クラスタでの自動スケーリングの無効化はサポートされていません。

  • 1 つ以上のクラスタで使用されているポリシーは削除できません(autoscalingPolicies.delete を参照してください)。

自動スケーリングの仕組み

自動スケーリングでは、クラスタをスケールするかどうかを決定するために、各「クールダウン」期間が経過するたびにクラスタの Hadoop YARN 指標をチェックし、スケールする場合は、更新の規模を決定します。

  1. YARN 保留中リソース指標(保留中メモリまたは保留中コア)の値によって、スケールアップまたはスケールダウンが決まります。0 より大きい値は、YARN ジョブがリソースを待機しており、スケールアップが必要であることを示します。0 値は、YARN に十分なリソースがあり、スケールダウンなどの変更が必要ないことを示します。

    保留中のリソースが 0 より大きい場合:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Pending + Available + Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]

    保留中のリソースが 0 の場合:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]

    デフォルトでは、オートスケーラーは YARN メモリリソースをモニタリングします。コアベースの自動スケーリングを有効にすると、YARN メモリと YARN コアの両方がモニタリングされます。estimated_worker_count はメモリとコアについて別々に評価され、より多くのワーカー数が選択されます。

    $estimated\_worker\_count =$

    \[ max(estimated\_worker\_count\_by\_memory,\ estimated\_worker\_count\_by\_cores) \]

    \[ estimated\ \Delta worker = estimated\_worker\_count - current\_worker\_count \]

  2. ワーカー数に必要な変更が見積もられると、自動スケーリングでは、scaleUpFactorscaleDownFactor を使用してワーカー数の実際の変化を計算します。

    if estimated Δworkers > 0:
      actual Δworkers = ROUND_UP(estimated Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(estimated Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(estimated Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(estimated Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(estimated Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(estimated Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(estimated Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    scaleUpFactor や scaledownFactor が 1.0 の場合、保留中または使用可能なリソースが 0(完全に使用されている)になるように自動スケーリングが行われます。

  3. ワーカー数の変化が計算されると、scaleUpMinWorkerFractionscaleDownMinWorkerFraction のどちらかがしきい値として機能し、自動スケーリングによってクラスタがスケールされるかどうか判断されます。Δworkers が小さい場合でも、割合が小さければ自動スケーリングが必要であることを意味します。Δworkers が大きい場合は、割合が大きい場合のみ自動スケーリングが行われます。

    IF (Δworkers >  scaleUpMinWorkerFraction * current_worker_count) then scale up
    
    または
    IF (abs(Δworkers) >  scaleDownMinWorkerFraction * current_worker_count),
    THEN scale down.
    

  4. スケールするワーカーの数がスケーリングのトリガーに十分な数の場合、自動スケーリングは、workerConfigsecondaryWorkerConfigweight(プライマリ ワーカーとセカンダリ ワーカーの比率)の minInstances maxInstances の範囲を使用して、プライマリ ワーカー インスタンス グループとセカンダリ ワーカー インスタンス グループでワーカー数を分割する方法を決定します。こうした計算の結果は、スケーリング期間のクラスタに対する最終的な自動スケーリングの変更になります。

  5. 次の場合、2.0.57 より後のイメージ バージョンと 2.1.5 で作成されたクラスタで自動スケーリングのスケールダウン リクエストがキャンセルされます。

    1. ゼロ以外の正常なデコミッション タイムアウト値でスケールダウンが進行中である。
    2. 次の式に示すように、アクティブな YARN ワーカー(「アクティブ ワーカー」)数に、オートスケーラーが推奨するワーカーの合計数(Δworkers)の変更を加えると、DECOMMISSIONING YARN ワーカー(「デコミッション ワーカー」)と等しいかそれ以上になります。

      IF (active workers + Δworkers ≥ active workers + decommissioning workers)
      THEN cancel the scaledown operation
      

    スケールダウンのキャンセル例については、自動スケーリングによってスケールダウン オペレーションがキャンセルされるタイミングをご覧ください。

自動スケーリング構成の推奨事項

プライマリ ワーカーのスケーリングを回避する

プライマリ ワーカーは HDFS データノードを実行しますが、セカンダリ ワーカーはコンピューティングのみを行います。セカンダリ ワーカーを使用すると、ストレージをプロビジョニングしなくてもコンピューティング リソースを効率的にスケーリングできるため、スケーリング機能が高速化されます。HDFS Namenode には競合状態が複数あり、これによって HDFS が破損し、デコミッションが永続的に停止する可能性があります。この問題を回避するには、プライマリ ワーカーのスケーリングを避けてください。次に例を示します。workerConfig: minInstances: 10 maxInstances: 10 secondaryWorkerConfig: minInstances: 0 maxInstances: 100

クラスタの作成コマンドを次のように変更する必要があります。

  1. 自動スケーリング ポリシーのプライマリ ワーカー グループのサイズに合わせて --num-workers=10 を設定します。
  2. セカンダリ ワーカーを非プリエンプティブルになるように構成するには、--secondary-worker-type=non-preemptible を設定します。(プリエンプティブル VM が必要な場合を除く)。
  3. ハードウェア構成をプライマリ ワーカーからセカンダリ ワーカーにコピーする。たとえば、--worker-boot-disk-size=1000GB と一致するように --secondary-worker-boot-disk-size=1000GB を設定します。

Spark バッチジョブの高度な柔軟性モードの使用

自動スケーリングで高度な柔軟性モード(EFM)を使用すると、次のことができます。

ジョブの実行中にクラスタのスケールダウンを高速化できる

クラスタのスケールダウンによる実行中のジョブの停止を防ぐ

プリエンプティブル セカンダリ ワーカーのプリエンプションによる実行中のジョブの停止を最小限に抑える

EFM が有効な場合、自動スケーリング ポリシーの正常なデコミッションのタイムアウトを 0s に設定する必要があります。自動スケーリング ポリシーでは、セカンダリ ワーカーの自動スケーリングのみを行う必要があります。

正常なデコミッションのタイムアウトの選択

自動スケーリングでは、クラスタからノードを削除するときの YARN 正常なデコミッションがサポートされます。正常なデコミッションにより、アプリケーションはジョブの進行を妨げないよう、ステージ間でシャッフル データを完了できます。自動スケーリング ポリシーで提供される「正常なデコミッション タイムアウト」は、YARN がノードを削除する前にアプリケーション(デコミッションの開始時に実行されていたアプリケーション)の実行を待機する時間の上限です。

指定された正常なデコミッション タイムアウト期間内にプロセスが完了しない場合、ワーカーノードは強制的にシャットダウンされ、データの損失やサービスの停止が発生する可能性があります。このような可能性を防ぐには、正常なデコミッションのタイムアウトをクラスタが処理する最長ジョブよりも長い値に設定します。たとえば、最長のジョブが 1 時間実行されることが予想される場合は、タイムアウトを 1 時間以上に設定します(1h)。

正常なデコミッションを防ぐには、処理時間が 1 時間を超えるジョブを独自のエフェメラル クラスタに移行することを検討してください。

scaleUpFactor を設定する

scaleUpFactor は、オートスケーラーがクラスタをどのくらい積極的にスケールアップするかを制御します。0.01.0 の数値を指定して、ノードを追加する YARN の保留中のリソースの小数値を設定します。

たとえば、100 MB の保留中コンテナがそれぞれ 512 MB をリクエストしている場合、保留中の YARN メモリは 50 GB になります。scaleUpFactor が 0.5 の場合、オートスケーラーは 25 GB の YARN メモリを追加するのに十分な数のノードを追加します。同様に、0.1 の場合、オートスケーラーは 5 GB の YARN メモリの追加に十分なノードを追加します。これらの値は、VM 上で物理的に使用可能な合計メモリではなく、YARN メモリに対応しています。

手始めに、0.05 を MapReduce ジョブと動的割り当てが有効になっている Spark ジョブに利用することをおすすめします。固定エグゼキュータ数と Tez ジョブを持つ Spark ジョブの場合は、1.0 を使用します。scaleUpFactor が 1.0 の場合、保留中または使用可能なリソースが 0(完全に使用されている)になるように自動スケーリングが行われます。

scaleDownFactor を設定する

scaleDownFactor は、オートスケーラーがクラスタをどのくらい積極的にスケールダウンするかを制御します。0.01.0 の数値を指定して、ノードの削除の原因となる YARN リソースの小数値を設定します。

頻繁にスケールアップとスケールダウンを行う必要があるほとんどのマルチジョブ クラスタでは、この値を 1.0 のままにします。正常なデコミッションにより、スケールダウン オペレーションは、スケールアップ オペレーションよりも大幅に遅くなります。scaleDownFactor=1.0 を設定すると、積極的なスケールダウン レートが設定されます。これにより、適切なクラスタサイズを達成するために必要なダウンスケーリング オペレーションの数を最小限に抑えることができます。

安定性を必要とするクラスタの場合は、スケールダウンのレートを遅くするために scaleDownFactor を低く設定します。

エフェメラル クラスタや単一ジョブ クラスタを使用する場合など、クラスタのスケールダウンを回避するには、この値を 0.0 に設定します。

scaleUpMinWorkerFractionscaleDownMinWorkerFraction の設定

scaleUpMinWorkerFractionscaleDownMinWorkerFractionscaleUpFactor または scaleDownFactor で使用され、デフォルト値は 0.0 です。これらは、オートスケーラーがクラスタをスケールアップまたはスケールダウンするしきい値を表します。スケールアップまたはスケールダウン リクエストを発行するために必要なクラスタサイズの最小の増減値です。

例: scaleUpMinWorkerFraction0.05(5%)以下でない限り、オートスケーラーは 100 ノードクラスタに 5 個のワーカーを追加する更新リクエストを発行しません。0.1 に設定すると、オートスケーラーはクラスタをスケールアップするリクエストを発行しません。同様に、scaleDownMinWorkerFraction0.05 の場合、少なくとも 5 つのノードが削除されない限り、オートスケーラーはノードを 100 ノードクラスタから削除するための更新リクエストを発行しません。

デフォルト値の 0.0 は、しきい値がないことを示します。

小規模で不要なスケーリング オペレーションを回避するため、大規模なクラスタ(100 ノード以上)で scaleDownMinWorkerFractionthresholds を高く設定することを強くおすすめします

クールダウン期間の選択

cooldownPeriod は、オートスケーラーがクラスタサイズの変更リクエストを発行しない期間を設定します。これを使用して、クラスタサイズに対するオートスケーラーの変更頻度を制限できます。

最小値かつデフォルトの cooldownPeriod は 2 分です。ポリシーにより短い cooldownPeriod を設定した場合、ワークロードの変化がクラスタのサイズにより早く影響を与えますが、クラスタが不必要にスケールアップやスケールダウンされる可能性があります。より短いcooldownPeriodを使う場合は、ポリシーのscaleUpMinWorkerFractionscaleDownMinWorkerFractionをゼロ以外の値に設定することをおすすめします。この設定により、クラスタを更新する正当な理由に値するリソース使用量の変化がある場合に限り、クラスタがスケールアップまたはスケールダウンされます。

ワークロードがクラスタサイズの変更の影響を受けやすい場合は、クールダウン期間を長くできます。たとえば、バッチ処理ジョブを実行している場合は、クールダウン期間を 10 分以上に設定できます。さまざまなクールダウン期間を試して、ワークロードに最適な値を見つけます。

ワーカー数とグループの重み

各ワーカー グループには minInstancesmaxInstances があり、各グループのサイズにハードリミットを構成します。

各グループには、weight というパラメータもあり、2 つのグループの間の目標残高を設定します。このパラメータはヒントにすぎません。グループが最小サイズまたは最大サイズに到達すると、ノードの追加と削除は他のグループからのみ行われます。したがって、weight はほとんどの場合、デフォルトの 1 のままにできます。

コアベースの自動スケーリングを有効にする

デフォルトでは、YARN はリソース割り当てにメモリ指標を使用します。CPU 使用率の高いアプリケーションの場合は、Dominant Resource Calculator を使用するように YARN を構成することをおすすめします。これを行うには、クラスタの作成時に次のプロパティを設定します。

capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

自動スケーリング指標とログ

次のリソースとツールは、自動スケーリングの操作と、クラスタとそのジョブへの影響をモニタリングするのに役立ちます。

Cloud Monitoring

Cloud Monitoring を使用すると、次のことができます。

  • 自動スケーリングで使用される指標を表示する
  • クラスタ内のノード マネージャの数を表示する
  • 自動スケーリングでクラスタのスケールが行われた理由、または行われなかった理由を理解する autoscaling-stackdriver1autoscaling-stackdriver2autoscaling-stackdriver3

Cloud Logging

Cloud Logging を使用して、Cloud Dataproc Autoscaler からログを表示します。

1)クラスタのログを検索します。

autoscaling-logs-for-cluster

2)dataproc.googleapis.com/autoscaler を選択します。

autoscaling-log-file

3)ログメッセージを展開して status フィールドを表示します。ログは、コンピュータが読み取り可能な JSON 形式です。

autoscaling-three-logs autoscaling-update-operation

4)ログメッセージを展開して、スケーリングの推奨値、スケーリングの決定に使用する指標、オリジナルのクラスタサイズ、新しいクラスタサイズを表示します。

autoscaling-recommendation-message

背景: Apache Hadoop と Apache Spark による自動スケーリング

続くセクションでは、自動スケーリングと Hadoop YARN、Hadoop Mapreduce、Apache Spark、Spark Streaming、Spark Structured Streaming との連携について説明します。

Hadoop YARN 指標

自動スケーリングは、次の Hadoop YARN 指標に基づいて実行されます。

  1. Allocated resource は、コンテナを実行することによってクラスタ全体で使用される YARN リソースの合計を指します。最大 1 ユニットのリソースを使用できる 6 つの実行中のコンテナがある場合、6 つのリソースが割り当てられます。

  2. Available resource は、割り当てられたコンテナで使用されない、クラスタ内の YARN リソースです。すべてのノード マネージャーに 10 ユニットのリソースがあり、そのうち 6 ユニットが割り当てられている場合、利用可能なリソースは 4 ユニットです。クラスタに利用可能な(使用されていない)リソースがある場合、自動スケーリングでクラスタからワーカーが削除されることがあります。

  3. Pending resource は、保留中のコンテナに対する YARN リソース リクエストの合計です。保留中のコンテナは、YARN で実行するスペースを待機しています。保留中のリソースが 0 以外になるのは、使用可能なリソースが 0 の場合、または次のコンテナに割り当てることができないほど小さい場合のみです。保留中のコンテナがある場合、自動スケーリングでクラスタにワーカーが追加されることがあります。

これらの指標は Cloud Monitoring で確認できます。 デフォルトでは、クラスタ上の YARN メモリは 0.8 * 合計メモリとなり、残りのメモリは他のデーモンや、ページ キャッシュなどオペレーティング システムでの使用に予約されます。このデフォルト値は、「yarn.nodemanager.resource.memory-mb」の YARN 構成設定でオーバーライドできます(Apache Hadoop YARN、HDFS、Spark、関連プロパティを参照)。

自動スケーリングと Hadoop MapReduce

MapReduce は各マップを実行し、独立した YARN コンテナとしてタスクを削減します。ジョブが開始されると、MapReduce は各マップタスクに対するコンテナ リクエストを送信するため、保留中の YARN メモリが急増します。マップタスクが完了すると、保留中のメモリは減少します。

mapreduce.job.reduce.slowstart.completedmaps が完了すると(Dataproc でのデフォルトは 95%)、MapReduce によりすべてのレデューサに対するコンテナ リクエストがキューに追加され、保留メモリがさらに急増します。

タスクのマッピングと縮小に数分以上かかる場合を除き、自動スケーリング scaleUpFactor に高い値を設定しないでください。クラスタにワーカーを追加するには少なくとも 1.5 分かかるので、新しいワーカーを利用するための十分な保留作業を行えるように数分間を確保してください。最初は scaleUpFactor を保留中のメモリの 0.05(5%)または 0.1(10%)に設定することをおすすめします。

自動スケーリングと Spark

Spark は、YARN の上にスケジューリングのためのレイヤを追加します。具体的には、Spark Core のダイナミック アロケーションにより、コンテナで Spark エグゼキュータを実行するように YARN にリクエストし、それらのエグゼキュータのスレッドで Spark タスクをスケジュールします。Dataproc クラスタではデフォルトでダイナミック アロケーションが有効になっているため、エグゼキュータは必要に応じて追加削除されます。

Spark は常に YARN にコンテナを要求しますが、動的割り当てが行われない場合は、ジョブの開始時にのみコンテナを要求します。動的割り当てが行われる場合、必要に応じてコンテナを削除したり、新しいコンテナをリクエストしたりします。

Spark は少数(自動スケーリング クラスタ上で 2 つ)のエグゼキュータから開始されます。未処理のタスクがある間は、エグゼキュータの数を倍増し続けます。これにより、保留中のメモリが少なくなります(保留中のメモリが急増することが少なくなります)。Spark ジョブの場合は、自動スケーリング scaleUpFactor に 1.0(100%)などの大きい数を設定することをおすすめします。

Spark の動的割り当てを無効にする

Spark ダイナミック アロケーションのメリットがない別の Spark ジョブを実行する場合は、spark.dynamicAllocation.enabled=falsespark.executor.instances を設定して Spark ダイナミック アロケーションを無効にできます。別の Spark ジョブの実行中でも、自動スケーリングを使用してクラスタをスケールできます。

キャッシュ データを使用した Spark ジョブ

データセットが不要になった場合は、spark.dynamicAllocation.cachedExecutorIdleTimeout をセットするか、データセットのキャッシュを解除します。デフォルトでは、Spark はキャッシュされたデータのあるエグゼキュータを削除せず、このことによってクラスタのスケールダウンを防止します。

自動スケーリングと Spark Streaming

  1. Spark Streaming には、ストリーミング固有のシグナルを使用してエグゼキュータの追加や削除を行う独自のバージョンのダイナミック アロケーションがあるため spark.streaming.dynamicAllocation.enabled=true を設定し、Spark Core のダイナミック アロケーションを無効にする spark.dynamicAllocation.enabled=false を設定します。

  2. Spark Streaming ジョブと一緒に正常なデコミッション(自動スケーリング gracefulDecommissionTimeout)を使用しないでください。代わりに、自動スケーリングでワーカーを安全に削除するには、フォールト トレランスのためにチェックポイントを構成します。

または、自動スケーリングなしで Spark Streaming を使用するには、次の手順を行います。

  1. Spark Core のダイナミック アロケーション(spark.dynamicAllocation.enabled=false)を無効にする。
  2. ジョブのエグゼキュータの数(spark.executor.instances)を設定する。クラスタ プロパティをご覧ください。

自動スケーリングと Spark Structured Streaming

現在、Spark Structured Streaming では動的割り当てがサポートされていないため、自動スケーリングには Spark Structured Streaming との互換性がありません(SPARK-24815: Structured Streaming should support dynamic allocation をご覧ください)。

パーティショニングと並列処理による自動スケーリングの制御

通常、並列処理はクラスタ リソースによって設定または決定されます(たとえば、HDFS ブロック数はタスクの数に応じて制御されます)が、自動スケーリングでは反対になります。自動スケーリングでは、ジョブの並列性に従ってワーカーの数が設定されます。ジョブの並列処理の設定に役立つガイドラインは次のとおりです。

  • Dataproc では、初期のクラスタサイズに基づいて MapReduce のデフォルトの削減タスク数が設定されますが、mapreduce.job.reduces を設定して削減フェーズの並列処理を増やすことができます。
  • Spark SQL と Dataframe の並列処理は spark.sql.shuffle.partitions によって決まります。デフォルトは 200 です。
  • Spark の RDD 関数はデフォルトで spark.default.parallelism に設定されます。これは、ジョブの開始時にワーカーノードのコア数に設定されます。ただし、シャッフルを作成する RDD 関数はすべて、spark.default.parallelism をオーバーライドするパーティションの数のパラメータを取ります。

データが均等に分割されていることを確認する必要があります。重大なキースキューがある場合、1 つ以上のタスクが他のタスクよりも大幅に時間がかかり、結果として使用率が低くなる可能性があります。

自動スケーリングのデフォルトの Spark / Hadoop プロパティ設定

自動スケーリング クラスタには、プライマリ ワーカーが削除されたときやセカンダリ ワーカーがプリエンプトされたときのジョブの失敗を回避するのに役立つデフォルトのクラスタ プロパティ値があります。これらのデフォルト値は、自動スケーリングを使用してクラスタを作成するときにオーバーライドできます(クラスタ プロパティ を参照してください)。

以下は、タスク、アプリケーション マスター、ステージの最大再試行回数を増やすデフォルト設定です。

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

以下は再試行カウンタをリセットするデフォルト設定です(長時間実行する Spark Streaming ジョブに役立ちます)。

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

以下は、Spark の小規模から開始する slow-start ダイナミック アロケーション メカニズムのデフォルト設定です。

spark:spark.executor.instances=2

よくある質問(FAQ)

高可用性クラスタと単一ノード クラスタで自動スケーリングを有効にすることはできますか?

高可用性クラスタでは自動スケーリングを有効にできますが、単一ノードクラスタでは有効にできません(単一ノードクラスタではサイズ変更がサポートされません)。

自動スケーリング クラスタのサイズを手動で変更できますか?

はい。自動スケーリング ポリシーを調整するときは、暫定措置として、クラスタのサイズを手動で変更できます。ただし、こうした変更には一時的な効果しかないため、最終的には自動スケーリングでクラスタが縮小されます。

自動スケーリング クラスタのサイズを手動で変更する代わりに、次のことを検討してください。

自動スケーリング ポリシーを更新する。自動スケーリング ポリシーに加えた変更は、現在そのポリシーを使用しているすべてのクラスタに影響しますマルチクラスタ ポリシーの使用を参照)。

ポリシーを切り離し、クラスタを適切なサイズに手動でスケールする。

Dataproc サポートの利用

Dataproc と Dataflow の自動スケーリングはどのように違いますか?

Dataflow の水平自動スケーリングDataflow Prime の垂直自動スケーリングをご覧ください。

Dataproc の開発チームは、クラスタのステータスを ERROR から RUNNING にリセットできますか?

一般的に答えは「いいえ」です。手動でリセットするには、クラスタの状態をリセットするための作業が安全であることを確認するための手動作業が必要です。多くの場合、HDFS の Namenode の再起動など、他の手動による手順がなければクラスタはリセットできません。

失敗したオペレーション後のクラスタのステータスを特定できない場合、Dataproc はクラスタのステータスを ERROR に設定します。ERROR のクラスタは自動スケーリングされず、ジョブも実行されません。一般的には次のような原因が考えられます。

  1. Compute Engine API から返されたエラー。多くの場合、Compute Engine のサービス停止中に起こります。

  2. HDFS のデコミッションの不具合により、HDFS が破損した状態になっている。

  3. Dataproc Control API エラー(「タスクのリースの期限切れ」など)

ステータスが ERROR のクラスタを削除して再作成します。

自動スケーリングによってスケールダウン オペレーションがキャンセルされるタイミング

次の図は、自動スケーリングがスケールダウン オペレーションをキャンセルするタイミングを示しています(自動スケーリングの仕組みも参照)。

dataproc-autoscaling-cancellation-example

メモ:

  • クラスタでは、YARN メモリ指標のみに基づいて自動スケーリングが有効になっています(デフォルト)。
  • T1~T9 は、オートスケーラーがワーカー数を評価するクールダウン期間を表します(イベントのタイミングは簡素化されています)。
  • 積み上げ棒グラフは、アクティブなクラスタ YARN ワーカー、デコミッション中のクラスタ YARN ワーカー、デコミッションされたクラスタ YARN ワーカーの数を表します。
  • オートスケーラーの推奨ワーカー数(黒い線)は、YARN メモリ指標、YARN アクティブ ワーカー数、自動スケーリング ポリシーの設定に基づいています(自動スケーリングの仕組みをご覧ください)。
  • 赤色の背景領域は、スケールダウン オペレーションが実行されている期間を示します。
  • 黄色の背景領域は、スケールダウン オペレーションがキャンセルされた期間を示します。
  • 緑色の背景領域は、スケールアップ オペレーションの期間を示します。

次の時点で、次の操作が行われます。

  • T1: オートスケーラーは正常なデコミッションのスケールダウン オペレーションを開始し、現在のクラスタ ワーカーの約半分をスケールダウンします。

  • T2: オートスケーラーはクラスタ指標を継続的にモニタリングします。スケールダウンの推奨事項は変更されず、スケールダウン オペレーションは続行されます。デコミッション済みのワーカーとデコミッション中のワーカーがあります(Dataproc はデコミッション済みのワーカーを削除します)。

  • T3: 追加の YARN メモリが使用可能になったため、ワーカーの数をさらにスケールダウンできるとオートスケーラーが計算します。ただし、アクティブ ワーカー数に、推奨されるワーカー数の変更を加えると、アクティブ ワーカー数とデコミッション ワーカーの数の合計未満になるため、スケールダウンのキャンセル条件が満たされず、オートスケーラーはスケールダウン オペレーションをキャンセルしません。

  • T4: YARN は保留中のメモリの増加を報告します。ただし、オートスケーラーはワーカー数の推奨値を変更しません。T3 と同様に、スケールダウンのキャンセル条件は満たされないままで、オートスケーラーによってスケールダウン オペレーションがキャンセルされることはありません。

  • T5: YARN の保留中のメモリが増加し、オートスケーラーによって推奨されるワーカーの数の変更が増えます。ただし、アクティブ ワーカー数に、推奨されるワーカー数の変更を加えると、アクティブ ワーカー数とデコミッション ワーカーの数の合計未満になるため、キャンセル条件が満たされないままで、スケールダウン オペレーションがキャンセルされません。

  • T6: YARN の保留中メモリがさらに増加します。アクティブなワーカー数に、オートスケーラーによって推奨されるワーカーの数の変更を加えると、アクティブなワーカー数とデコミッション ワーカーの合計より大きくなります。キャンセル条件が満たされ、オートスケーラーがスケールダウン オペレーションをキャンセルします。

  • T7: オートスケーラーは、スケールダウン オペレーションのキャンセルが完了するのを待機しています。オートスケーラーは、この期間中にワーカー数の変更の評価や推奨を行いません。

  • T8: スケールダウン オペレーションのキャンセルが完了します。廃止中のワーカーはクラスタに追加され、アクティブになります。オートスケーラーは、スケールダウン オペレーションのキャンセルの完了を検出し、次の評価期間(T9)を待機して推奨されるワーカー数を計算します。

  • T9: T9 時点でアクティブなオペレーションはありません。オートスケーラーのポリシーと YARN 指標に基づいて、オートスケーラーはスケールアップ オペレーションを推奨します。