Dataproc Enhanced Flexibility Mode

Enhanced Flexibility Mode (EFM) verwaltet Shuffle-Daten. Dadurch werden Verzögerungen beim Fortschritt des Jobs minimiert, die durch das Entfernen von Knoten aus einem ausgeführten Cluster verursacht werden. EFM überträgt Shuffle-Daten in einem von zwei vom Nutzer auswählbaren Modus:

  1. Shuffle für primären Worker. Mapper schreiben Daten an primäre Worker. Worker rufen diese Remoteknoten während der reduzierten Phase ab. Dieser Modus ist nur für Spark-Jobs verfügbar und wird auch empfohlen.

  2. HCFS-Shuffle (Hadoop Compatible File System). Mapper schreiben Daten in eine HCFS-Implementierung (standardmäßig HDFS). Wie beim primären Worker-Modus nehmen nur primäre Worker an HDFS- und HCFS-Implementierungen teil (wenn HCFS-Shuffle den Cloud Storage-Connector verwendet, werden Daten außerhalb des Clusters gespeichert). Dieser Modus kann Jobs mit kleinen Datenmengen nutzen. Aufgrund von Skalierungseinschränkungen wird dieser Modus jedoch nicht für größere Jobs empfohlen.

Da beide EFM-Modi keine Zwischen-Shuffle-Daten auf sekundären Workern speichern, EFM eignet sich gut für Cluster, die VMs auf Abruf oder skalieren Sie nur den sekundäre Worker-Gruppe.

Beschränkungen:

  • Apache Hadoop YARN-Jobs, die die AppMaster-Verschiebung nicht unterstützen, können im Enhanced Flexibility Mode fehlschlagen. Weitere Informationen finden Sie unter Wann auf den Abschluss von AppMastern gewartet werden muss.
  • Der Enhanced Flexibility Mode wird nicht empfohlen:
    • in einem Cluster mit nur primären Workern
    • bei Streamingjobs, da es nach Abschluss des Jobs bis zu 30 Minuten dauern kann um Zwischen-Shuffle-Daten zu bereinigen.
  • Der Enhanced Flexibility Mode wird nicht unterstützt:
    • wenn das primäre Worker-Autoscaling aktiviert ist. In den meisten Fällen speichern primäre Worker weiterhin Shuffle-Daten, die nicht automatisch migriert werden. Durch die Herunterskalierung der primären Worker-Gruppe werden ESM-Vorteile ausgeschlossen.
    • Die Ausführung von Spark-Jobs in einem Cluster mit aktivierter ordnungsgemäßer Außerbetriebnahme Die ordnungsgemäße Außerbetriebnahme und EFM können übergreifend funktionieren, da die Der YARN-Mechanismus zur ordnungsgemäßen Außerbetriebnahme sorgt dafür, dass die Knoten für die Außerbetriebnahme beibehalten werden. bis alle Bewerber abgeschlossen sind.

Enhanced Flexibility Mode verwenden

Der Enhanced Flexibility Mode wird pro Ausführungs-Engine konfiguriert und muss während der Clustererstellung konfiguriert werden.

  • Die Spark-ECM-Implementierung wird mit dem Clusterattribut dataproc:efm.spark.shuffle konfiguriert. Gültige Attributwerte:

    • primary-worker für den primären Worker-Shuffle (empfohlen)
    • hcfs für einen HCFS-basierten Zufallsmix Dieser Modus wurde eingestellt und nur auf Clustern verfügbar, auf denen die Image-Version 1.5 ausgeführt wird. Nicht empfohlen für neue Workflows.
  • Die Hadoop MapReduce-Implementierung ist mit dem Clusterattribut dataproc:efm.mapreduce.shuffle konfiguriert. Gültige Attributwerte:

    • hcfs

Beispiel: Erstellen Sie einen Cluster mit primärer Worker-Shuffle für Spark und HCFS-Shuffle für MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Beispiel für Apache Spark

  1. Führen Sie einen WordCount-Job für öffentlichen Shakespeare-Text mithilfe der Spark-Beispiel-JAR-Datei auf dem EFM-Cluster aus.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Beispiel für Apache Hadoop MapReduce

  1. Führen Sie einen kleinen TeraG-Job aus, um Eingabedaten in Cloud Storage für einen späteren TeraSort-Job unter Verwendung der JAR-Datei mapreduce im EFM-Cluster zu generieren.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Führen Sie einen TeraSort-Job für die Daten aus.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Lokale SSDs für primären Worker-Shuffle konfigurieren

