Dataproc-Cluster automatisch skalieren

Was ist Autoscaling?

Das Schätzen der "richtigen" Anzahl von Cluster-Workern (Knoten) für eine Arbeitslast ist schwierig und eine einzelne Clustergröße für eine gesamte Pipeline ist oft nicht ideal. Vom Nutzer initiierte Cluster-Skalierung wird diese Herausforderung teilweise gelöst, erfordert jedoch die Überwachung der Clusternutzung und ein manuelles Eingreifen.

Die Dataproc AutoscalingPolicies API bietet einen Mechanismus zur Automatisierung der Clusterressourcenverwaltung und ermöglicht das Autoscaling von Cluster-Worker-VMs. Eine Autoscaling Policy ist eine wiederverwendbare Konfiguration, die beschreibt, wie Cluster-Worker mit der Autoscaling-Richtlinie skalieren sollen. Sie definiert Skalierungsgrenzen, Häufigkeit und Aggressivität, um eine detaillierte Kontrolle über Clusterressourcen während der gesamten Clusterlebensdauer zu ermöglichen.

Einsatzmöglichkeiten für Autoscaling

Verwenden Sie Autoscaling:

bei Clustern, die Daten in externen Diensten speichern, wie z. B. Cloud Storage oder BigQuery

bei Clustern, die viele Jobs verarbeiten

um einzelne Jobcluster zu skalieren

mit dem Enhanced Flexibility Mode für Spark-Batch-Jobs

Autoscaling wird nicht empfohlen bei/für:

  • HDFS: Autoscaling ist nicht für die Skalierung von HDFS im Cluster vorgesehen, weil:

    1. Die HDFS-Nutzung ist kein Signal für das Autoscaling.
    2. HDFS-Daten werden nur auf primären Workern gehostet. Die Anzahl der primären Worker muss ausreichen, um alle HDFS-Daten zu hosten.
    3. Durch die Außerbetriebnahme von HDFS-DataNodes kann das Entfernen von Workern verzögert werden. Datenknoten kopieren HDFS-Blöcke in andere Datenknoten, bevor ein Worker entfernt wird. Je nach Datengröße und Replikationsfaktor kann dieser Vorgang Stunden dauern.
  • YARN-Knotenlabels:Autoscaling unterstützt keine YARN-Knotenlabels und das Attribut dataproc:am.primary_only aufgrund von YARN-9088. YARN meldet Clustermesswerte nicht, wenn Knotenlabels verwendet werden.

  • Spark Structured Streaming: Autoscaling unterstützt kein Spark Structured Streaming. Informationen dazu finden Sie unter Autoscaling und Spark Structured Streaming.

  • Inaktive Cluster: Autoscaling eignet sich nicht zum Herunterskalieren eines Clusters auf die Mindestgröße, wenn der Cluster inaktiv ist. Da das Erstellen eines neuen Clusters genauso schnell geht wie das Ändern seiner Größe, ist es sinnvoller, inaktive Cluster stattdessen zu löschen und neu zu erstellen. Folgende Tools unterstützen dieses "ephemerische" Modell:

    Verwenden Sie Dataproc-Workflows, um einen Satz von Jobs in einem dedizierten Cluster zu planen. Löschen Sie den Cluster, wenn die Jobs abgeschlossen sind. Verwenden Sie für eine bessere Orchestrierung Cloud Composer, einen Dienst, der auf Apache Airflow basiert.

    Verwenden Sie für Cluster, die Ad-hoc-Abfragen oder extern geplante Arbeitslasten verarbeiten, Cluster Scheduled Deletion, um den Cluster nach einer bestimmten Dauer oder Zeit der Inaktivität bzw. zu einem bestimmten Zeitpunkt zu löschen.

  • Arbeitslasten unterschiedlicher Größe:Wenn kleine und große Jobs in einem Cluster ausgeführt werden, wird beim ordnungsgemäßen Deaktivierungs-Skalieren gewartet, bis die großen Jobs abgeschlossen sind. Das führt dazu, dass bei einem lang laufenden Job die Ressourcen für kleinere Jobs, die im Cluster ausgeführt werden, erst nach Abschluss des lang laufenden Jobs automatisch skaliert werden. Um dies zu vermeiden, sollten Sie kleinere Jobs ähnlicher Größe in einem Cluster gruppieren und jeden Job mit langer Dauer in einem separaten Cluster ausführen.

Autoscaling aktivieren

So aktivieren Sie Autoscaling in einem Cluster:

  1. Erstellen Sie eine Autoscaling-Richtlinie.

  2. Entweder:

    1. Autoscaling-Cluster erstellen oder
    2. Aktivieren Sie Autoscaling für einen vorhandenen Cluster

Autoscaling-Richtlinie erstellen

gcloud-Befehl

Sie können eine Autoscaling-Richtlinie mit dem Befehl gcloud dataproc autoscaling-policies import erstellen. Sie liest eine lokale YAML-Datei, die eine Autoscaling-Richtlinie definiert. Das Format und der Inhalt der Datei müssen den Konfigurationsobjekten und Feldern entsprechen, die von der REST API autoscalingPolicies definiert sind.

Im folgenden YAML-Beispiel wird eine Richtlinie definiert, die alle erforderlichen Felder angibt. Außerdem werden die Werte minInstances und maxInstances für die primären Worker und der Wert maxInstances für die sekundären (abrufbaren) Worker angegeben und eine 4-minütige cooldownPeriod festgelegt (der Standardwert ist 2 Minuten). workerConfig konfiguriert die primären Worker. In diesem Beispiel werden minInstances und maxInstances auf denselben Wert gesetzt, um die Skalierung der primären Worker zu vermeiden.

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

