Autoscaling-Cluster

Was ist Autoscaling?

Das Schätzen der "richtigen" Anzahl von Cluster-Workern (Knoten) für eine Arbeitslast ist schwierig und eine einzelne Clustergröße für eine gesamte Pipeline ist oft nicht ideal. Vom Nutzer initiierte Cluster-Skalierung wird diese Herausforderung teilweise gelöst, erfordert jedoch die Überwachung der Clusternutzung und ein manuelles Eingreifen.

Die Dataproc AutoscalingPolicies API bietet einen Mechanismus zur Automatisierung der Clusterressourcenverwaltung und ermöglicht das Autoscaling von Cluster-Worker-VMs. Eine Autoscaling Policy ist eine wiederverwendbare Konfiguration, die beschreibt, wie Cluster-Worker mit der Autoscaling-Richtlinie skalieren sollen. Sie definiert Skalierungsgrenzen, Häufigkeit und Aggressivität, um eine detaillierte Kontrolle über Clusterressourcen während der gesamten Clusterlebensdauer zu ermöglichen.

Einsatzmöglichkeiten für Autoscaling

Verwenden Sie Autoscaling:

bei Clustern, die Daten in externen Diensten speichern, wie z. B. Cloud Storage oder BigQuery

bei Clustern, die viele Jobs verarbeiten

um einzelne Jobcluster zu skalieren

mit dem Enhanced Flexibility Mode für Spark-Batch-Jobs

Autoscaling wird nicht empfohlen bei/für:

  • HDFS: Autoscaling ist nicht für die Skalierung von HDFS im Cluster vorgesehen, weil:

    1. Die HDFS-Nutzung ist kein Signal für das Autoscaling.
    2. HDFS-Daten werden nur auf primären Workern gehostet. Die Anzahl der primären Worker muss ausreichen, um alle HDFS-Daten zu hosten.
    3. Durch die Außerbetriebnahme von HDFS-DataNodes kann das Entfernen von Workern verzögert werden. Datenknoten kopieren HDFS-Blöcke in andere Datenknoten, bevor ein Worker entfernt wird. Je nach Datengröße und Replikationsfaktor kann dieser Vorgang Stunden dauern.
  • YARN-Knotenlabels:Autoscaling unterstützt keine YARN-Knotenlabels und das Attribut dataproc:am.primary_only aufgrund von YARN-9088. YARN meldet Clustermesswerte nicht, wenn Knotenlabels verwendet werden.

  • Spark Structured Streaming: Autoscaling unterstützt kein Spark Structured Streaming. Informationen dazu finden Sie unter Autoscaling und Spark Structured Streaming.

  • Inaktive Cluster: Autoscaling eignet sich nicht zum Herunterskalieren eines Clusters auf die Mindestgröße, wenn der Cluster inaktiv ist. Da das Erstellen eines neuen Clusters genauso schnell geht wie das Ändern seiner Größe, ist es sinnvoller, inaktive Cluster stattdessen zu löschen und neu zu erstellen. Folgende Tools unterstützen dieses "ephemerische" Modell:

    Verwenden Sie Dataproc-Workflows, um einen Satz von Jobs in einem dedizierten Cluster zu planen. Löschen Sie den Cluster, wenn die Jobs abgeschlossen sind. Verwenden Sie für eine bessere Orchestrierung Cloud Composer, einen Dienst, der auf Apache Airflow basiert.

    Verwenden Sie für Cluster, die Ad-hoc-Abfragen oder extern geplante Arbeitslasten verarbeiten, Cluster Scheduled Deletion, um den Cluster nach einer bestimmten Dauer oder Zeit der Inaktivität bzw. zu einem bestimmten Zeitpunkt zu löschen.

Autoscaling aktivieren

So aktivieren Sie Autoscaling in einem Cluster:

  1. Erstellen Sie eine Autoscaling-Richtlinie.

  2. Entweder:

    1. Autoscaling-Cluster erstellen oder
    2. Aktivieren Sie Autoscaling für einen vorhandenen Cluster

Autoscaling-Richtlinie erstellen

gcloud-Befehl

Sie können eine Autoscaling-Richtlinie mit dem Befehl gcloud dataproc autoscaling-policies import erstellen. Sie liest eine lokale YAML-Datei, die eine Autoscaling-Richtlinie definiert. Das Format und der Inhalt der Datei müssen den Konfigurationsobjekten und Feldern entsprechen, die von der REST API autoscalingPolicies definiert sind.

