Dataproc Enhanced Flexibility Mode

Wenn ein Dataproc-Knoten entfernt wird, bewahrt der Dataproc Enhanced Flexibility Mode (EFM) zustandsorientierte Knotendaten, wie MapReduce-Shuffle-Daten, in HDFS.

Da HDFS nur für primäre Worker ausgeführt wird, eignet sich diese Funktion besonders für Cluster, die VMs auf Abruf verwenden oder die Worker-Gruppe auf Abruf nur automatisch skalieren.

Beschränkungen:

  • Der Enhanced Flexibility Mode wird derzeit nur in 1.4-Image-Clustern unterstützt.
  • Apache Hadoop YARN-Jobs, die die AppMaster-Verschiebung nicht unterstützen, können im Enhanced Flexibility Mode häufiger fehlschlagen. Weitere Informationen unter Wann auf den Abschluss von AppMastern gewartet werden muss.
  • Der Enhanced Flexibility Mode wird nicht empfohlen, wenn:
    • Ein Cluster nur primäre Worker hat
    • Das primäre Worker-Autoscaling aktiviert ist, es sei denn, EFM ist so konfiguriert, dass es auf das HDFS eines anderen Clusters verweist.
      Hinweis: Wenn Jobs stark von HDFS abhängen, wird Autoscaling von HDFS nicht empfohlen.

Enhanced Flexibility Mode verwenden

Erstellen Sie einen Cluster mit aktiviertem Enhanced Flexibility Mode:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=hcfs \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --image-version=1.4

Beispiel für Apache Hadoop MapReduce

  1. Führen Sie in der MapReduce-Beispiel-JAR-Datei im Cluster mit Enhanced Flexibility Mode einen kleinen TeraGen-Job aus, um Eingabedaten in Cloud Storage für einen späteren TeraSort-Job zu generieren.
    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    
  2. Führen Sie einen TeraSort-Job für die Daten aus.
    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Beispiel für Apache Spark

  1. Führen Sie einen WordCount-Job für öffentlichen Shakespeare-Text mithilfe der Spark-Beispiel-JAR-Datei auf dem Cluster mit Enhanced Flexibility Mode aus.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

YARN: Ordnungsgemäße Außerbetriebnahme in Clustern mit Enanced Flexibility Mode

Die ordnungsgemäße Außerbetriebnahme in YARN kann verwendet werden, um Knoten schnell und mit minimalen Auswirkungen auf die Ausführung von Anwendungen zu entfernen. Bei Autoscaling-Clustern kann das Zeitlimit für eine ordnungsgemäße Außerbetriebnahme in einer AutoscalingPolicy festgelegt werden, die an den Cluster mit Enhanced Flexibility Mode angehängt ist.

Enhanced Flexibility Mode für eine ordnungsgemäße Außerbetriebnahme

  1. Da Zwischendaten in verteilten Dateisystemen gespeichert werden, können Knoten im erweiterten Modus aus Clustern entfernt werden, sobald alle auf diesen Knoten ausgeführten Container ihre Arbeit beendet haben. Dagegen werden Knoten erst dann aus Standard-Dataproc-Clustern entfernt, wenn die Anwendung beendet wurde

  2. Das Entfernen von Knoten wartet nicht, bis die auf einem Knoten ausgeführten App-Master beendet wurden. Wird der Master-Container der Anwendung geendet, so wird er auf einen anderen Knoten verschoben, der nicht außer Betrieb genommen wird. Jobfortschritte gehen nicht verloren. Der neue App-Master stellt den Status des vorherigen App-Master durch das Einlesen des Jobverlaufs schnell wieder her.

Ordnungsgemäße Außerbetriebnahme in einem Cluster mit Enhanced Flexibility Mode

Beispiel:

  1. Erstellen Sie einen Cluster mit Enhanced Flexibility Mode und einer gleichen Anzahl von primären und sekundären Workern.
    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.spark.shuffle=hcfs \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --image-version=1.4 \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    
  2. Führen Sie im mapreduce-Beispiel-JAR im Cluster einen MapReduce-Job aus, der den Wert von Pi berechnet.
    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    
  3. Während der Job ausgeführt wird, skalieren Sie den Cluster mithilfe einer ordnungsgemäßen Außerbetriebnahme herunter.
    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    