Hier ist ein weiteres YAML-Beispiel, das alle optionalen und erforderlichen Felder der Autoscaling-Richtlinie angibt.

workerConfig:
  minInstances: 10
  maxInstances: 10
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

Führen Sie zum Erstellen der Autoscaling-Richtlinie den folgenden gcloud-Befehl über ein lokales Terminal oder in Cloud Shell aus. Benennen Sie die Richtlinie. Dieser Name wird zur Richtlinie id, die Sie in späteren gcloud-Befehlen verwenden können, um auf die Richtlinie zu verweisen. Verwenden Sie das Flag --source, um den lokalen Dateipfad und den Dateinamen der YAML-Datei der Autoscaling-Richtlinie anzugeben, die importiert werden soll.

gcloud dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml \
    --region=region

REST API

Erstellen Sie eine Autoscaling-Richtlinie, indem Sie eine AutoscalingPolicy als Teil einer autoscalingPolicies.create-Anfrage definieren.

Console

Wählen Sie zum Erstellen einer Autoscaling-Richtlinie in der Google Cloud Console auf der Dataproc-Seite Autoscaling-Richtlinien die Option CREATE POLICY (Richtlinie erstellen) aus. Auf der Seite Richtlinie erstellen können Sie ein Steuerfeld mit Richtlinienempfehlungen auswählen, um die Felder für die Autoscaling-Richtlinie für einen bestimmten Jobtyp oder ein Skalierungsziel auszufüllen.

Autoscaling-Cluster erstellen

Nachdem Sie eine Autoscaling-Richtlinie erstellt haben, erstellen Sie einen Cluster, der die Autoscaling-Richtlinie verwendet. Der Cluster muss sich in derselben Region wie die Autoscaling-Richtlinie befinden.

gcloud-Befehl

Führen Sie den folgenden gcloud-Befehl über ein lokales Terminal oder in Cloud Shell aus, um einen Autoscaling-Cluster zu erstellen. Geben Sie einen Namen für den Cluster an und verwenden Sie das Flag --autoscaling-policy, um policy id (den Namen der Richtlinie, den Sie beim Erstellen der Richtlinie angegeben haben) oder die Richtlinie resource URI (resource name) (siehe die Felder AutoscalingPolicy id und name) anzugeben.

gcloud dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

Erstellen Sie einen Autoscaling-Cluster, indem Sie AutoscalingConfig als Teil einer clusters.create-Anfrage hinzufügen.

Console

Sie können eine vorhandene Autoscaling-Richtlinie für die Anwendung auf einen neuen Cluster aus dem Abschnitt „Autoscaling-Richtlinie“ im Bereich „Cluster einrichten“ auf der Seite Cluster erstellen auf der Seite „Dataproc“ der Google Cloud Console auswählen.

Autoscaling für einen vorhandenen Cluster aktivieren

Nach dem Erstellen einer Autoscaling-Richtlinie können Sie die Richtlinie für einen vorhandenen Cluster in derselben Region aktivieren.

gcloud-Befehl

Führen Sie den folgenden gcloud-Befehl von einem lokalen Terminal oder in Cloud Shell aus, um eine Autoscaling-Richtlinie für einen vorhandenen Cluster zu aktivieren. Geben Sie einen Namen für den Cluster an und verwenden Sie das Flag --autoscaling-policy, um policy id (den Namen der Richtlinie, den Sie beim Erstellen der Richtlinie angegeben haben) oder die Richtlinie resource URI (resource name) (siehe die AutoscalingPolicy-Felder id und name) anzugeben.

gcloud dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

Um eine Autoscaling-Richtlinie für einen vorhandenen Cluster zu aktivieren, legen Sie den AutoscalingConfig.policyUri der Richtlinie in der updateMask einer clusters.patch-Anfrage fest.

Console

Derzeit wird das Aktivieren einer Autoscaling-Richtlinie für einen vorhandenen Cluster in der Google Cloud Console nicht unterstützt.

Multi-Cluster-Richtlinien verwenden

  • Eine Autoscaling-Richtlinie definiert das Skalierungsverhalten, das auf mehrere Cluster angewendet werden kann. Autoscaling-Richtlinien lassen sich am besten für mehrere Cluster anwenden, wenn die Cluster ähnliche Arbeitslasten gemeinsam verwenden oder Jobs mit ähnlichen Ressourcennutzungsmustern ausführen.

  • Sie können eine Richtlinie aktualisieren, die von mehreren Clustern verwendet wird. Die Aktualisierungen wirken sich sofort auf das Autoscaling-Verhalten aller Cluster aus, die die Richtlinie verwenden (siehe autoscalingPolicies.update). Wenn Sie nicht möchten, dass eine Richtlinienaktualisierung auf einen Cluster angewendet wird, der die Richtlinie verwendet, deaktivieren Sie das Autoscaling auf dem Cluster, bevor Sie die Richtlinie aktualisieren.

gcloud-Befehl

Führen Sie den folgenden gcloud-Befehl in einem lokalen Terminal oder in Cloud Shell aus, um Autoscaling in einem Cluster zu deaktivieren.

gcloud dataproc clusters update cluster-name --disable-autoscaling \
    --region=region

REST API

Um Autoscaling auf einem Cluster zu deaktivieren, setzen Sie AutoscalingConfig.policyUri auf den leeren String und legen Sie update_mask=config.autoscaling_config.policy_uri in einer clusters.patch-Anfrage fest.

Console

