自動スケーリングとは
ワークロードに対するクラスタ ワーカー(ノード)の「適正な」数を見積もることは困難であり、パイプライン全体で単一のクラスタサイズを使用することは多くの場合、理想的ではありません。ユーザーが開始するクラスタのスケーリングは、この課題に部分的に対処しますが、クラスタの使用状況のモニタリングと手動による操作が必要になります。
Dataproc AutoscalingPolicies API を使用すると、クラスタ リソースの管理を自動化するメカニズムが提供され、クラスタ ワーカー VM の自動スケーリングが可能になります。Autoscaling Policy
は、クラスタ ワーカーが自動スケーリング ポリシーを使用する方法を記述する再利用可能な構成です。スケーリングの境界や頻度、積極性を定義し、クラスタ存続期間中のクラスタ リソースをきめ細かく制御します。
自動スケーリングを使用する場合
次の場合、自動スケーリングを使用します。
Cloud Storage や BigQuery などの外部サービスにデータを格納するクラスタ
多数のジョブを処理するクラスタ
単一ジョブをスケールアップするクラスタ
Spark バッチジョブの高度な柔軟性モードを使用した
以下の自動スケーリングは推奨されません。
HDFS: 自動スケーリングは、クラスタ内の HDFS のスケーリングを目的としたものではありません。その理由は以下のとおりです。
- HDFS の使用率は、自動スケーリングのシグナルではありません。
- HDFS データはプライマリ ワーカーでのみホストされます。プライマリ ワーカーの数は、すべての HDFS データをホストするのに十分である必要があります。
- HDFS DataNode をデコミッションすると、ワーカーの削除が遅れる可能性があります。Datanodes は、ワーカーを削除する前に、HDFS ブロックを他の Datanode にコピーします。データサイズとレプリケーション係数によっては、このプロセスに数時間かかることがあります。
YARN ノードラベル: 自動スケーリングでは、YARN ノードラベルや、YARN-9088 のプロパティ
dataproc:am.primary_only
はサポートされません。ノードラベルが使用された場合、YARN が誤ってクラスタ指標を報告します。Spark Structured Streaming: 自動スケーリングでは Spark Structured Streaming がサポートされません(自動スケーリングと Spark Structured Streaming をご覧ください)。
アイドル状態のクラスタ: クラスタがアイドル状態のときにクラスタを最小サイズまで縮小するための自動スケーリングはおすすめしません。新しいクラスタの作成にかかる時間はサイズ変更の場合と同程度のため、アイドル状態のクラスタを削除して再作成することを検討してください。この「エフェメラル」モデルをサポートするツールは次のとおりです。
Dataproc ワークフローを使用して、専用クラスタで一連のジョブをスケジュールし、ジョブが完了したらクラスタを削除します。高度なオーケストレーションには、Apache Airflow に基づく Cloud Composer を使用します。
アドホック クエリまたは外部でスケジュールされたワークロードを処理するクラスタの場合、指定されたアイドル時間(期間)の後、または特定の時刻にクラスタを削除するには、クラスタのスケジュール設定された削除を使用します。
サイズが異なるワークロード: クラスタで小さなジョブと大規模なジョブが実行されている場合、正常なデコミッションのスケールダウンでは、大規模なジョブが終了するまで待機します。その結果、長時間実行ジョブは、長時間実行ジョブが終了するまで、クラスタ上で実行されている小さなジョブのリソースの自動スケーリングを遅延させます。この結果を回避するには、類似したサイズの小さなジョブをクラスタ上でグループ化し、各長時間実行ジョブを別々のクラスタに分離します。
自動スケーリングを有効にする
クラスタで自動スケーリングを有効にするには、次の手順を行います。
次のいずれかを実行します。
自動スケーリング ポリシーを作成する
gcloud コマンド
gcloud dataproc autoscaling-policies import
コマンドを使用すると、自動スケーリング ポリシーを作成できます。このコマンドは、自動スケーリング ポリシーを定義するローカルの YAML ファイルを読み取ります。ファイルの形式と内容は、autoscalingPolicies REST API で定義された構成オブジェクトやフィールドと一致している必要があります。
次の YAML の例は、すべての必須フィールドを指定するポリシーを定義しています。また、プライマリ ワーカーの minInstances
値と maxInstances
値、セカンダリ(プリエンプティブル)ワーカーの maxInstances
値を提供し、4 分 cooldownPeriod
(デフォルトは 2 分)を指定します。workerConfig
は、プライマリ ワーカーを設定します。この例では、プライマリ ワーカーのスケーリングを回避するために、minInstances
と maxInstances
に同じ値を設定します。
workerConfig: minInstances: 10 maxInstances: 10 secondaryWorkerConfig: maxInstances: 50 basicAlgorithm: cooldownPeriod: 4m yarnConfig: scaleUpFactor: 0.05 scaleDownFactor: 1.0 gracefulDecommissionTimeout: 1h
次の YAML の例は、自動スケーリング ポリシーのすべてのオプションと必須のフィールドを指定しています。
workerConfig: minInstances: 10 maxInstances: 10 weight: 1 secondaryWorkerConfig: minInstances: 0 maxInstances: 100 weight: 1 basicAlgorithm: cooldownPeriod: 2m yarnConfig: scaleUpFactor: 0.05 scaleDownFactor: 1.0 scaleUpMinWorkerFraction: 0.0 scaleDownMinWorkerFraction: 0.0 gracefulDecommissionTimeout: 1h
ローカル ターミナルまたは Cloud Shell で次の gcloud
コマンドを実行して、自動スケーリング ポリシーを作成します。ポリシーの名前を指定します。この名前はポリシー id
となり、後の gcloud
コマンドでポリシーを参照するために使用できます。--source
フラグを使用して、インポートする自動スケーリング ポリシー YAML ファイルのローカル ファイルパスとファイル名を指定します。
gcloud dataproc autoscaling-policies import policy-name \ --source=filepath/filename.yaml \ --region=region
REST API
autoscalingPolicies.create リクエストの一部として AutoscalingPolicy を定義して、自動スケーリング ポリシーを作成します。
コンソール
自動スケーリング ポリシーを作成するには、Google Cloud コンソールを使用して Dataproc の [自動スケーリング ポリシー] ページで [ポリシーを作成] を選択します。[ポリシーの作成] ページで、ポリシーに関するパネルを選択して、特定のジョブタイプまたはスケーリング オブジェクティブの自動スケーリング ポリシー フィールドに入力できます。
自動スケーリング クラスタを作成する
自動スケーリング ポリシーを作成した後、自動スケーリング ポリシーを使用するクラスタを作成します。クラスタは、自動スケーリング ポリシーと同じリージョンに存在する必要があります。
gcloud コマンド
ローカルのターミナルや Cloud Shell で、次の gcloud
コマンドを実行し、自動スケーリング クラスタを作成します。クラスタ名を指定し、--autoscaling-policy
フラグを使用して policy id(ポリシーを作成したときに指定したポリシー名)を指定するか、ポリシー resource URI (resource name) を指定してください(AutoscalingPolicy の id
と name
フィールドをご覧ください)。
gcloud dataproc clusters create cluster-name \ --autoscaling-policy=policy id or resource URI \ --region=region
REST API
clusters.create リクエストの一部として AutoscalingConfig を含めて、自動スケーリング クラスタを作成します。
コンソール
Google Cloud コンソールで、Dataproc の [クラスタを作成する] ページにある [クラスタの設定] パネルの [自動スケーリング ポリシー] セクションから、新しいクラスタに適用する既存の自動スケーリング ポリシーを選択できます。
既存のクラスタで自動スケーリングを有効にする
自動スケーリング ポリシーを作成した後、同じリージョンの既存のクラスタで作成したポリシーを有効にできます。
gcloud コマンド
既存のクラスタで自動スケーリング ポリシーを有効にするには、ローカルのターミナルや Cloud Shell で、次の gcloud
コマンドを実行します。クラスタ名を指定し、--autoscaling-policy
フラグを使用して policy id(ポリシーを作成したときに指定したポリシー名)を指定するか、ポリシー resource URI (resource name) を指定してください(AutoscalingPolicy の id
と name
フィールドをご覧ください)。
gcloud dataproc clusters update cluster-name \ --autoscaling-policy=policy id or resource URI \ --region=region
REST API
既存のクラスタで自動スケーリング ポリシーを有効にするには、clusters.patch リクエストの updateMask
にあるポリシーの AutoscalingConfig.policyUri を設定します。
Console
現在のところ、Google Cloud Console で、既存のクラスタで自動スケーリング ポリシーを有効にすることはサポートされていません。
マルチクラスタ ポリシーの使用
自動スケーリング ポリシーは、複数のクラスタに適用できるスケーリングの動作を定義します。自動スケーリング ポリシーは、類似したワークロードを共有したり、類似したリソース使用パターンでジョブを実行したりする場合に最適です。
複数のクラスタにより使用されているポリシーを更新できます。この更新は、そのポリシーを使用するすべてのクラスタの自動スケーリングの動作にすぐに反映されます(autoscalingPolicies.update をご覧ください)。そのポリシーを使用しているクラスタにポリシーの更新が適用されないようにするには、ポリシーを更新する前に、クラスタの自動スケーリングを無効にします。
gcloud コマンド
クラスタで自動スケーリングを無効にするには、ローカルのターミナルや Cloud Shell で、次の gcloud
コマンドを実行します。
gcloud dataproc clusters update cluster-name --disable-autoscaling \ --region=region
REST API
クラスタで自動スケーリングを無効にするには、AutoscalingConfig.policyUri に空の文字列を設定し、update_mask=config.autoscaling_config.policy_uri
を clusters.patch リクエストに設定します。
Console
現在、Google Cloud Console では、クラスタでの自動スケーリングの無効化はサポートされていません。
- 1 つ以上のクラスタで使用されているポリシーは削除できません(autoscalingPolicies.delete を参照してください)。
自動スケーリングの仕組み
自動スケーリングでは、クラスタをスケールするかどうかを決定するために、各「クールダウン」期間が経過するたびにクラスタの Hadoop YARN 指標をチェックし、スケールする場合は、更新の規模を決定します。
自動スケーリングでは、次の式を使用して、ワーカー数に対して必要とされる厳密な変更を判断するために、評価ごとに前回の
cooldown_period
における保留中、使用可能、予約済みの各クラスタメモリの平均値を調べます。
注: スケジュール設定されたセカンダリ ワーカー(存在しないものの、作成のスケジュールが設定されているセカンダリ ワーカー)は、
target worker count
に含まれます。pending memory
は、キューに登録されているがまだ実行されていないタスクがクラスタにあることを占めるシグナルで、ワークロードの処理を改善するためにスケールアップする必要があります。available memory
は、正常なノードでクラスタに余分な帯域幅があることを示すシグナルであり、リソースを節約するためにスケールダウンすることが必要な場合があります。- これらの Apache Hadoop YARN 指標の詳細については、Hadoop と Spark による自動スケーリングをご覧ください。
ワーカー数の厳密な変更が必要な場合、自動スケーリングでは、
scaleUpFactor
やscaleDownFactor
を使用してワーカー数の実際の変化を計算します。if exact Δworkers > 0: actual Δworkers = ROUND_UP(exact Δworkers * scaleUpFactor) # examples: # ROUND_UP(exact Δworkers=5 * scaleUpFactor=0.5) = 3 # ROUND_UP(exact Δworkers=0.8 * scaleUpFactor=0.5) = 1 else: actual Δworkers = ROUND_DOWN(exact Δworkers * scaleDownFactor) # examples: # ROUND_DOWN(exact Δworkers=-5 * scaleDownFactor=0.5) = -2 # ROUND_DOWN(exact Δworkers=-0.8 * scaleDownFactor=0.5) = 0 # ROUND_DOWN(exact Δworkers=-1.5 * scaleDownFactor=0.5) = 0
scaleUpFactor や scaledownFactor が 1.0 の場合、保留中または使用可能なメモリが 0(完全に使用されている)になるように自動スケーリングが行われます。実際のワーカー数の変化が計算されると、
scaleUpMinWorkerFraction
とscaleDownMinWorkerFraction
のどちらかがしきい値として機能し、自動スケーリングによってクラスタがスケールされるかどうか判断されます。Δworkers
が小さい場合でも、割合が小さければ自動スケーリングが必要であることを意味します。Δworkers
が大きい場合は、割合が大きい場合のみ自動スケーリングが行われます。if (Δworkers > scaleUpMinWorkerFraction* cluster size) then scale up
またはif (abs(Δworkers) > scaleDownMinWorkerFraction* cluster size) then scale down
スケールするワーカーの数がスケーリングのトリガーに十分な数の場合、自動スケーリングは、
workerConfig
、secondaryWorkerConfig
、weight
(プライマリ ワーカーとセカンダリ ワーカーの比率)のminInstances
maxInstances
の範囲を使用して、プライマリ ワーカー インスタンス グループとセカンダリ ワーカー インスタンス グループでワーカー数を分割する方法を決定します。こうした計算の結果は、スケーリング期間のクラスタに対する最終的な自動スケーリングの変更になります。
自動スケーリング構成の推奨事項
プライマリ ワーカーのスケーリングを回避する
プライマリ ワーカーは HDFS データノードを実行しますが、セカンダリ ワーカーはコンピューティングのみを行います。セカンダリ ワーカーを使用すると、ストレージをプロビジョニングすることなく、コンピューティング リソースを効率的にスケーリングできるため、スケーリングを高速化できます。HDFS 名前ノードには競合状態が複数あり、これによって HDFS が破損し、デコミッションが恒久的に停止する可能性があります。この問題を回避するには、プライマリ ワーカーのスケーリングは避けてください。次に例を示します。workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
minInstances: 0
maxInstances: 100
クラスタの作成コマンドを次のように変更する必要があります。
- 自動スケーリング ポリシーのプライマリ ワーカー グループのサイズに合わせて
--num-workers=10
を設定します。 - セカンダリ ワーカーを非プリエンプティブルになるように構成するには、
--secondary-worker-type=non-preemptible
を設定します。(プリエンプティブル VM が必要な場合を除く)。 - ハードウェア構成をプライマリ ワーカーからセカンダリ ワーカーにコピーする。たとえば、
--worker-boot-disk-size=1000GB
と一致するように--secondary-worker-boot-disk-size=1000GB
を設定します。
Spark バッチジョブの高度な柔軟性モードの使用
高度な柔軟性モード(EFM)を自動スケーリングで使用して、次のことを行います。
ジョブの実行中にクラスタの迅速なスケールダウンが可能
クラスタのスケールダウンによるジョブの実行の中断を防ぐ
プリエンプティブル セカンダリ ワーカーのプリエンプトによるジョブ実行の中断を最小限に抑える
EFM が有効な場合、自動スケーリング ポリシーの正常なデコミッションのタイムアウトを 0s
に設定する必要があります。自動スケーリング ポリシーでは、セカンダリ ワーカーの自動スケーリングのみを行う必要があります。
正常なデコミッションのタイムアウトの選択
自動スケーリングでは、クラスタからノードを削除するときの YARN 正常なデコミッションがサポートされます。正常なデコミッションにより、アプリケーションはジョブの進行を妨げないよう、ステージ間でシャッフル データを完了できます。自動スケーリング ポリシーで提供される「正常なデコミッション タイムアウト」は、YARN がノードを削除する前にアプリケーション(デコミッションの開始時に実行されていたアプリケーション)の実行を待機する時間の上限です。
指定された正常なデコミッションのタイムアウト時間内にプロセスが完了しない場合、ワーカーノードは強制的にシャットダウンされ、データ損失やサービス中断が発生する可能性があります。このような可能性を防ぐには、正常なデコミッションのタイムアウトをクラスタが処理する最長ジョブよりも長い値に設定します。たとえば、最長のジョブを 1 時間実行することを想定している場合は、タイムアウトを 1 時間(1h
)以上に設定します。
正常なデコミッションを防ぐには、処理時間が 1 時間を超えるジョブを独自のエフェメラル クラスタに移行することを検討してください。
scaleUpFactor
を設定する
scaleUpFactor
は、オートスケーラーがクラスタをどのくらい積極的にスケールアップするかを制御します。0.0
~ 1.0
の数値を指定して、ノードを追加する YARN の保留中のメモリの小数値を設定します。
たとえば、100 MB の保留中コンテナがそれぞれ 512 MB をリクエストしている場合、保留中の YARN メモリは 50 GB になります。scaleUpFactor が 0.5
の場合、オートスケーラーは 25 GB の YARN メモリを追加するのに十分な数のノードを追加します。同様に、0.1
の場合、オートスケーラーは 5 GB の YARN メモリの追加に十分なノードを追加します。これらの値は、VM 上で物理的に使用可能な合計メモリではなく、YARN メモリに対応しています。
手始めに、0.05
を MapReduce ジョブと動的割り当てが有効になっている Spark ジョブに利用することをおすすめします。固定エグゼキュータ数と Tez ジョブを持つ Spark ジョブの場合は、1.0
を使用します。 scaleUpFactor が 1.0
の場合、保留中または使用可能なメモリが 0(完全に使用されている)になるように自動スケーリングが行われます。
scaleDownFactor
の設定
scaleDownFactor
は、オートスケーラーがクラスタをどのくらい積極的にスケールダウンするかを制御します。0.0
~ 1.0
の数値を指定して、ノードの削除の原因となる YARN メモリの小数値を設定します。
スケールアップやスケールダウンを頻繁に行う必要がない場合、ほとんどのマルチジョブ クラスタではこの値を 1.0
のままにします。正常なデコミッションにより、スケールダウン オペレーションは、スケールアップ オペレーションよりも大幅に遅くなります。scaleDownFactor=1.0
を設定すると、積極的なスケールダウンレートが設定されます。これにより、適切なクラスタサイズを達成するために必要なダウンスケーリング オペレーションの数を最小限に抑えることができます。
安定性を必要とするクラスタの場合は、スケールダウンのレートを遅くするために scaleDownFactor
を低く設定します。
エフェメラル クラスタや単一ジョブクラスタなどでクラスタをスケールダウンしないようにするには、この値を 0.0
に設定します。
scaleUpMinWorkerFraction
と scaleDownMinWorkerFraction
の設定
scaleUpMinWorkerFraction
と scaleDownMinWorkerFraction
は scaleUpFactor
または scaleDownFactor
とともに使用され、デフォルト値は 0.0
です。これらは、オートスケーラーがクラスタをスケールアップまたはスケールダウンするしきい値を表します。スケールアップまたはスケールダウン リクエストを発行するために必要なクラスタサイズの最小の増減値です。
例: scaleUpMinWorkerFraction
が 0.05
(5%)以下でない限り、オートスケーラーは 100 ノードクラスタに 5 つのワーカーを追加する更新リクエストを発行しません。0.1
に設定すると、オートスケーラーはクラスタのスケールアップ リクエストを発行しません。同様に、scaleDownMinWorkerFraction
が 0.05
の場合、少なくとも 5 つのノードが削除されない限り、オートスケーラーはノードを 100 ノードクラスタから削除するための更新リクエストを発行しません。
デフォルト値の 0.0
は、しきい値がないことを示します。
小規模で不要なスケーリング オペレーションを回避するため、大規模なクラスタ(100 ノード以上)で scaleDownMinWorkerFractionthresholds
を高く設定することを強くおすすめします。
クールダウン期間の選択
cooldownPeriod
は、オートスケーラーがクラスタサイズの変更リクエストを発行しない期間を設定します。これを使用すると、オートスケーラーによるクラスタサイズの変更頻度を制限できます。
最小値かつデフォルトの cooldownPeriod
は 2 分です。ポリシーにより短い cooldownPeriod
を設定した場合、ワークロードの変化がクラスタのサイズにより早く影響を与えますが、クラスタが不必要にスケールアップやスケールダウンされる可能性があります。より短いcooldownPeriod
を使う場合は、ポリシーのscaleUpMinWorkerFraction
とscaleDownMinWorkerFraction
をゼロ以外の値に設定することをおすすめします。この設定により、クラスタを更新する正当な理由に値するメモリ使用量の変化がある場合に限り、クラスタがスケールアップまたはスケールダウンされます。
ワークロードがクラスタサイズの変更の影響を受けやすい場合は、クールダウン期間を増やすことができます。たとえば、バッチ処理ジョブを実行する場合は、クールダウン期間を 10 分以上に設定できます。さまざまなクールダウン期間を試して、ワークロードに最適な値を見つけます。
ワーカー数とグループの重み
各ワーカー グループには minInstances
と maxInstances
があり、各グループのサイズにハードリミットを構成します。
各グループには、weight
というパラメータもあり、2 つのグループの間の目標残高を設定します。このパラメータはヒントにすぎません。グループが最小サイズまたは最大サイズに到達すると、ノードの追加と削除は他のグループからのみ行われます。したがって、weight
はほとんどの場合、デフォルトの 1
のままにできます。
自動スケーリング指標とログ
次のリソースとツールは、自動スケーリングの操作と、クラスタとそのジョブへの影響をモニタリングするのに役立ちます。
Cloud Monitoring
Cloud Monitoring を使用すると、次のことができます。
- 自動スケーリングで使用される指標を表示する
- クラスタ内のノード マネージャの数を表示する
- 自動スケーリングでクラスタのスケールが行われた理由、または行われなかった理由を理解する
Cloud Logging
Cloud Logging を使用して、Cloud Dataproc Autoscaler からログを表示します。
1)クラスタのログを検索します。
2)dataproc.googleapis.com/autoscaler
を選択します。
3)ログメッセージを展開して status
フィールドを表示します。ログは、コンピュータが読み取り可能な JSON 形式です。
4)ログメッセージを展開して、スケーリングの推奨値、スケーリングの決定に使用する指標、オリジナルのクラスタサイズ、新しいクラスタサイズを表示します。
背景: Apache Hadoop と Apache Spark による自動スケーリング
続くセクションでは、自動スケーリングと Hadoop YARN、Hadoop Mapreduce、Apache Spark、Spark Streaming、Spark Structured Streaming との連携について説明します。
Hadoop YARN 指標
自動スケーリングでは、YARN コアリクエストではなく、YARN メモリ リクエストに基づいてジョブをスケジュールするように Hadoop YARN を構成します。
自動スケーリングは、次の Hadoop YARN 指標に基づいて実行されます。
Allocated memory
は、コンテナを実行することによってクラスタ全体で使用される YARN メモリの合計を指します。6 つの実行中のコンテナがあり、それぞれ最大 1GB まで使用できる場合は、6GB のメモリが割り当てられます。Available memory
は、割り当てられたコンテナで使用されない、クラスタ内の YARN メモリです。すべてのノード マネージャーに 10GB のメモリがあり、6GB のメモリが割り当てられている場合、利用可能なメモリは 4GB あります。クラスタに利用可能な(使用されていない)メモリがある場合、自動スケーリングでクラスタからワーカーが削除されることがあります。Pending memory
は、保留中のコンテナに対する YARN メモリ リクエストの合計です。保留中のコンテナは、YARN で実行するスペースを待機しています。利用可能なメモリが 0、または少なすぎて次のコンテナに割り当てることができない場合にのみ、保留中のメモリが 0 以外になります。保留中のコンテナがある場合、自動スケーリングでクラスタにワーカーが追加されることがあります。
これらの指標は Cloud Monitoring で確認できます。 デフォルトでは、クラスタ上の YARN メモリは 0.8 * 合計メモリとなり、残りのメモリは他のデーモンや、ページ キャッシュなどオペレーティング システムでの使用に予約されます。このデフォルト値は、「yarn.nodemanager.resource.memory-mb」の YARN 構成設定でオーバーライドできます(Apache Hadoop YARN、HDFS、Spark、関連プロパティを参照)。
自動スケーリングと Hadoop MapReduce
MapReduce は各マップを実行し、独立した YARN コンテナとしてタスクを削減します。ジョブが開始されると、MapReduce は各マップタスクに対するコンテナ リクエストを送信するため、保留中の YARN メモリが急増します。マップタスクが完了すると、保留中のメモリは減少します。
mapreduce.job.reduce.slowstart.completedmaps
が完了すると(Dataproc でのデフォルトは 95%)、MapReduce によりすべてのレデューサに対するコンテナ リクエストがキューに追加され、保留メモリがさらに急増します。
タスクのマッピングと縮小に数分以上かかる場合を除き、自動スケーリング scaleUpFactor
に高い値を設定しないでください。クラスタにワーカーを追加するには少なくとも 1.5 分かかるので、新しいワーカーを利用するための十分な保留作業を行えるように数分間を確保してください。最初は scaleUpFactor
を保留中のメモリの 0.05(5%)または 0.1(10%)に設定することをおすすめします。
自動スケーリングと Spark
Spark は、YARN の上にスケジューリングのためのレイヤを追加します。具体的には、Spark Core のダイナミック アロケーションにより、コンテナで Spark エグゼキュータを実行するように YARN にリクエストし、それらのエグゼキュータのスレッドで Spark タスクをスケジュールします。Dataproc クラスタではデフォルトでダイナミック アロケーションが有効になっているため、エグゼキュータは必要に応じて追加削除されます。
Spark は常に YARN にコンテナを要求しますが、動的割り当てが行われない場合は、ジョブの開始時にのみコンテナを要求します。動的割り当てが行われる場合、必要に応じてコンテナを削除したり、新しいコンテナをリクエストしたりします。
Spark は少数(自動スケーリング クラスタ上で 2 つ)のエグゼキュータから開始されます。未処理のタスクがある間は、エグゼキュータの数を倍増し続けます。これにより、保留中のメモリが少なくなります(保留中のメモリが急増することが少なくなります)。Spark ジョブの場合は、自動スケーリング scaleUpFactor
に 1.0(100%)などの大きい数を設定することをおすすめします。
Spark の動的割り当てを無効にする
Spark ダイナミック アロケーションのメリットがない別の Spark ジョブを実行する場合は、spark.dynamicAllocation.enabled=false
と spark.executor.instances
を設定して Spark ダイナミック アロケーションを無効にできます。別の Spark ジョブの実行中でも、自動スケーリングを使用してクラスタをスケールできます。
キャッシュ データを使用した Spark ジョブ
データセットが不要になった場合は、spark.dynamicAllocation.cachedExecutorIdleTimeout
をセットするか、データセットのキャッシュを解除します。デフォルトでは、Spark はキャッシュされたデータのあるエグゼキュータを削除せず、このことによってクラスタのスケールダウンを防止します。
自動スケーリングと Spark Streaming
Spark Streaming には、ストリーミング固有のシグナルを使用してエグゼキュータの追加や削除を行う独自のバージョンのダイナミック アロケーションがあるため
spark.streaming.dynamicAllocation.enabled=true
を設定し、Spark Core のダイナミック アロケーションを無効にするspark.dynamicAllocation.enabled=false
を設定します。Spark Streaming ジョブと一緒に正常なデコミッション(自動スケーリング
gracefulDecommissionTimeout
)を使用しないでください。代わりに、自動スケーリングでワーカーを安全に削除するには、フォールト トレランスのためにチェックポイントを構成します。
または、自動スケーリングなしで Spark Streaming を使用するには、次の手順を行います。
- Spark Core のダイナミック アロケーション(
spark.dynamicAllocation.enabled=false
)を無効にする。 - ジョブのエグゼキュータの数(
spark.executor.instances
)を設定する。クラスタ プロパティをご覧ください。
自動スケーリングと Spark Structured Streaming
現在、Spark Structured Streaming では動的割り当てがサポートされていないため、自動スケーリングには Spark Structured Streaming との互換性がありません(SPARK-24815: Structured Streaming should support dynamic allocation をご覧ください)。
パーティショニングと並列処理による自動スケーリングの制御
通常、並列処理はクラスタ リソースによって設定または決定されます(たとえば、HDFS ブロック数はタスクの数に応じて制御されます)が、自動スケーリングでは反対になります。自動スケーリングでは、ジョブの並列性に従ってワーカーの数が設定されます。ジョブの並列処理の設定に役立つガイドラインは次のとおりです。
- Dataproc では、初期のクラスタサイズに基づいて MapReduce のデフォルトの削減タスク数が設定されますが、
mapreduce.job.reduces
を設定して削減フェーズの並列処理を増やすことができます。 - Spark SQL と Dataframe の並列処理は
spark.sql.shuffle.partitions
によって決まります。デフォルトは 200 です。 - Spark の RDD 関数はデフォルトで
spark.default.parallelism
に設定されます。これは、ジョブの開始時にワーカーノードのコア数に設定されます。ただし、シャッフルを作成する RDD 関数はすべて、spark.default.parallelism
をオーバーライドするパーティションの数のパラメータをとります。
データが均等に分割されていることを確認する必要があります。重大なキースキューがある場合、1 つ以上のタスクが他のタスクよりも大幅に時間がかかり、結果として使用率が低くなる可能性があります。
自動スケーリングのデフォルトの Spark / Hadoop プロパティ設定
自動スケーリング クラスタには、プライマリ ワーカーが削除されたときやセカンダリ ワーカーがプリエンプトされたときのジョブの失敗を回避するのに役立つデフォルトのクラスタ プロパティ値があります。これらのデフォルト値は、自動スケーリングを使用してクラスタを作成するときにオーバーライドできます(クラスタ プロパティ を参照してください)。
以下は、タスク、アプリケーション マスター、ステージの最大再試行回数を増やすデフォルト設定です。
yarn:yarn.resourcemanager.am.max-attempts=10 mapred:mapreduce.map.maxattempts=10 mapred:mapreduce.reduce.maxattempts=10 spark:spark.task.maxFailures=10 spark:spark.stage.maxConsecutiveAttempts=10
以下は再試行カウンタをリセットするデフォルト設定です(長時間実行する Spark Streaming ジョブに役立ちます)。
spark:spark.yarn.am.attemptFailuresValidityInterval=1h spark:spark.yarn.executor.failuresValidityInterval=1h
以下は、Spark の小規模から開始する slow-start ダイナミック アロケーション メカニズムのデフォルト設定です。
spark:spark.executor.instances=2
よくある質問(FAQ)
高可用性クラスタと単一ノード クラスタで自動スケーリングを有効にすることはできますか?
高可用性クラスタでは自動スケーリングを有効にできますが、単一ノードクラスタでは有効にできません(単一ノードクラスタではサイズ変更がサポートされません)。
自動スケーリング クラスタのサイズを手動で変更できますか?
○。自動スケーリング ポリシーを調整するときは、暫定措置として、クラスタのサイズを手動で変更できます。ただし、こうした変更には一時的な効果しかないため、最終的には自動スケーリングでクラスタが縮小されます。
自動スケーリング クラスタのサイズを手動で変更する代わりに、次のことを検討してください。
自動スケーリング ポリシーを更新する。自動スケーリング ポリシーに加えた変更は、現在そのポリシーを使用しているすべてのクラスタに影響します(マルチクラスタ ポリシーの使用を参照)。
ポリシーを切り離し、クラスタを適切なサイズに手動でスケールする。
Dataproc と Dataflow の自動スケーリングはどのように違いますか?
Cloud Dataflow の自動スケーリングを Spark や Hadoop と比較するをご覧ください。
Dataproc の開発チームは、クラスタのステータスを ERROR
から RUNNING
にリセットできますか?
一般的に答えは「いいえ」です。手動でリセットするには、クラスタの状態をリセットするための作業が安全であることを確認するための手動作業が必要です。多くの場合、HDFS の Namenode の再起動など、他の手動による手順がなければクラスタはリセットできません。
失敗したオペレーション後のクラスタのステータスを特定できない場合、Dataproc はクラスタのステータスを ERROR
に設定します。ERROR
のクラスタでは、自動スケーリングやジョブの実行ができなくなります。一般的には次のような原因が考えられます。
Compute Engine API から返されたエラー。多くの場合、Compute Engine のサービス停止中に起こります。
HDFS のデコミッションの不具合により、HDFS が破損した状態になっている。
Dataproc Control API エラー(「タスクのリースの期限切れ」など)
Google はこれらの問題に対する Dataproc の復元性の強化に取り組んでいます。
今は、ステータスが ERROR
のクラスタを削除して再作成してください。