Die primären Worker- und HDFS-Shuffle-Implementierungen schreiben Zwischen-Shuffle-Daten auf VM-Anhängen mit zusätzlichem Durchsatz und zusätzlichen IOPS, die von lokalen SSDs angeboten werden. Damit die Ressourcenzuweisung vereinfacht wird, sollten Sie beim Konfigurieren von primären Worker-Maschinen ein Ziel von etwa einer lokalen SSD-Partition pro 4 vCPUs festlegen.

Um lokale SSDs hinzuzufügen, übergeben Sie das Flag --num-worker-local-ssds an den gcloud Dataproc cluster create-Befehl.

Im Allgemeinen benötigen Sie keine lokalen SSDs auf sekundären Workern. Lokale SSDs zu den sekundären Workern eines Clusters hinzufügen (mithilfe der --num-secondary-worker-local-ssds Flag) ist häufig von geringerer Bedeutung, da sekundäre Worker Shuffle-Daten nicht lokal schreiben. Da lokale SSDs die Leistung lokaler Laufwerke verbessern, können Sie lokale SSDs zu sekundären Workern hinzufügen, Jobs für die E/A-Bindung Aufgrund der lokalen Laufwerksnutzung: Ihr Job benötigt eine erhebliche Anzahl von lokalen Laufwerken für oder Ihre Partitionen zu groß für den Arbeitsspeicher und wird auf das Laufwerk übertragen.

Verhältnis der sekundären Worker

Da sekundäre Worker ihre Shuffle-Daten an primäre Worker schreiben, muss der Cluster eine ausreichende Anzahl von primären Workern mit ausreichend CPU-, Arbeitsspeicher- und Speicherressourcen haben, um den Shuffle-Ladevorgang des Jobs zu ermöglichen. Wenn Sie bei Autoscaling-Clustern verhindern möchten, dass die primäre Gruppe skaliert wird und unerwünschtes Verhalten verursacht, setzen Sie minInstances in der Autoscaling-Richtlinie auf den Wert maxInstances für die primäre Worker-Gruppe.

Wenn Sie ein hohes Verhältnis von sekundären zu primären Workern haben (z. B. 10:1), überwachen Sie die CPU-Auslastung, das Netzwerk und die Laufwerknutzung primärer Worker, um festzustellen, ob sie überlastet sind. So gehen Sie dazu vor:

  1. Rufen Sie die Seite VM-Instanzen in der Google Cloud Console

  2. Klicken Sie auf das Kästchen links neben dem primären Worker.

  3. Klicken Sie auf den Tab MONITORING, um die CPU-Auslastung, die Laufwerk-IOPS, die Netzwerk-Byte und andere Messwerte des primären Workers anzuzeigen.

Wenn primäre Worker überlastet sind, sollten Sie primäre Worker manuell vertikal skalieren.

Größe der primären Workergruppe anpassen

Die primäre Workergruppe kann sicher skaliert werden. Das Herunterskalieren der primären Workergruppe kann jedoch den Jobfortschritt beeinträchtigen. Vorgänge, die die primäre Worker-Gruppe herunterskalieren, sollten die ordnungsgemäße Außerbetriebnahme verwenden. Diese wird durch Festlegen des Flags --graceful-decommission-timeout aktiviert.

Automatisch skalierte Cluster: Die Skalierung der primären Workergruppe ist in EFS-Clustern mit Autoscaling-Richtlinien deaktiviert. So passen Sie die Größe der primären Workergruppe auf einem automatisch skalierten Cluster an:

  1. Autoscaling deaktivieren

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Primäre Gruppe skalieren

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Aktivieren Sie das Autoscaling wieder:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Nutzung des primären Worker-Laufwerks überwachen

Primäre Worker müssen genügend Speicherplatz für die Shuffle-Daten des Clusters haben. Sie können dies indirekt anhand des Messwerts remaining HDFS capacity überwachen. Wenn das lokale Laufwerk gefüllt wird, ist der Speicherplatz für HDFS nicht verfügbar und die verbleibende Kapazität sinkt.