Im folgenden YAML-Beispiel wird eine Richtlinie definiert, die alle erforderlichen Felder angibt. Außerdem werden die Werte minInstances und maxInstances für die primären Worker und der Wert maxInstances für die sekundären (abrufbaren) Worker angegeben und eine 4-minütige cooldownPeriod festgelegt (der Standardwert ist 2 Minuten). workerConfig konfiguriert die primären Worker. In diesem Beispiel werden minInstances und maxInstances auf denselben Wert gesetzt, um die Skalierung der primären Worker zu vermeiden.

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

Ein weiteres YAML-Beispiel, das alle optionalen und erforderlichen Autoscaling-Richtlinienfelder angibt.

workerConfig:
  minInstances: 10
  maxInstances: 10
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

Führen Sie zum Erstellen der Autoscaling-Richtlinie den folgenden gcloud-Befehl über ein lokales Terminal oder in Cloud Shell aus. Benennen Sie die Richtlinie. Dieser Name wird zur Richtlinie id, die Sie in späteren gcloud-Befehlen verwenden können, um auf die Richtlinie zu verweisen. Verwenden Sie das Flag --source, um den lokalen Dateipfad und den Dateinamen der YAML-Datei der Autoscaling-Richtlinie anzugeben, die importiert werden soll.

gcloud dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml \
    --region=region

REST API

Erstellen Sie eine Autoscaling-Richtlinie, indem Sie eine AutoscalingPolicy als Teil einer autoscalingPolicies.create-Anfrage definieren.

Console

Wählen Sie zum Erstellen einer Autoscaling-Richtlinie in der Google Cloud Console auf der Dataproc-Autoscaling-Richtlinie die Option RICHTLINIE ERSTELLEN aus. Auf der Seite Richtlinie erstellen können Sie ein Steuerfeld mit Richtlinienempfehlungen auswählen, um die Felder für die Autoscaling-Richtlinie für einen bestimmten Jobtyp oder ein Skalierungsziel auszufüllen.

Autoscaling-Cluster erstellen

Nachdem Sie eine Autoscaling-Richtlinie erstellt haben, erstellen Sie einen Cluster, der die Autoscaling-Richtlinie verwendet. Der Cluster muss sich in derselben Region wie die Autoscaling-Richtlinie befinden.

gcloud-Befehl

Führen Sie den folgenden gcloud-Befehl über ein lokales Terminal oder in Cloud Shell aus, um einen Autoscaling-Cluster zu erstellen. Geben Sie einen Namen für den Cluster an und verwenden Sie das Flag --autoscaling-policy, um policy id (den Namen der Richtlinie, den Sie beim Erstellen der Richtlinie angegeben haben) oder die Richtlinie resource URI (resource name) (siehe die Felder AutoscalingPolicy id und name) anzugeben.

gcloud dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

Erstellen Sie einen Autoscaling-Cluster, indem Sie AutoscalingConfig als Teil einer clusters.create-Anfrage hinzufügen.

Console

Sie können eine vorhandene Autoscaling-Richtlinie auswählen, die auf einen neuen Cluster angewendet werden soll. Wählen Sie dazu im Bereich „Clusterrichtlinien einrichten“ auf der Dataproc-Seite Cluster erstellen der Google Cloud Console die Option „Autoscaling“ aus.

Autoscaling für einen vorhandenen Cluster aktivieren

Nach dem Erstellen einer Autoscaling-Richtlinie können Sie die Richtlinie für einen vorhandenen Cluster in derselben Region aktivieren.

gcloud-Befehl

Führen Sie den folgenden gcloud-Befehl von einem lokalen Terminal oder in Cloud Shell aus, um eine Autoscaling-Richtlinie für einen vorhandenen Cluster zu aktivieren. Geben Sie einen Namen für den Cluster an und verwenden Sie das Flag --autoscaling-policy, um policy id (den Namen der Richtlinie, den Sie beim Erstellen der Richtlinie angegeben haben) oder die Richtlinie resource URI (resource name) (siehe die AutoscalingPolicy-Felder id und name) anzugeben.

gcloud dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

REST API

Um eine Autoscaling-Richtlinie für einen vorhandenen Cluster zu aktivieren, legen Sie den AutoscalingConfig.policyUri der Richtlinie in der updateMask einer clusters.patch-Anfrage fest.

Console

Derzeit wird das Aktivieren einer Autoscaling-Richtlinie für einen vorhandenen Cluster in der Google Cloud Console nicht unterstützt.