Die Knoten werden schnell aus dem Cluster entfernt (vor Abschluss des Jobs), wobei Verluste beim Jobfortschritt minimiert werden. Vorübergehende Rückschritte des Jobfortschritts können folgende Ursachen haben:

  • App-Master-Failover: Wenn der Jobfortschritt auf 0 % sinkt und dann sofort auf den vor dem Abfall angezeigten Wert steigt, wurde wahrscheinlich der App-Master geendet und ein neuer App-Master hat diesen Zustand wiederhergestellt. Dies sollte den Fortschritt des Jobs nicht wesentlich beeinträchtigen, da das Failover schnell erfolgt.
  • Vorzeitiges Beenden der VM. Da HDFS nur vollständige, nicht teilweise Ausgaben von Zuordnungsaufgaben speichert, kann es zu vorübergehenden Einbrüchen beim Jobfortschritt kommen, wenn eine VM während der Arbeit an einer Zuordnungsaufgabe vorzeitig beendet wird.

Erweiterte Konfiguration

HDFS konfigurieren

Zur Verbesserung der Leistung großer Shuffle können Sie die Anzahl der Serving-Threads für Namen- und Datenknoten steigern. Dazu erhöhen Sie beim Erstellen des Clusters die Werte für dfs.namenode.handler.count und dfs.datanode.handler.count. Standardmäßig haben der Namenknoten und die Datenknoten 10 Serving-Threads. Zur weiteren Verbesserung der Leistung von Namenknoten können Sie lokale SSDs mit dem Masterknoten verknüpfen. Dazu übergeben Sie das Flag --num-master-local-ssds an den Befehl clusters create. Sie können auch das Flag --num-worker-local-ssds verwenden, um lokale SSDs an Datenknoten anzuhängen und so die HDFS-Leistung zu verbessern.

Partitionierung und Parallelität

Beim Senden eines MapReduce- oder Spark-Jobs ist es wichtig, eine geeignete Partitionierung zu konfigurieren. Die Anzahl der Eingabe- und Ausgabepartitionen für Shuffle-Phasen beeinflussen unterschiedliche Leistungsmerkmale. Idealerweise experimentieren Sie mit für Ihre Art Jobs geeigneten Werten.

Bei MapReduce-Jobs wird die Anzahl der Ausgabepartitionen durch mapreduce.job.reduces gesteuert. Bei Spark SQL wird die Anzahl der Ausgabepartitionen durch spark.sql.shuffle.partitions gesteuert. Für Spark-Jobs, die die RDD API verwenden, können Sie die Anzahl der Partitionen angeben. Die Eingabepartitierung für MapReduce und Spark wird implizit durch das Eingabe-Dataset festgelegt.

Wiederholungsversuche erhöhen

Die maximale Anzahl der Versuche, die App-Master, Aufgaben und Phasen verwenden können, kann über die folgenden Attribute konfiguriert werden:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Da App-Master und Aufgaben häufiger in Clustern beendet werden, die viele VMs auf Abruf oder Autoscaling ohne ordnungsgemäße Außerbetriebnahme verwenden, wird empfohlen, die obigen Werte in diesen Clustern zu erhöhen.

Verhältnis der Worker auf Abruf

Da Worker ihre Shuffle-Daten in HDFS schreiben, ist es wichtig, dass Ihr Cluster genügend primäre Worker enthält, um die Shuffle-Daten Ihres Jobs zu verarbeiten. Bei Autoscaling-Clustern können Sie den Zielanteil der Worker auf Abruf in Ihrem Cluster steuern, indem Sie in der AutoscalingPolicy Gewichtungen in derworkerConfig und der secondaryWorkerConfig zuweisen.

Andere Hadoop-kompatible Dateisysteme

Standardmäßig werden Shuffle-Daten in ein HDFS geschrieben. Sie können jedoch auch jedes Hadoop-kompatibles Dateisystem (HCFS) verwenden. Beispiel: Sie können Shuffle in Cloud Storage oder in das HDFS eines anderen Clusters schreiben. Zur Angabe eines Dateisystems können Sie fs.defaultFS auf das Zieldateisystem verweisen , wenn Sie einen Job an Ihr Cluster senden.