Wenn das lokale Laufwerk eines primären Workers über 90 % der Kapazität überschreitet, wird der Knoten in der YARN-Knoten-Benutzeroberfläche standardmäßig als UNHEALTHY gekennzeichnet. Wenn Probleme mit der Laufwerkskapazität auftreten, können Sie nicht verwendete Daten aus HDFS löschen oder den primären Worker-Pool hochskalieren.

Erweiterte Konfiguration

Partitionierung und Parallelität

Konfigurieren Sie beim Senden eines MapReduce- oder Spark-Jobs eine geeignete Partitionierungsebene. Die Entscheidung über die Anzahl der Eingabe- und Ausgabepartitionen für eine Shuffle-Phase hat einen Kompromiss zwischen verschiedenen Leistungsmerkmalen. Am besten experimentieren Sie mit Werten, die für Ihre Jobformen funktionieren.

Eingabepartitionen

Die Partitionierung von MapReduce und Spark wird vom Eingabe-Dataset bestimmt. Beim Lesen von Dateien aus Cloud Storage verarbeitet jede Aufgabe ungefähr eine "Blockgröße" an Daten.

  • Bei Spark SQL-Jobs wird die maximale Partitionsgröße von spark.sql.files.maxPartitionBytes festgelegt. Wir empfehlen, den Wert auf 1 GB zu erhöhen: spark.sql.files.maxPartitionBytes=1073741824.

  • Bei MapReduce-Jobs und Spark-RDDs wird die Partitionsgröße in der Regel mit fs.gs.block.size gesteuert, die standardmäßig 128 MB beträgt. Wir empfehlen, den Wert auf 1 GB zu erhöhen. Sie können auch InputFormat-spezifische Attribute wie mapreduce.input.fileinputformat.split.minsize und mapreduce.input.fileinputformat.split.maxsize festlegen.

    • Für MapReduce-Jobs: --properties fs.gs.block.size=1073741824
    • Für Spark-RDDs: --properties spark.hadoop.fs.gs.block.size=1073741824

Ausgabepartitionen

Die Anzahl der Aufgaben in den folgenden Phasen wird durch mehrere Attribute gesteuert. Bei größeren Jobs, die mehr als 1 TB verarbeiten, sollten Sie mindestens 1 GB pro Partition bereitstellen.

  • Bei MapReduce-Jobs wird die Anzahl der Ausgabepartitionen von mapreduce.job.reduces gesteuert.

  • Bei Spark SQL wird die Anzahl der Ausgabepartitionen von spark.sql.shuffle.partitions gesteuert.

  • Bei Spark-Jobs, die die RDD API verwenden, können Sie die Anzahl der Ausgabepartitionen angeben oder spark.default.parallelism festlegen.

Shuffle-Tuning des primären Workers

Das wichtigste Attribut ist --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Beachten Sie, dass hier ein YARN-Attribut auf Clusterebene angegeben ist, da der Spark-Shuffle-Server als Teil des Knoten-Managers ausgeführt wird. Standardmäßig wird die Anzahl der Kerne auf dem Computer verdoppelt (z. B. 16 Threads auf einem n1-highmem-8). Wenn „Shuffle Read Blocked Time“ größer als 1 Sekunde ist und primäre Worker das Netzwerk-, CPU- oder Laufwerklimit nicht erreicht haben, sollten Sie die Anzahl der Shuffle-Server-Threads erhöhen.

Bei größeren Maschinentypen sollten Sie möglicherweise spark.shuffle.io.numConnectionsPerPeer erhöhen. Der Standardwert ist 1. (Setzen Sie es z. B. auf 5 Verbindungen pro Hostpaar).

Wiederholungsversuche erhöhen