Multi-Cluster-Richtlinien verwenden

  • Eine Autoscaling-Richtlinie definiert das Skalierungsverhalten, das auf mehrere Cluster angewendet werden kann. Autoscaling-Richtlinien lassen sich am besten für mehrere Cluster anwenden, wenn die Cluster ähnliche Arbeitslasten gemeinsam verwenden oder Jobs mit ähnlichen Ressourcennutzungsmustern ausführen.

  • Sie können eine Richtlinie aktualisieren, die von mehreren Clustern verwendet wird. Die Aktualisierungen wirken sich sofort auf das Autoscaling-Verhalten aller Cluster aus, die die Richtlinie verwenden (siehe autoscalingPolicies.update). Wenn Sie nicht möchten, dass eine Richtlinienaktualisierung auf einen Cluster angewendet wird, der die Richtlinie verwendet, deaktivieren Sie das Autoscaling auf dem Cluster, bevor Sie die Richtlinie aktualisieren.

gcloud-Befehl

Führen Sie den folgenden gcloud-Befehl aus einem lokalen Terminal oder in Cloud Shell aus, um das Autoscaling für einen Cluster zu deaktivieren.

gcloud dataproc clusters update cluster-name --disable-autoscaling \
    --region=region

REST API

Um Autoscaling auf einem Cluster zu deaktivieren, setzen Sie AutoscalingConfig.policyUri auf den leeren String und legen Sie update_mask=config.autoscaling_config.policy_uri in einer clusters.patch-Anfrage fest.

Console

Derzeit wird das Deaktivieren des Autoscalings für einen Cluster in der Google Cloud Console nicht unterstützt.

  • Eine Richtlinie, die von einem oder mehreren Clustern verwendet wird, kann nicht gelöscht werden (siehe autoscalingPolicies.delete).

Funktionsweise von Autoscaling