Derzeit wird das Deaktivieren von Autoscaling in einem Cluster in der Google Cloud Console nicht unterstützt.

  • Eine Richtlinie, die von einem oder mehreren Clustern verwendet wird, kann nicht gelöscht werden (siehe autoscalingPolicies.delete).

Funktionsweise von Autoscaling

Beim Autoscaling werden die Hadoop-YARN-Messwerte des Clusters während jeder "Cooldown-Zeit" überprüft, um festzustellen, ob der Cluster skaliert werden soll und, wenn ja, in welchem Umfang.

  1. Anhand des Werts des YARN-Messwerts für ausstehende Ressourcen (ausstehender Arbeitsspeicher oder ausstehende Kerne) wird entschieden, ob hoch- oder herunterskaliert werden soll. Ein Wert größer als 0 gibt an, dass YARN-Jobs auf Ressourcen warten und eine Skalierung nach oben erforderlich sein kann. Ein Wert von 0 gibt an, dass YARN genügend Ressourcen hat, sodass eine Skalierung nach unten oder andere Änderungen möglicherweise nicht erforderlich sind.

    Wenn die ausstehende Ressource größer als 0 ist:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Pending + Available + Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]

    Wenn die ausstehende Ressource 0 ist:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]

    Standardmäßig überwacht der Autoscaler die YARN-Speicherressource. Wenn Sie die kernbasierte Autoscaling-Funktion aktivieren, werden sowohl der YARN-Speicher als auch die YARN-Kerne überwacht:estimated_worker_count wird separat für Speicher und Kerne ausgewertet und die daraus resultierende größere Anzahl von Workern wird ausgewählt.

    $estimated\_worker\_count =$

    \[ max(estimated\_worker\_count\_by\_memory,\ estimated\_worker\_count\_by\_cores) \]

    \[ estimated\ \Delta worker = estimated\_worker\_count - current\_worker\_count \]

  2. Je nach benötigter Anzahl der Worker verwendet Autoscaling einen scaleUpFactor oder scaleDownFactor, um die erforderliche Änderung an der Anzahl der Worker zu berechnen:

    if estimated Δworkers > 0:
      actual Δworkers = ROUND_UP(estimated Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(estimated Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(estimated Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(estimated Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(estimated Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(estimated Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(estimated Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    Ein scaleUpFactor oder ein scaleDownFactor von 1,0 bedeutet, dass das Autoscaling so skaliert wird, dass die ausstehende/verfügbare Ressource 0 ist (perfekte Auslastung).

  3. Sobald die Änderung an der Anzahl der Worker berechnet wurde, fungiert entweder scaleUpMinWorkerFraction und scaleDownMinWorkerFraction als Schwellenwert, um festzustellen, ob Autoscaling den Cluster skaliert. Ein niedriger Wert bedeutet, dass Autoscaling auch dann skalieren sollte, wenn Δworkers klein ist. Ein größerer Wert bedeutet, dass nur skaliert werden soll, wenn Δworkers groß ist.

    IF (Δworkers >  scaleUpMinWorkerFraction * current_worker_count) then scale up
    
    ODER
    IF (abs(Δworkers) >  scaleDownMinWorkerFraction * current_worker_count),
    THEN scale down.
    

  4. Wenn die Anzahl der zu skalierenden Worker groß genug ist, um eine Skalierung auszulösen, verwendet Autoscaling die minInstances/maxInstances-Grenzen von workerConfig und secondaryWorkerConfig und weight (Verhältnis von primären zu sekundären Workern), um zu bestimmen, wie die Anzahl der Worker auf die primären und sekundären Worker-Instanzgruppen aufgeteilt werden soll. Das Ergebnis dieser Berechnungen ist die endgültige Änderung des Clusters, die das Autoscaling in diesem Skalierungszeitraum vornimmt.

  5. Skalierungsanfragen für die Autoskalierung werden für Cluster, die mit Imageversionen nach 2.0.57 und 2.1.5 erstellt wurden, unter folgenden Bedingungen abgebrochen:

    1. eine Skalierung mit einem Zeitlimit für die ordnungsgemäße Außerbetriebnahme ungleich null ausgeführt wird und
    2. Die Anzahl der AKTIVEN YARN-Worker („aktive Worker“) und die Änderung der vom Autoscaler empfohlenen Gesamtzahl der Worker (Δworkers) ist gleich oder größer als DECOMMISSIONING YARN-Worker („deaktivierte Worker“), wie in der folgenden Formel dargestellt:

      IF (active workers + Δworkers ≥ active workers + decommissioning workers)
      THEN cancel the scaledown operation
      

    Ein Beispiel für die Stornierung einer Herunterskalierung finden Sie unter Wann bricht das Autoscaling einen Herunterskalierungsvorgang ab?.

Empfehlungen für die Autoscaling-Konfiguration

Primäre Worker nicht skalieren

HDFS-Knotenknoten werden von primären Workern ausgeführt, sekundäre Worker dagegen nur Computing-Worker. Mithilfe sekundärer Worker können Sie Rechenressourcen effizient skalieren, ohne Speicher bereitstellen zu müssen. Dies führt zu schnelleren Skalierungsfunktionen. HDFS-Namenode können mehrere Race-Bedingungen haben, die dazu führen, dass HDFS beschädigt wird und die Außerbetriebnahme ewig dauert. Skalieren Sie keine primären Worker, um dieses Problem zu vermeiden. Beispiel: workerConfig: minInstances: 10 maxInstances: 10 secondaryWorkerConfig: minInstances: 0 maxInstances: 100

Am Befehl zur Cluster-Erstellung müssen Sie einige Änderungen vornehmen:

  1. Legen Sie --num-workers=10 fest, um die Größe der primären Workergruppe der Autoscaling-Richtlinie anzupassen.
  2. Legen Sie --secondary-worker-type=non-preemptible fest, um sekundäre Worker als nicht abrufbar zu konfigurieren. (Es sei denn, dass VMs auf Abruf erwünscht sind).
  3. Kopieren Sie die Hardwarekonfiguration von primären Worker auf sekundäre Worker. Legen Sie beispielsweise --secondary-worker-boot-disk-size=1000GB auf --worker-boot-disk-size=1000GB fest.

Verwenden Sie den Enhanced Flexibility Mode für Spark-Batchjobs

Mit dem Enhanced Flexibility Mode (EFM) und der automatischen Skalierung können Sie Folgendes tun:

 ermöglicht eine schnellere Herunterskalierung des Clusters während laufender Jobs

Unterbrechungen bei laufenden Jobs aufgrund einer Clusterverkleinerung verhindern

Unterbrechungen bei laufenden Jobs aufgrund der Voraktivierung von sekundären Workern auf Abruf minimieren

Wenn EFM aktiviert ist, muss das Zeitlimit für die ordnungsgemäße Außerbetriebnahme der Autoscaling-Richtlinie auf 0s gesetzt werden. Die Autoscaling-Richtlinie darf nur sekundäre Worker automatisch skalieren.

Zeitlimit für ordnungsgemäße Außerbetriebnahme auswählen

Autoscaling unterstützt die ordnungsgemäße Außerbetriebnahme von YARN, wenn Knoten aus einem Cluster entfernt werden. Die ordnungsgemäße Außerbetriebnahme ermöglicht es Anwendungen, nach dem Zufallsprinzip Teil der Daten zwischen den Phasen zu verschieben, um den Rückschrittfortschritt zu vermeiden. Das Zeitlimit für die ordnungsgemäße Außerbetriebnahme in einer Autoscaling-Richtlinie ist die Obergrenze des Zeitraums, den YARN auf das Ausführen von Anwendungen wartet (Anwendung, die beim Start der Außerbetriebnahme ausgeführt wurde), bevor Knoten entfernen werden.

Wenn ein Prozess nicht innerhalb des angegebenen Zeitlimits für die ordnungsgemäße Deaktivierung abgeschlossen wird, wird der Arbeitsknoten zwangsweise heruntergefahren. Dies kann zu Datenverlusten oder Dienstunterbrechungen führen. Um dies zu vermeiden, sollten Sie das Zeitlimit für die ordnungsgemäße Außerbetriebnahme auf einen Wert festlegen, der länger ist als der längste Job, den der Cluster verarbeiten wird. Wenn Sie beispielsweise davon ausgehen, dass der längste Job eine Stunde lang ausgeführt wird, legen Sie die Zeitüberschreitung auf mindestens eine Stunde (1h) fest.

Erwägen Sie Jobs, die länger als eine Stunde dauern, zu ihren eigenen sitzungsspezifischen Clustern zu migrieren und so eine ordnungsgemäße Außerbetriebnahme zu vermeiden.

scaleUpFactor festlegen

scaleUpFactor steuert, wie intensiv das Autoscaling einen Cluster hochskaliert. Geben Sie eine Zahl zwischen 0.0 und 1.0 an, um den Bruchteil der ausstehenden YARN-Ressource festzulegen, der zum Hinzufügen von Knoten führt.

Wenn es beispielsweise 100 ausstehende Container gibt, die jeweils 512 MB anfordern, gibt es 50 GB ausstehender YARN-Speicher. Wenn „scaleUpFactor” 0.5 ist, fügt das Autoscaling genügend Knoten hinzu, um 25 GB YARN-Speicher hinzuzufügen. Wenn 0.1 hingegen gleich ist, fügt das Autoscaling genügend Knoten für 5 GB hinzu. Beachten Sie, dass diese Werte dem YARN-Speicher entsprechen, nicht dem Gesamtspeicher, der für eine VM verfügbar ist.

Ein guter Ausgangspunkt ist 0.05 für MapReduce-Jobs und Spark-Jobs mit aktivierter dynamischer Zuweisung. Verwenden Sie für Spark-Jobs mit einer festen Executor-Anzahl und Tez-Jobs 1.0. Ein scaleUpFactor von 1.0 bedeutet, dass das Autoscaling so skaliert wird, dass die ausstehende/verfügbare Ressource 0 ist (perfekte Auslastung).

Einstellen von scaleDownFactor

scaleDownFactor steuert, wie intensiv das Autoscaling einen Cluster herunterskaliert. Geben Sie eine Zahl zwischen 0.0 und 1.0 an, um den Bruchteil des verfügbaren YARN-Speichers festzulegen, der zum Entfernen von Knoten führt.

Belassen Sie diesen Wert für die meisten Multi-Job-Cluster, die häufig hoch- und herunterskaliert werden müssen, auf 1.0. Aufgrund einer ordnungsgemäßen Außerbetriebnahme sind die Vorgänge zum Herunterskalieren wesentlich langsamer als das vertikale Skalieren. Mit scaleDownFactor=1.0 wird eine aggressive Skalierungsrate festgelegt, wodurch die Anzahl der Skalierungsvorgänge reduziert wird, um die richtige Clustergröße zu erzielen.

Legen Sie für Cluster, die mehr Stabilität benötigen, eine niedrigere scaleDownFactor für eine langsamere Herunterskalierungsrate fest.

Setzen Sie diesen Wert auf 0.0, um die Skalierung des Clusters zu verhindern, z. B. bei sitzungsspezifischen oder einzelnen Jobclustern.

scaleUpMinWorkerFraction und scaleDownMinWorkerFraction festlegen

scaleUpMinWorkerFraction und scaleDownMinWorkerFraction werden mit scaleUpFactor oder scaleDownFactor verwendet und haben die Standardwerte 0.0. Sie stellen die Schwellenwerte dar, mit denen Autoscaling den Cluster vertikal oder herunterskaliert: die minimale prozentuale Erhöhung oder Verringerung der Clustergröße, die zum Senden von Anfragen zum Skalieren nach oben oder unten erforderlich ist.

Beispiele: Das Autoscaling sendet nur dann eine Aktualisierungsanfrage, um einem Cluster mit 100 Knoten 5 Worker hinzuzufügen, wenn scaleUpMinWorkerFraction kleiner oder gleich 0.05 (5%) ist. Wenn dieser Wert auf 0.1 festgelegt ist, sendet der Autoscaler keine Anfrage zum Hochskalieren des Clusters. Wenn scaleDownMinWorkerFraction 0.05 ist, führt das Autoscaling in einem 100-Knoten-Cluster nur dann eine Aktualisierung durch, wenn mindestens fünf Knoten entfernt werden müssen.

Der Standardwert von 0.0 gibt keinen Grenzwert an.

Für große Cluster (mehr als 100 Knoten) wird dringend empfohlen, einen höheren Wert für scaleDownMinWorkerFractionthresholds festzulegen, um kleine, unnötige Skalierungsvorgänge zu vermeiden.

Cooldown-Zeit auswählen

Mit cooldownPeriod wird ein Zeitraum festgelegt, in dem der Autoscaler keine Anfragen zur Änderung der Clustergröße stellt. Sie können damit die Häufigkeit der Autoscaler-Änderungen an die Clustergröße binden.

Der Mindest- und Standardwert für cooldownPeriod beträgt zwei Minuten. Wenn in einer Richtlinie eine kürzere cooldownPeriod festgelegt ist, wirken sich Änderungen der Arbeitslast schneller auf die Clustergröße aus, aber Cluster können unnötigerweise vertikal oder horizontal skaliert werden. Es wird empfohlen, scaleUpMinWorkerFraction und scaleDownMinWorkerFraction einer Richtlinie auf einen Wert ungleich Null zu setzen, wenn eine kürzere cooldownPeriod verwendet wird. Dadurch wird sichergestellt, dass der Cluster nur dann hoch- oder herunterskaliert wird, wenn die Änderung der Ressourcenauslastung für eine Clusteraktualisierung ausreicht.

Wenn Ihre Arbeitslast empfindlich auf Änderungen der Clustergröße reagiert, können Sie die Wartezeit verlängern. Wenn Sie beispielsweise einen Batchverarbeitungsjob ausführen, können Sie die Wartezeit auf mindestens 10 Minuten festlegen. Experimentieren Sie mit verschiedenen Wartezeiten, um den Wert zu ermitteln, der für Ihre Arbeitslast am besten geeignet ist.

Zählgrenzen und Gruppengewicht der Worker

Jede Worker-Gruppe hat minInstances und maxInstances, die ein festes Limit für die Größe jeder Gruppe konfigurieren.

Jede Gruppe hat außerdem einen Parameter namens weight, der das Zielsaldo zwischen den beiden Gruppen konfiguriert. Beachten Sie, dass dieser Parameter nur ein Hinweis ist. Wenn eine Gruppe die Mindest- oder Höchstgröße erreicht, werden Knoten nur der anderen Gruppe hinzugefügt oder daraus entfernt. Daher kann weight fast immer standardmäßig auf 1 bleiben.

Kernbasiertes Autoscaling aktivieren

Standardmäßig verwendet YARN Speichermesswerte für die Ressourcenzuweisung. Bei CPU-intensiven Anwendungen empfiehlt es sich, YARN so zu konfigurieren, dass der Dominante Ressourcenrechner verwendet wird. Legen Sie dazu beim Erstellen eines Clusters die folgende Property fest:

capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

Autoscaling-Messwerte und -Logs

Mit den folgenden Ressourcen und Tools können Sie Autoscaling-Vorgänge und deren Auswirkungen auf einen Cluster und seine Jobs überwachen.

Cloud Monitoring

Mit Cloud Monitoring können Sie:

  • Von Autoscaling verwendete Messwerte anzeigen lassen
  • Anzahl der Knotenmanager in einem Cluster anzeigen lassen
  • Informationen dazu erhalten, warum Autoscaling einen Cluster skaliert oder nicht skaliert hat autoscaling-stackdriver1 autoscaling-stackdriver2 autoscaling-stackdriver3

Cloud Logging

Verwenden Sie Cloud Logging, um Logs von Cloud Dataproc Autoscaling aufzurufen.

1) Suchen Sie Logs für Ihren Cluster.

autoscaling-logs-for-cluster

2) Wählen Sie dataproc.googleapis.com/autoscaler.

autoscaling-log-file

3) Maximieren Sie die Logeinträge, um das Feld status anzuzeigen. Die Logs sind im maschinenlesbaren Format JSON.

autoscaling-three-logs autoscaling-update-operation

4) Erweitern Sie den Logeintrag, um Skalierungsempfehlungen, für Skalierungsentscheidungen verwendete Messwerte, die ursprüngliche Clustergröße und die neue Zielclustergröße zu sehen.

autoscaling-recommendation-message

Hintergrund: Autoscaling mit Apache Hadoop und Apache Spark

In den folgenden Abschnitten wird erläutert, wie Autoscaling mit Hadoop YARN und Hadoop Mapreduce sowie mit Apache Spark, Spark Streaming und Spark Structured Streaming interagiert.

Hadoop YARN-Messwerte

Das Autoscaling basiert auf den folgenden Hadoop YARN-Messwerten:

  1. Allocated resource bezieht sich auf die Gesamtzahl der YARN-Ressourcen, die von den Containern belegt werden, die im gesamten Cluster ausgeführt werden. Wenn 6 Container ausgeführt werden, die bis zu 1 Ressourceneinheit verwenden können, sind 6 Ressourcen zugewiesen.

  2. Available resource ist die YARN-Ressource im Cluster, die nicht von zugewiesenen Containern verwendet wird. Wenn für alle Knotenmanager 10 Ressourceneinheiten zur Verfügung stehen und 6 von ihnen zugewiesen sind, sind 4 Ressourcen verfügbar. Wenn im Cluster verfügbare (nicht verwendete) Ressourcen vorhanden sind, kann Autoscaling Worker aus dem Cluster entfernen.

  3. Pending resource ist die Summe der YARN-Ressourcenanforderungen für ausstehende Container. Ausstehende Container warten darauf, dass in YARN Speicherplatz frei wird. Ausstehende Ressourcen sind nur dann ungleich null, wenn die verfügbare Ressource null oder zu klein ist, um dem nächsten Container zugewiesen zu werden. Wenn ausstehende Container vorhanden sind, fügt Autoscaling dem Cluster gegebenenfalls Worker hinzu.

Sie können diese Messwerte in Cloud Monitoring aufrufen. Standardmäßig beträgt der YARN-Speicher 0,8 * Gesamtspeicher auf dem Cluster. Der verbleibende Speicher ist für andere Daemons und die Betriebssystemnutzung reserviert, z. B. den Seitencache. Sie können den Standardwert mit der YARN-Konfigurationseinstellung "yarn.nodemanager.resource.memory-mb" überschreiben (siehe Apache Hadoop YARN, HDFS, Spark und zugehörige Attribute).

Autoscaling und Hadoop MapReduce

MapReduce führt alle MapReduce-Aufgaben als separaten YARN-Container aus. Wenn ein Job beginnt, sendet MapReduce eine Containeranfrage für jede Zuordnungsaufgabe, was zu einem starken Anstieg des ausstehenden YARN-Speichers führt. Wenn die Zuordnungsaufgaben abgeschlossen wurden, sinkt der Wert für den ausstehenden Speicher.

Wenn mapreduce.job.reduce.slowstart.completedmaps abgeschlossen wurde (ab einem Schwellwert von 95 % gilt dies in Dataproc als erfüllt), stellt MapReduce Containeranfragen für alle Reduzierungen in die Warteschlange, was zu einem weiteren Anstieg des ausstehenden Arbeitsspeichers führt.

Legen Sie keinen hohen Wert für den Autoscaling-scaleUpFactor fest, es sei denn, die Zuordnung und das Reduzieren von Aufgaben dauern mehrere Minuten oder länger. Das Hinzufügen von Workern zum Cluster dauert mindestens 1,5 Minuten. Achten Sie daher darauf, dass genügend ausstehende Arbeitslast vorhanden ist, um neue Worker für einige Minuten zu verwenden. Ein guter Ausgangspunkt ist, für scaleUpFactor einen Wert von 0,05 (5 %) oder 0,1 (10 %) des ausstehenden Speichers festzulegen.

Autoscaling und Spark

Spark fügt eine zusätzliche Ebene der Planung neben YARN hinzu. Genauer gesagt stellt die dynamische Zuordnung von Spark Core Anfragen an YARN nach Containern, die Spark-Executors ausführen, und plant dann Spark-Aufgaben für Threads auf diesen Executors. Dataproc-Cluster ermöglichen standardmäßig die dynamische Zuordnung, sodass Executors nach Bedarf hinzugefügt und entfernt werden können.

Spark fragt Container immer bei YARN an, aber ohne dynamische Zuordnung fragt es Container nur zu Beginn des Jobs an. Bei der dynamischen Zuordnung werden Container entfernt oder bei Bedarf neue Container angefragt.

Spark beginnt mit einer kleinen Anzahl von Executors – 2 in Autoscaling-Clustern – und verdoppelt die Anzahl der Executors, wenn Aufgaben im Rückstand sind. Dadurch wird ein sprunghafter Anstieg bzw. Abfall bei ausstehendem Speicher seltener. Es wird empfohlen, den Autoscaling-scaleUpFactor für Spark-Jobs auf eine große Zahl zu setzen, z. B. 1,0 (100 %).

Dynamische Zuordnung von Spark deaktivieren

Wenn Sie separate Spark-Jobs ausführen, die nicht von der dynamischen Zuordnung von Spark profitieren, können Sie die dynamische Zuordnung von Spark deaktivieren, indem Sie spark.dynamicAllocation.enabled=false und spark.executor.instances angeben. Sie können Autoscaling weiterhin verwenden, um Cluster hoch oder herunter zu skalieren, während die einzelnen Spark-Jobs ausgeführt werden.

Spark-Jobs mit im Cache gespeicherten Daten

Legen Sie spark.dynamicAllocation.cachedExecutorIdleTimeout fest oder löschen Sie den Cache von Datasets, wenn sie nicht mehr benötigt werden. Standardmäßig entfernt Spark keine Executors mit im Cache gespeicherten Daten, die das Herunterskalieren des Clusters verhindern würden.

Autoscaling und Spark-Streaming

  1. Spark Streaming hat eine eigene Version der dynamischen Zuordnung, die Streaming-spezifische Signale zum Hinzufügen und Entfernen von Executors verwendet. Um Spark-Streaming zu verwenden, geben Sie spark.streaming.dynamicAllocation.enabled=true an und deaktivieren dann die dynamische Zuweisung von Spark Core mit spark.dynamicAllocation.enabled=false.

  2. Verwenden Sie die ordnungsgemäße Außerbetriebnahme (Autoscaling gracefulDecommissionTimeout) nicht mit Spark Streaming-Jobs. Um Worker mit Autoscaling sicher zu entfernen, konfigurieren Sie stattdessen Checkpointing für Fehlertoleranz.

Alternativ können Sie Spark Streaming ohne Autoscaling verwenden:

  1. Deaktivieren Sie die Spark Core-dynamische Zuordnung (spark.dynamicAllocation.enabled=false) und
  2. Legen Sie die Anzahl der Executors (spark.executor.instances) für Ihren Job fest. Informationen dazu finden Sie unter Clusterattribute:

Autoscaling und Spark Structured Streaming

Autoscaling ist nicht mit Spark Structured Streaming kompatibel, da Spark Structured Streaming derzeit keine dynamische Zuordnung unterstützt. Informationen dazu finden Sie unter SPARK-24815: Structured Streaming sollte die dynamische Zuordnung unterstützen.

Autoscaling durch Partitionierung und Parallelität steuern

Parallelität wird normalerweise durch Clusterressourcen festgelegt oder bestimmt, z. B. steuert die Anzahl der HDFS-Blöcke die Anzahl der Aufgaben. Beim Autoscaling ist jedoch das Gegenteil der Fall: Autoscaling legt die Clusterressourcen (Worker) anhand der Job-Parallelität fest. Diese Richtlinien können dabei helfen, die Job-Parallelität festzulegen:

  • Während Dataproc die Standardanzahl der MapReduce-Reduce-Aufgaben basierend auf der anfänglichen Clustergröße Ihres Clusters festlegt, können Sie mapreduce.job.reduces festlegen, um die Parallelität der Reduce-Phase zu erhöhen.
  • Spark SQL und DataFrame-Parallelität werden durch spark.sql.shuffle.partitions bestimmt, die standardmäßig auf 200 gesetzt sind.
  • Die RDD-Funktionen von Spark sind standardmäßig auf spark.default.parallelism gesetzt. Dieser Wert wird beim Start des Jobs auf die Anzahl der Kerne auf den Worker-Knoten festgelegt. Allerdings übernehmen alle RDD-Funktionen, die Shuffles erstellen, einen Parameter für die Anzahl der Partitionen, der spark.default.parallelism überschreibt.

Achten Sie darauf, dass Ihre Daten gleichmäßig partitioniert sind. Wenn ein signifikanter Schlüsselversatz vorliegt, können eine oder mehrere Aufgaben erheblich länger dauern als andere Aufgaben, was zu einer geringen Auslastung führt.

Autoscaling-Standardeinstellungen für Spark/Hadoop-Attribute

Autoscaling-Cluster haben Standardwerte für Clusterattribute, die dazu beitragen, Jobfehler zu vermeiden, wenn primäre Worker entfernt oder sekundäre Worker unterbrochen werden. Sie können diese Standardwerte überschreiben, wenn Sie einen Cluster mit Autoscaling erstellen. Informationen dazu finden Sie unter Clusterattribute.

Standardwerte für eine Erhöhung der maximalen Anzahl von Wiederholungen für Aufgaben, Application Master und Phasen:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

Standardwerte, um Wiederholungszähler zurückzusetzen. Dies kann bei lang andauernden Spark Streaming-Jobs hilfreich sein:

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Standardwert, damit der Mechanismus der dynamischen Zuordnung für Slow-Start von Spark mit einer geringen Größe beginnt:

spark:spark.executor.instances=2

Häufig gestellte Fragen (FAQ)

Kann Autoscaling in Hochverfügbarkeitsclustern und Single-Node-Clustern aktiviert werden?

Autoscaling kann in Hochverfügbarkeitsclustern aktiviert werden, nicht jedoch in Clustern mit einzelnem Knoten, da diese keine Skalierung unterstützen.

Kann man Größe eines Autoscaling-Clusters manuell ändern?

Ja. Möglicherweise entscheiden Sie sich dafür, die Größe eines Clusters während der Feinabstimmung einer Autoscaling-Richtlinie manuell als Stopp festzulegen. Diese Änderungen sind jedoch nur vorübergehend wirksam, das Autoscaling übernimmt die Clusterskalierung früher oder später wieder.

Statt den Cluster manuell zu skalieren, gibt es diese Möglichkeiten:

Aktualisieren der Autoscaling-Richtlinie Änderungen an der Autoscaling-Richtlinie wirken sich auf alle Cluster aus, die die Richtlinie derzeit verwenden (siehe Multi-Cluster-Richtlinienverwendung).

Richtlinie trennen und Cluster manuell auf die bevorzugte Größe skalieren.

Dataproc-Support nutzen.

Wie unterscheidet sich Dataproc vom Dataflow-Autoscaling?

Weitere Informationen finden Sie unter Horizontales Autoscaling in Dataflow und Vertikales Autoscaling in Dataflow Prime.

Kann das Entwicklungsteam von Dataproc den Status eines Clusters von ERROR auf RUNNING zurücksetzen?

Im Allgemeinen nicht. Es erfordert manuelle Bemühungen, um zu testen, ob es sicher ist, den Status des Clusters einfach zurückzusetzen. Außerdem kann ein Cluster nicht ohne andere manuelle Schritte zurückgesetzt werden, z. B. Neustart des HDFS-NameNode.

Dataproc setzt den Status eines Clusters auf ERROR, wenn er den Status eines Clusters nach einem fehlgeschlagenen Vorgang nicht ermitteln kann. Cluster in ERROR werden nicht automatisch skaliert und es werden keine Jobs ausgeführt. Das kann folgende Gründe haben:

  1. Fehler, die von der Compute Engine API zurückgegeben werden, oft während eines Compute Engine-Ausfalls.

  2. HDFS kann aufgrund von Programmfehlern bei der Außerbetriebnahme von HDFS beschädigt werden.

  3. Fehler bei der Dataproc Control API, z. B. „Aufgaben freigegeben abgelaufen“

Löschen und erstellen Sie Cluster mit dem Status ERROR neu.

Wann bricht das Autoscaling einen Vorgang zum Herunterskalieren ab?

Die folgende Abbildung zeigt, wann das Autoscaling einen Downscaling-Vorgang abbricht. Weitere Informationen finden Sie unter Funktionsweise des Autoscalings.

dataproc-autoscaling-cancellation-example

Hinweise:

  • Für den Cluster ist das Autoscaling nur basierend auf YARN-Speichermesswerten aktiviert (Standardeinstellung).
  • T1–T9 stehen für Ruhezeitintervalle, in denen der Autoscaler die Anzahl der Worker bewertet. Die Ereigniszeit wurde vereinfacht.
  • Gestapelte Balken stellen die Anzahl der aktiven, in der Deaktivierung befindlichen und außer Betrieb genommenen YARN-Cluster-Worker dar.
  • Die vom Autoscaler empfohlene Anzahl von Workern (schwarze Linie) basiert auf YARN-Speichermesswerten, der Anzahl der aktiven YARN-Worker und den Einstellungen der Autoscaling-Richtlinie (siehe Funktionsweise des Autoscalings).
  • Der rote Hintergrundbereich gibt den Zeitraum an, in dem der Vorgang zum Herunterskalieren ausgeführt wird.
  • Der gelbe Hintergrundbereich gibt den Zeitraum an, in dem der Vorgang zum Herunterskalieren abgebrochen wird.
  • Der grüne Hintergrundbereich gibt den Zeitraum des Skalierungsvorgangs an.

Die folgenden Vorgänge werden zu den folgenden Zeiten ausgeführt:

  • T1: Der Autoscaler initiiert eine ordnungsgemäße Außerbetriebnahme, um etwa die Hälfte der aktuellen Cluster-Worker zu reduzieren.

  • T2: Der Autoscaler überwacht weiterhin die Clustermesswerte. Die Empfehlung zum Skalieren ändert sich nicht und der Vorgang wird fortgesetzt. Einige Worker wurden bereits außer Betrieb genommen, andere werden demnächst außer Betrieb genommen. Außer Betrieb genommene Worker werden von Dataproc gelöscht.

  • T3: Der Autoscaler berechnet, dass die Anzahl der Worker weiter reduziert werden kann, möglicherweise weil zusätzlicher YARN-Speicher verfügbar wird. Da die Anzahl der aktiven Worker plus die empfohlene Änderung der Anzahl der Worker jedoch nicht gleich oder größer als die Anzahl der aktiven Worker plus der außer Betrieb genommenen Worker ist, werden die Kriterien für die Abbruch des Herunterskalierens nicht erfüllt und der Autoscaler bricht den Vorgang nicht ab.

  • T4: YARN meldet eine Erhöhung des ausstehenden Arbeitsspeichers. Die Empfehlung für die Anzahl der Worker ändert sich jedoch nicht. Wie in T3 werden die Kriterien für die Stornierung des Herunterskalierens nicht erfüllt und der Autoscaler storniert den Vorgang nicht.

  • T5: Der ausstehende YARN-Speicher erhöht sich und die vom Autoscaler empfohlene Änderung der Anzahl der Worker wird größer. Da die Anzahl der aktiven Worker jedoch zusammen mit der empfohlenen Änderung der Anzahl der Worker kleiner ist als die Anzahl der aktiven Worker und der außer Betrieb genommenen Worker, werden die Kriterien für die Stornierung nicht erfüllt und der Vorgang wird nicht abgebrochen.

  • T6: Der ausstehende YARN-Speicher erhöht sich weiter. Die Anzahl der aktiven Worker plus die vom Autoscaler empfohlene Änderung der Anzahl der Worker ist jetzt größer als die Anzahl der aktiven Worker plus der Worker, die außer Betrieb genommen werden. Die Abbruchkriterien sind erfüllt und der Autoscaler bricht den Vorgang zum Herunterskalieren ab.

  • T7: Der Autoscaler wartet auf den Abschluss der Abbruchanfrage für den Vorgang zum Verringern der Anzahl der Instanzen. In diesem Intervall prüft und empfiehlt der Autoscaler keine Änderung der Anzahl der Worker.

  • T8: Die Stornierung des Downscaling-Vorgangs ist abgeschlossen. Ausmusterte Worker werden dem Cluster hinzugefügt und werden aktiv. Der Autoscaler erkennt den Abschluss der Abbruchsanfrage für den Skalierungsvorgang und wartet auf den nächsten Bewertungszeitraum (T9), um die empfohlene Anzahl von Workern zu berechnen.

  • T9: Zu T9 sind keine aktiven Vorgänge vorhanden. Basierend auf der Autoscaling-Richtlinie und den YARN-Messwerten empfiehlt der Autoscaler eine Hochskalierung.