Die maximale Anzahl der Versuche, die App-Master, Aufgaben und Phasen verwenden können, kann über die folgenden Attribute konfiguriert werden:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Da App-Master und Aufgaben häufiger in Clustern beendet werden, die viele VMs auf Abruf oder Autoscaling ohne ordnungsgemäße Außerbetriebnahme verwenden, können die Werte der oben genannten Attribute in diesen Clustern gesteigert werden. (Bitte beachten Sie, dass die Verwendung von EFM mit Spark und ordnungsgemäßer Außerbetriebnahme nicht unterstützt wird.

HDFS für HCFS-Shuffle konfigurieren

Um die Leistung großer Shuffle zu verbessern, können Sie die Inhaltskonflikte im NameNode durch Festlegen von dfs.namenode.fslock.fair=false verringern. Beachten Sie, dass dies Risiken für einzelne Anfragen bedeutet, aber den clusterweiten Durchsatz verbessern kann. Sie können lokale SSDs noch weiter verbessern, indem Sie lokale SSDs mit dem Masterknoten verknüpfen, indem Sie --num-master-local-ssds festlegen. Sie können für primäre Worker auch lokale SSDs hinzufügen, um die Leistung von DataNode zu verbessern, indem Sie --num-worker-local-ssds festlegen.

Andere Hadoop-kompatible Dateisysteme für HCFS-Shuffle

Standardmäßig werden die EFM-HCFS-Shuffle-Daten in HDFS geschrieben. Sie können jedoch jedes Hadoop-kompatible Dateisystem (HCFS) verwenden. Beispiel: Sie können Shuffle in Cloud Storage oder in das HDFS eines anderen Clusters schreiben. Zur Angabe eines Dateisystems können Sie fs.defaultFS auf das Zieldateisystem verweisen , wenn Sie einen Job an Ihr Cluster senden.

YARN: Ordnungsgemäße Außerbetriebnahme in EFS-Clustern

Ordnungsgemäße Außerbetriebnahme von YARN Knoten lassen sich schnell mit minimalen auf die Ausführung von Anwendungen. Bei Autoscaling-Clustern kann das Zeitlimit für eine ordnungsgemäße Außerbetriebnahme in einer AutoscalingPolicy festgelegt werden, die an den EFM-Cluster angehängt ist.

MapReduce-EFM-Verbesserungen für eine ordnungsgemäße Außerbetriebnahme

  1. Da Zwischendaten in einem verteilten Dateisystem gespeichert werden, können Knoten aus einem EFM-Cluster entfernt werden, sobald alle Container, die auf diesen Knoten ausgeführt werden, abgeschlossen sind. Dagegen werden Knoten erst dann aus Standard-Dataproc-Clustern entfernt, wenn die Anwendung beendet wurde

  2. Das Entfernen von Knoten wartet nicht, bis die auf einem Knoten ausgeführten App-Master beendet wurden. Wenn der Master-Container der Anwendung beendet wird, wird er auf einem anderen Knoten verschoben, der nicht außer Betrieb genommen wird. Jobfortschritte gehen nicht verloren. Der neue App-Master stellt den Status des vorherigen App-Master durch das Einlesen des Jobverlaufs schnell wieder her.

Ordnungsgemäße Außerbetriebnahme in einem EFM-Cluster mit MapReduce

  1. Erstellen Sie einen ECM-Cluster mit der gleichen Anzahl an primären und sekundären Workern.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Führen Sie einen mapreduce-Job aus, der den Wert von Pi mit der JAR-Datei mapreduce im Cluster berechnet.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Während der Job ausgeführt wird, skalieren Sie den Cluster mithilfe einer ordnungsgemäßen Außerbetriebnahme herunter.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    Die Knoten werden schnell aus dem Cluster entfernt (vor Abschluss des Jobs), wobei Verluste beim Jobfortschritt minimiert werden. Temporäre Pausen im Jobfortschritt können aus folgenden Gründen auftreten:

    • Failover zwischen App-Master Wenn der Jobfortschritt auf 0 % sinkt und sofort zum Pre-Drop-Wert springt, wurde der Anwendungsmaster möglicherweise beendet und ein neuer Anwendungsmaster wird wiederhergestellt. Der Fortschritt des Jobs sollte sich durch die schnellen Failovers wesentlich schneller ändern.
    • Vorzeitiges Beenden der VM. Da HDFS nur vollständige, nicht teilweise Ausgaben von Zuordnungsaufgaben speichert, kann es zu vorübergehenden Einbrüchen beim Jobfortschritt kommen, wenn eine VM während der Arbeit an einer Zuordnungsaufgabe vorzeitig beendet wird.

Um das Entfernen von Knoten zu beschleunigen, können Sie den ohne ordnungsgemäße Außerbetriebnahme durch Auslassen der --graceful-decommission-timeout-Flag im vorherigen Beispiel für einen gcloud-Befehl. Der Auftragsfortschritt aus abgeschlossenen Zuordnungsaufgaben bleibt erhalten, die teilweise abgeschlossene Ausgabe der Zuordnungsaufgaben geht jedoch verloren (die Zuordnungsaufgaben werden noch einmal ausgeführt).