Beim Autoscaling werden die Hadoop-YARN-Messwerte des Clusters während jeder "Cooldown-Zeit" überprüft, um festzustellen, ob der Cluster skaliert werden soll und, wenn ja, in welchem Umfang.

  1. Bei jeder Bewertung prüft Autoscaling den ausstehenden, verfügbaren, zugewiesenen und reservierten Speicher, über die letzten cooldown_period gemittelt, um einen genauen Wert für die erforderliche Änderung der Workerzahl zu ermitteln. Dazu verwendet es folgende Formel:
    dataproc-autoscaling-formula

    Hinweis: Geplante sekundäre Worker (sekundäre Worker, die nicht vorhanden sind, deren Erstellung aber geplant ist) sind in target worker count enthalten.

    • pending memory ist ein Signal dafür, dass der Cluster Aufgaben in die Warteschlange gestellt hat, diese aber noch nicht ausgeführt wurden. Möglicherweise muss er vertikal skaliert werden, um seine Arbeitslast besser bewältigen zu können.
    • available memory ist ein Signal dafür, dass der Cluster auf fehlerfreien Knoten zusätzliche Bandbreite hat und möglicherweise zur Ressourcenschonung horizontal skaliert werden muss.
    • Weitere Informationen zu diesen Apache Hadoop YARN-Messwerten finden Sie unter Autoscaling mit Hadoop und Spark.
  2. Bei genau der erforderlichen Änderung an der Anzahl der Worker verwendet das Autoscaling eine scaleUpFactor oder scaleDownFactor, um die tatsächliche Änderung der Anzahl der Worker zu berechnen:

    if exact Δworkers > 0:
      actual Δworkers = ROUND_UP(exact Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(exact Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(exact Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(exact Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(exact Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(exact Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(exact Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    Ein „scaleUpFactor“- oder „scaleDownFactor“-Wert von 1,0 bedeutet, dass das Autoscaling so skaliert wird, dass der ausstehende/verfügbare Arbeitsspeicher 0 ist (die perfekte Auslastung).

  3. Sobald die erforderliche Änderung an der Anzahl der Worker berechnet wurde, fungiert entweder scaleUpMinWorkerFraction und scaleDownMinWorkerFraction als Schwellenwert, um festzustellen, ob Autoscaling den Cluster skaliert. Ein niedriger Wert bedeutet, dass Autoscaling auch dann skalieren sollte, wenn Δworkers klein ist. Ein größerer Wert bedeutet, dass nur skaliert werden soll, wenn Δworkers groß ist.

    if (Δworkers >  scaleUpMinWorkerFraction* cluster size) then scale up
    oder
    if (abs(Δworkers) >  scaleDownMinWorkerFraction* cluster size) then scale down

  4. Wenn die Anzahl der zu skalierenden Worker groß genug ist, um eine Skalierung auszulösen, verwendet Autoscaling die minInstances/maxInstances-Grenzen von workerConfig und secondaryWorkerConfig und weight (Verhältnis von primären zu sekundären Workern), um zu bestimmen, wie die Anzahl der Worker auf die primären und sekundären Worker-Instanzgruppen aufgeteilt werden soll. Das Ergebnis dieser Berechnungen ist die endgültige Änderung des Clusters, die das Autoscaling in diesem Skalierungszeitraum vornimmt.

Empfehlungen für die Autoscaling-Konfiguration

Primäre Worker nicht skalieren

HDFS-Knotenknoten werden von primären Workern ausgeführt, sekundäre Worker dagegen nur Computing-Worker. Der NameNode von HDFS hat mehrere Race-Bedingungen, die dazu führen, dass HDFS in einen schadhaften Zustand versetzt wird, der zu einer Außerbetriebnahme führt. Skalieren Sie keine primären Worker, damit diese Probleme nicht auftreten. Beispiel:

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100

Am Befehl zur Cluster-Erstellung müssen Sie einige Änderungen vornehmen:

  1. Legen Sie --num-workers=10 fest, um die Größe der primären Workergruppe der Autoscaling-Richtlinie anzupassen.
  2. Legen Sie --secondary-worker-type=non-preemptible fest, um sekundäre Worker als nicht abrufbar zu konfigurieren. (Es sei denn, dass VMs auf Abruf erwünscht sind).
  3. Kopieren Sie die Hardwarekonfiguration von primären Worker auf sekundäre Worker. Legen Sie beispielsweise --secondary-worker-boot-disk-size=1000GB auf --worker-boot-disk-size=1000GB fest.

Verwenden Sie den Enhanced Flexibility Mode für Spark-Batchjobs

Siehe Enhanced Flexibility Mode. Der Enhanced Flexibility Mode verwaltet Shuffle-Daten. Damit werden Verzögerungen durch den Fortschritt minimiert, die durch das Entfernen von Knoten aus einem laufenden Cluster entstehen, entweder durch Autoscaling oder vorzeitiges Beenden.

Wenn EFM aktiviert ist, muss das Zeitlimit für die ordnungsgemäße Außerbetriebnahme der Autoscaling-Richtlinie auf 0s gesetzt werden. Die Autoscaling-Richtlinie darf nur sekundäre Worker automatisch skalieren. Der Enhanced Flexibility Mode ermöglicht auch die sichere Nutzung von sekundären Workern auf Abruf (--secondary-worker-type=preemptible).

Zeitlimit für ordnungsgemäße Außerbetriebnahme auswählen

Autoscaling unterstützt die ordnungsgemäße Außerbetriebnahme von YARN, wenn Knoten aus einem Cluster entfernt werden. Die ordnungsgemäße Außerbetriebnahme ermöglicht es Anwendungen, nach dem Zufallsprinzip Teil der Daten zwischen den Phasen zu verschieben, um den Rückschrittfortschritt zu vermeiden. Das Zeitlimit für die ordnungsgemäße Außerbetriebnahme in einer Autoscaling-Richtlinie ist die Obergrenze des Zeitraums, den YARN auf das Ausführen von Anwendungen wartet (Anwendung, die beim Start der Außerbetriebnahme ausgeführt wurde), bevor Knoten entfernen werden.

Das Zeitlimit für die ordnungsgemäße Außerbetriebnahme sollte auf einen Wert gesetzt werden, der länger ist als der längste Job, den ein Cluster verarbeitet. Beispiel: 1h.

Erwägen Sie Jobs, die länger als eine Stunde dauern, zu ihren eigenen sitzungsspezifischen Clustern zu migrieren und so eine ordnungsgemäße Außerbetriebnahme zu vermeiden.

scaleUpFactor festlegen

scaleUpFactor steuert, wie intensiv das Autoscaling einen Cluster hochskaliert. Eine Zahl zwischen 0.0 und 1.0, die angibt, für welchen Prozentsatz des YARN-Speichers Knoten hinzugefügt werden müssen.

Wenn es beispielsweise 100 ausstehende Container gibt, die jeweils 512 MB anfordern, gibt es 50 GB ausstehender YARN-Speicher. Wenn „scaleUpFactor” 0.5 ist, fügt das Autoscaling genügend Knoten hinzu, um 25 GB YARN-Speicher hinzuzufügen. Wenn 0.1 hingegen gleich ist, fügt das Autoscaling genügend Knoten für 5 GB hinzu. Beachten Sie, dass diese Werte dem YARN-Speicher entsprechen, nicht dem Gesamtspeicher, der für eine VM verfügbar ist.

Ein guter Ausgangspunkt ist 0.05 für MapReduce-Jobs und Spark-Jobs mit aktivierter dynamischer Zuweisung. Verwenden Sie für Spark-Jobs mit einer festen Executor-Anzahl und Tez-Jobs 1.0.

scaleDownFactor festlegen

scaleDownFactor steuert, wie intensiv das Autoscaling einen Cluster herunterskaliert. Es handelt sich um eine Zahl zwischen 0.0 und 1.0, die angibt, für welchen Prozentsatz des verfügbaren ARNR-Speichers Knoten entfernt werden müssen.

Belassen Sie diesen Wert für die meisten Multi-Job-Cluster auf 1.0. Aufgrund einer ordnungsgemäßen Außerbetriebnahme sind die Vorgänge zum Herunterskalieren wesentlich langsamer als das vertikale Skalieren. Mit scaleDownFactor=1.0 wird eine aggressive Skalierung nach unten konfiguriert, wodurch die Anzahl der Skalierungsvorgänge reduziert wird, um die richtige Clustergröße zu erzielen.

Setzen Sie diesen Wert auf 0.0, um die Herunterskalierung des Clusters zu vermeiden, z. B. für sitzungsspezifische Cluster oder für einen Single-Job-Cluster.

scaleUpMinWorkerFraction und scaleDownMinWorkerFraction festlegen

scaleUpMinWorkerFraction und scaleDownMinWorkerFraction sind standardmäßig auf 0.0 gesetzt und stellen die Schwellenwerte dar, mit denen Autoscaling den Cluster vertikal oder herunterskaliert. scaleUpMinWorkerFraction steuert die minimale prozentuale Erhöhung der Clustergröße, die zum Senden einer Aktualisierungsanfrage erforderlich ist. Wenn das Autoscaling beispielsweise 5 Worker zu einem 100-Knotencluster hinzufügen soll, muss scaleUpMinWorkerFraction kleiner oder gleich 0,05 (5 %) sein, um das Update tatsächlich auszuführen. Wenn der Wert 0,1 beträgt, wird der Cluster beim Autoscaling nicht skaliert.

Ebenso stellt scaleDownMinWorkerFraction einen Grenzwert für den Anteil der derzeit im Cluster vorhandenen Knoten dar, die entfernt werden müssen, damit eine Herunterskalierung angefordert werden kann. Wenn scaleDownMinWorkerFraction 0,05 lautet, führt das Autoscaling in einem 100-Knoten-Cluster nur dann eine Aktualisierung durch, wenn mindestens fünf Knoten entfernt werden müssen.

Der Standardwert von 0.0 gibt keinen Grenzwert an. Grenzwerte können für große Cluster (> 100 Knoten) nützlich sein, um kleine, unnötige Skalierungsvorgänge zu vermeiden.

Cooldown-Zeit auswählen

Der Mindest- und Standardwert für cooldownPeriod beträgt zwei Minuten. Wenn in einer Richtlinie eine kürzere cooldownPeriod festgelegt ist, wirken sich Änderungen der Arbeitslast schneller auf die Clustergröße aus, aber Cluster können unnötigerweise vertikal oder horizontal skaliert werden. Es wird empfohlen, scaleUpMinWorkerFraction und scaleDownMinWorkerFraction einer Richtlinie auf einen Wert ungleich Null zu setzen, wenn eine kürzere cooldownPeriod verwendet wird. Dadurch wird sichergestellt, dass der Cluster nur dann vertikal oder horizontal wird, wenn die Änderung der Speicherauslastung für eine Clusteraktualisierung ausreicht.

Zählgrenzen und Gruppengewicht der Worker

Jede Worker-Gruppe hat minInstances und maxInstances, die ein festes Limit für die Größe jeder Gruppe konfigurieren.

Jede Gruppe hat außerdem einen Parameter namens weight, der das Zielsaldo zwischen den beiden Gruppen konfiguriert. Beachten Sie, dass dieser Parameter nur ein Hinweis ist. Wenn eine Gruppe die Mindest- oder Höchstgröße erreicht, werden Knoten nur der anderen Gruppe hinzugefügt oder daraus entfernt. Daher kann weight fast immer standardmäßig auf 1 bleiben.

Autoscaling-Messwerte und -Logs

Mit den folgenden Ressourcen und Tools können Sie Autoscaling-Vorgänge und deren Auswirkungen auf einen Cluster und seine Jobs überwachen.

Cloud Monitoring

Mit Cloud Monitoring können Sie:

  • Von Autoscaling verwendete Messwerte anzeigen lassen
  • Anzahl der Knotenmanager in einem Cluster anzeigen lassen
  • Informationen dazu erhalten, warum Autoscaling einen Cluster skaliert oder nicht skaliert hat autoscaling-stackdriver1 autoscaling-stackdriver2 autoscaling-stackdriver3

Cloud Logging

Verwenden Sie Cloud Logging, um Logs von Cloud Dataproc Autoscaling aufzurufen.

1) Suchen Sie Logs für Ihren Cluster.

autoscaling-logs-for-cluster

2) Wählen Sie dataproc.googleapis.com/autoscaler.

autoscaling-log-file

3) Maximieren Sie die Logeinträge, um das Feld status anzuzeigen. Die Logs sind im maschinenlesbaren Format JSON.

autoscaling-three-logs autoscaling-update-operation

4) Erweitern Sie den Logeintrag, um Skalierungsempfehlungen, für Skalierungsentscheidungen verwendete Messwerte, die ursprüngliche Clustergröße und die neue Zielclustergröße zu sehen.

autoscaling-recommendation-message

Hintergrund: Autoscaling mit Apache Hadoop und Apache Spark

In den folgenden Abschnitten wird erläutert, wie Autoscaling mit Hadoop YARN und Hadoop Mapreduce sowie mit Apache Spark, Spark Streaming und Spark Structured Streaming interagiert.

Hadoop YARN-Messwerte

Autoscaling konfiguriert Hadoop YARN so, dass Jobs basierend auf YARN-Speicheranforderungen und nicht auf YARN-Kernanforderungen geplant werden.

Das Autoscaling basiert auf den folgenden Hadoop YARN-Messwerten:

  1. Allocated memory bezieht sich auf den gesamten YARN-Speicher, der von den Containern belegt wird, die im gesamten Cluster ausgeführt werden. Wenn 6 Container ausgeführt werden, die bis zu 1 GB verwenden können, sind 6 GB Speicher zugewiesen.

  2. Available memory ist der YARN-Speicher im Cluster, der nicht von zugewiesenen Containern verwendet wird. Wenn für alle Knotenmanager 10 GB Arbeitsspeicher zur Verfügung stehen und 6 GB Arbeitsspeicher zugeordnet wurden, sind 4 GB Arbeitsspeicher frei. Wenn im Cluster freier (nicht verwendeter) Arbeitsspeicher vorhanden ist, kann Autoscaling Worker aus dem Cluster entfernen.

  3. Pending memory ist die Summe der YARN-Speicheranforderungen für ausstehende Container. Ausstehende Container warten darauf, dass in YARN Speicherplatz frei wird. Ausstehender Speicher ist nur dann ungleich null, wenn der verfügbare Speicher null oder zu klein ist, um dem nächsten Container zugewiesen zu werden. Wenn ausstehende Container vorhanden sind, fügt Autoscaling dem Cluster gegebenenfalls Worker hinzu.

Sie können diese Messwerte in Cloud Monitoring aufrufen. Standardmäßig beträgt der YARN-Speicher 0,8 * Gesamtspeicher auf dem Cluster. Der verbleibende Speicher ist für andere Daemons und die Betriebssystemnutzung reserviert, z. B. den Seitencache. Sie können den Standardwert mit der YARN-Konfigurationseinstellung "yarn.nodemanager.resource.memory-mb" überschreiben (siehe Apache Hadoop YARN, HDFS, Spark und zugehörige Attribute).

Autoscaling und Hadoop MapReduce

MapReduce führt alle MapReduce-Aufgaben als separaten YARN-Container aus. Wenn ein Job beginnt, sendet MapReduce eine Containeranfrage für jede Zuordnungsaufgabe, was zu einem starken Anstieg des ausstehenden YARN-Speichers führt. Wenn die Zuordnungsaufgaben abgeschlossen wurden, sinkt der Wert für den ausstehenden Speicher.

Wenn mapreduce.job.reduce.slowstart.completedmaps abgeschlossen wurde (ab einem Schwellwert von 95 % gilt dies in Dataproc als erfüllt), stellt MapReduce Containeranfragen für alle Reduzierungen in die Warteschlange, was zu einem weiteren Anstieg des ausstehenden Arbeitsspeichers führt.

Legen Sie keinen hohen Wert für den Autoscaling-scaleUpFactor fest, es sei denn, die Zuordnung und das Reduzieren von Aufgaben dauern mehrere Minuten oder länger. Das Hinzufügen von Workern zum Cluster dauert mindestens 1,5 Minuten. Achten Sie daher darauf, dass genügend ausstehende Arbeitslast vorhanden ist, um neue Worker für einige Minuten zu verwenden. Ein guter Ausgangspunkt ist, für scaleUpFactor einen Wert von 0,05 (5 %) oder 0,1 (10 %) des ausstehenden Speichers festzulegen.

Autoscaling und Spark

Spark fügt eine zusätzliche Ebene der Planung neben YARN hinzu. Genauer gesagt stellt die dynamische Zuordnung von Spark Core Anfragen an YARN nach Containern, die Spark-Executors ausführen, und plant dann Spark-Aufgaben für Threads auf diesen Executors. Dataproc-Cluster ermöglichen standardmäßig die dynamische Zuordnung, sodass Executors nach Bedarf hinzugefügt und entfernt werden können.

Spark fragt Container immer bei YARN an, aber ohne dynamische Zuordnung fragt es Container nur zu Beginn des Jobs an. Bei der dynamischen Zuordnung werden Container entfernt oder bei Bedarf neue Container angefragt.

Spark beginnt mit einer kleinen Anzahl von Executors – 2 in Autoscaling-Clustern – und verdoppelt die Anzahl der Executors, wenn Aufgaben im Rückstand sind. Dadurch wird ein sprunghafter Anstieg bzw. Abfall bei ausstehendem Speicher seltener. Es wird empfohlen, den Autoscaling-scaleUpFactor für Spark-Jobs auf eine große Zahl zu setzen, z. B. 1,0 (100 %).

Dynamische Zuordnung von Spark deaktivieren

Wenn Sie separate Spark-Jobs ausführen, die nicht von der dynamischen Zuordnung von Spark profitieren, können Sie die dynamische Zuordnung von Spark deaktivieren, indem Sie spark.dynamicAllocation.enabled=false und spark.executor.instances angeben. Sie können Autoscaling weiterhin verwenden, um Cluster hoch oder herunter zu skalieren, während die einzelnen Spark-Jobs ausgeführt werden.

Spark-Jobs mit im Cache gespeicherten Daten

Legen Sie spark.dynamicAllocation.cachedExecutorIdleTimeout fest oder löschen Sie den Cache von Datasets, wenn sie nicht mehr benötigt werden. Standardmäßig entfernt Spark keine Executors mit im Cache gespeicherten Daten, die das Herunterskalieren des Clusters verhindern würden.

Autoscaling und Spark-Streaming

  1. Spark-Streaming hat eine eigene Version der dynamischen Zuordnung, die Streaming-spezifische Signale zum Hinzufügen und Entfernen von Executors verwendet. Um Spark-Streaming zu verwenden, geben Sie spark.streaming.dynamicAllocation.enabled=true an und deaktivieren dann die dynamische Zuweisung von Spark Core mit spark.dynamicAllocation.enabled=false.

  2. Verwenden Sie die ordnungsgemäße Außerbetriebnahme (Autoscaling gracefulDecommissionTimeout) nicht mit Spark Streaming-Jobs. Um Worker mit Autoscaling sicher zu entfernen, konfigurieren Sie stattdessen Checkpointing für Fehlertoleranz.

Alternativ können Sie Spark Streaming ohne Autoscaling verwenden:

  1. Deaktivieren Sie die Spark Core-dynamische Zuordnung (spark.dynamicAllocation.enabled=false) und
  2. Legen Sie die Anzahl der Executors (spark.executor.instances) für Ihren Job fest. Informationen dazu finden Sie unter Clusterattribute:

Autoscaling und Spark Structured Streaming

Autoscaling ist nicht mit Spark Structured Streaming kompatibel, da Spark Structured Streaming derzeit keine dynamische Zuordnung unterstützt. Informationen dazu finden Sie unter SPARK-24815: Structured Streaming sollte die dynamische Zuordnung unterstützen.

Autoscaling durch Partitionierung und Parallelität steuern

Parallelität wird normalerweise durch Clusterressourcen festgelegt oder bestimmt, z. B. steuert die Anzahl der HDFS-Blöcke die Anzahl der Aufgaben. Beim Autoscaling ist jedoch das Gegenteil der Fall: Autoscaling legt die Clusterressourcen (Worker) anhand der Job-Parallelität fest. Diese Richtlinien können dabei helfen, die Job-Parallelität festzulegen:

  • Während Dataproc die Standardanzahl der MapReduce-Reduce-Aufgaben basierend auf der anfänglichen Clustergröße Ihres Clusters festlegt, können Sie mapreduce.job.reduces festlegen, um die Parallelität der Reduce-Phase zu erhöhen.
  • Spark SQL und DataFrame-Parallelität werden durch spark.sql.shuffle.partitions bestimmt, die standardmäßig auf 200 gesetzt sind.
  • Die RDD-Funktionen von Spark sind standardmäßig auf spark.default.parallelism gesetzt. Dieser Wert wird beim Start des Jobs auf die Anzahl der Kerne auf den Worker-Knoten festgelegt. Allerdings übernehmen alle RDD-Funktionen, die Shuffles erstellen, einen Parameter für die Anzahl der Partitionen, der spark.default.parallelism überschreibt.

Achten Sie darauf, dass Ihre Daten gleichmäßig partitioniert sind. Wenn ein signifikanter Schlüsselversatz vorliegt, können eine oder mehrere Aufgaben erheblich länger dauern als andere Aufgaben, was zu einer geringen Auslastung führt.

Autoscaling-Standardeinstellungen für Spark/Hadoop-Attribute

Autoscaling-Cluster haben Standardwerte für Clusterattribute, die dazu beitragen, Jobfehler zu vermeiden, wenn primäre Worker entfernt oder sekundäre Worker unterbrochen werden. Sie können diese Standardwerte überschreiben, wenn Sie einen Cluster mit Autoscaling erstellen. Informationen dazu finden Sie unter Clusterattribute.

Standardeinstellungen zur Erhöhung der maximalen Anzahl von Wiederholungen für Aufgaben, Anwendungsmaster und Phasen:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

Standardwerte, um Wiederholungszähler zurückzusetzen. Dies kann bei lang andauernden Spark Streaming-Jobs hilfreich sein:

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Standardeinstellung ist der Mechanismus der dynamischen Zuordnung von Spark, der mit einer kleinen Größe beginnt:

spark:spark.executor.instances=2

Häufig gestellte Fragen (FAQ)

Kann Autoscaling in Hochverfügbarkeitsclustern und Single-Node-Clustern aktiviert werden?

Autoscaling kann in Hochverfügbarkeitsclustern aktiviert werden, nicht jedoch in Clustern mit einzelnem Knoten, da diese keine Skalierung unterstützen.

Kann man Größe eines Autoscaling-Clusters manuell ändern?

Ja. Möglicherweise entscheiden Sie sich dafür, die Größe eines Clusters während der Feinabstimmung einer Autoscaling-Richtlinie manuell als Stopp festzulegen. Diese Änderungen sind jedoch nur vorübergehend wirksam, das Autoscaling übernimmt die Clusterskalierung früher oder später wieder.

Statt den Cluster manuell zu skalieren, gibt es diese Möglichkeiten:

Aktualisieren der Autoscaling-Richtlinie Änderungen an der Autoscaling-Richtlinie wirken sich auf alle Cluster aus, die die Richtlinie derzeit verwenden (siehe Multi-Cluster-Richtlinienverwendung).

Richtlinie trennen und Cluster manuell auf die bevorzugte Größe skalieren.

Dataproc-Support nutzen.

Wie unterscheidet sich Dataproc vom Dataflow-Autoscaling?

Siehe Cloud Dataflow-Autoscaling im Vergleich zu Spark und Hadoop

Kann das Entwicklungsteam von Dataproc den Status eines Clusters von ERROR auf RUNNING zurücksetzen?

Im Allgemeinen nicht. Es erfordert manuelle Bemühungen, um zu testen, ob es sicher ist, den Status des Clusters einfach zurückzusetzen. Außerdem kann ein Cluster nicht ohne andere manuelle Schritte zurückgesetzt werden, z. B. Neustart des HDFS-NameNode.

Dataproc setzt den Status eines Clusters auf ERROR, wenn er den Status eines Clusters nach einem fehlgeschlagenen Vorgang nicht ermitteln kann. Cluster in ERROR werden nicht mehr automatisch skaliert oder Jobs ausgeführt. Das kann folgende Gründe haben:

  1. Fehler, die von der Compute Engine API zurückgegeben werden, oft während eines Compute Engine-Ausfalls.

  2. HDFS kann aufgrund von Programmfehlern bei der Außerbetriebnahme von HDFS beschädigt werden.

  3. Fehler bei der Dataproc Control API, z. B. „Aufgaben freigegeben abgelaufen“

Wir arbeiten daran, die Ausfallsicherheit von Dataproc für diese Probleme zu verbessern.

Löschen und erstellen Sie bis dahin Cluster mit dem Status ERROR neu.