Tipps zur Abstimmung von Spark-Jobs

Spark SQL nutzen

Die Spark SQL DataFrame API ist eine erhebliche Optimierung der RDD API. Wenn Sie mit Code interagieren, der RDDs verwendet, sollten Sie Daten als DataFrame lesen, bevor Sie einen RDD im Code übergeben. In Java- oder Scala-Code ist eine sinnvolle Verwendung der Dataset API Spark SQL als Obermenge von RDDs und DataFrames.

Apache Spark 3 nutzen

Dataproc 2.0 installiert Spark 3, das die folgenden Funktionen und Leistungsverbesserungen enthält:

  • GPU-Unterstützung
  • Möglichkeit, Binärdateien zu lesen
  • Leistungsverbesserungen
  • Dynamische Partitionsbereinigung
  • Adaptive Abfrageausführung: Spark-Jobs werden in Echtzeit optimiert

Dynamische Zuordnung nutzen

Apache Spark enthält ein Feature zur dynamischen Zuordnung, das die Anzahl der Spark-Executors auf Workern in einem Cluster skaliert. Mit diesem Feature kann ein Job den vollständigen Dataproc-Cluster auch dann verwenden, wenn der Cluster vertikal skaliert wird. Dieses Feature ist in Dataproc standardmäßig aktiviert (spark.dynamicAllocation.enabled ist auf true festgelegt). Weitere Informationen finden Sie unter Spark-dynamische Zuordnung.

Dataproc-Autoscaling nutzen

Durch das Autoscaling von Dataproc werden Dataproc-Worker dynamisch zu einem Cluster hinzugefügt und daraus entfernt, damit Spark-Jobs die erforderlichen Ressourcen haben, um schnell zu arbeiten.

Als Best Practice wird empfohlen, die Autoscaling-Richtlinie so zu konfigurieren, dass nur sekundäre Worker skaliert werden.

Erweiterten Flexibilitätsmodus von Dataproc nutzen

Cluster mit VMs auf Abruf oder einer Autoscaling-Richtlinie können FetchFailed-Ausnahmen erhalten, wenn Worker vorzeitig beendet oder entfernt werden, bevor sie die Bereitstellung von Shuffle-Daten an Reducer beenden. Diese Ausnahme kann zu Wiederholungen von Aufgaben und längeren Abschlusszeiten führen.

Empfehlung: Verwenden Sie den Dataproc-Modus Erweiterten Flexibilitätsmodus, in dem keine Zwischen-Shuffle-Daten auf sekundären Workern gespeichert werden, damit sekundäre Worker sicher vorzeitig beendet oder herunterskaliert werden können.

Partitionierung und Daten nach dem Zufallsprinzip konfigurieren

Spark speichert Daten in temporären Partitionen im Cluster. Wenn Ihre Anwendung DataFrames gruppiert oder zusammenführt, werden die Daten gemäß der Gruppierung und Konfiguration auf niedriger Ebene in neuen Partitionen nach dem Zufallsprinzip verteilt.

Die Datenpartitionierung beeinträchtigt die Anwendungsleistung erheblich: Zu wenige Partitionen begrenzen die Jobparallelität und die Nutzung von Clusterressourcen. Zu viele Partitionen verlangsamen den Job aufgrund zusätzlicher Partitionsverarbeitung und Shuffling.

Partitionen konfigurieren

Die folgenden Attribute bestimmen die Anzahl und Größe Ihrer Partitionen:

  • spark.sqlfiles.maxPartitionBytes: Die maximale Größe der Partitionen, wenn Sie Daten aus Cloud Storage lesen. Der Standardwert ist 128 MB. Dieser ist für die meisten Anwendungen, die weniger als 100 TB verarbeiten, ausreichend.

  • spark.sql.shuffle.partitions: Die Anzahl der Partitionen nach dem Shuffle-Vorgang. Der Standardwert ist 200, was für Cluster mit insgesamt weniger als 100 vCPUs geeignet ist. Empfehlung: Legen Sie dafür den Wert auf das Dreifache der Anzahl von vCPUs in Ihrem Cluster fest.

  • spark.default.parallelism: Die Anzahl der Partitionen, die nach der Ausführung von RDD-Transformationen zurückgegeben werden, die Shuffles erfordern, z. B. join, reduceByKey und parallelize. Der Standardwert ist die Gesamtzahl der vCPUs in Ihrem Cluster. Wenn Sie RDDs in Spark-Jobs verwenden, können Sie diesen Wert auf das Dreifache Ihrer vCPUs festlegen.

Anzahl der Dateien begrenzen

Wenn Spark eine große Anzahl kleiner Dateien liest, kommt es zu einem Leistungsverlust. Speichern Sie Daten in Dateien, die größer als 100 MB sind. Begrenzen Sie die Anzahl der Ausgabedateien, um einen Shuffle zu erzwingen. Weitere Informationen finden Sie unter unnötige Shuffles vermeiden.

Adaptive Abfrageausführung konfigurieren (Spark 3)

Adaptive Abfrageausführung (standardmäßig in Dataproc-Image-Version 2.0 aktiviert) bietet Verbesserungen der Spark-Jobleistung, darunter:

Obwohl die Standardkonfigurationseinstellungen für die meisten Anwendungsfälle klingen, kann es sinnvoll sein, spark.sql.adaptive.advisoryPartitionSizeInBytes auf spark.sqlfiles.maxPartitionBytes (Standard: 128 MB) zu setzen.

Unnötige Shuffles vermeiden

Mit Spark können Nutzer ein Shuffle manuell auslösen, um ihre Daten mit der Funktion repartition neu auszubalancieren. Shuffle sind teuer, daher sollten Daten neu gemischt werden. Die entsprechende Konfiguration der Partitionen sollte ausreichen, damit Spark die Daten automatisch partitionieren kann.

Ausnahme: Beim Schreiben von spaltenpartitionierten Daten in Cloud Storage wird beim Neupartitionieren einer bestimmten Spalte vermieden, dass viele kleine Dateien geschrieben werden, um schnellere Schreibzeiten zu erzielen.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Daten in Parquet oder Avro speichern

Spark SQL verwendet standardmäßig Lese- und Schreibzugriff auf Daten in komprimierten Snappy-Parquet-Dateien. Parquet hat ein effizientes Dateiformat, das es Spark ermöglicht, nur die Daten zu lesen, die zum Ausführen einer Anwendung benötigt werden. Dies ist ein wichtiger Vorteil bei der Arbeit mit großen Datasets. Andere Spaltenformate wie Apache ORC sind ebenfalls gut geeignet.

Für nicht spaltenorientierte Daten bietet Apache Avro ein effizientes Dateiformat für binäre Zeilen. Obwohl Avro normalerweise langsamer als Parquet ist, ist die Leistung von Avro-Formaten besser als textbasierte Formate wie CSV oder JSON.

Laufwerkgröße optimieren

Der Durchsatz nichtflüchtiger Speicher hängt von der Laufwerkgröße ab, was sich auf die Leistung des Spark-Jobs auswirken kann, da Jobs Metadaten schreiben und Daten auf das Laufwerk verschieben. Bei Verwendung von nichtflüchtigen Standardspeichern sollte die Laufwerkgröße mindestens 1 Terabyte pro Worker betragen (siehe Leistung nach Größe des nichtflüchtigen Speichers).

So überwachen Sie den Durchsatz der Worker-Laufwerke in der Google Cloud Console:

  1. Klicken Sie auf der Seite Cluster auf den Clusternamen.
  2. Klicken Sie auf den Tab VM-INSTANZEN.
  3. Klicken Sie auf einen beliebigen Worker-Namen.
  4. Klicken Sie auf den Tab MONITORING und scrollen Sie dann nach unten zum Laufwerkdurchsatz, um den Worker-Durchsatz aufzurufen.

Hinweise zu Laufwerken

Sitzungsspezifische Dataproc-Cluster, die nicht von nichtflüchtigem Speicher profitieren, können lokale SSDs verwenden. Lokale SSDs sind physisch mit dem Cluster verbunden und bieten einen höheren Durchsatz als nichtflüchtige Speicher (siehe Leistungstabelle). Lokale SSDs sind mit einer festen Größe von 375 Gigabyte verfügbar. Sie können jedoch mehrere SSDs hinzufügen, um die Leistung zu erhöhen.

Lokale SSDs speichern keine Daten nach dem Herunterfahren eines Clusters. Wenn nichtflüchtiger Speicher erwünscht ist, können Sie nichtflüchtige SSD-Speicher verwenden, die einen höheren Durchsatz für ihre Größe als nichtflüchtige Standardspeicher bieten. Nichtflüchtige SSD-Speicher sind auch eine gute Wahl, wenn die Partitionsgröße kleiner als 8 KB ist. Vermeiden Sie jedoch kleine Partitionen..

GPUs Ihrem Cluster hinzufügen

Spark 3 unterstützt GPUs. Verwenden Sie GPUs mit der RAPIDS-Initialisierungsaktion, um Spark-Jobs mit dem RAPIDS SQL Accelerator zu beschleunigen. Die GPU-Treiberinitialisierungsaktion zum Konfigurieren eines Clusters mit GPUs.

Häufige Jobfehler und -korrekturen

Nicht genügend Arbeitsspeicher

Beispiele:

  • "Lost Executor"
  • "java.lang.OutOfMemoryError: GC-Overhead überschritten"
  • "Container, der von YARN beendet wurde, um Speicherlimits zu überschreiten"

Mögliche Korrekturen:

Shuffle-Abruffehler

Beispiele:

  • "FetchFailedException" (Spark-Fehler)
  • "Verbindung konnte nicht hergestellt werden..." (Spark-Fehler)
  • "Fehler beim Abrufen" (MapReduce-Fehler)

Normalerweise verursacht durch das vorzeitige Entfernen von Workern, die noch Shuffle-Daten für die Bereitstellung haben.

Mögliche Ursachen und Lösungen:

  • Worker-VMs auf Abruf wurden zurückgefordert oder nicht auf Abruf verwendete Worker-VMs wurden vom Autoscaling entfernt. Lösung: Verwenden Sie den Enhanced Flexibility Mode, um sekundäre Worker sicher auf Abruf oder skalierbar zu machen.
  • Der Executor oder Mapper ist aufgrund eines OutOfMemory-Fehlers abgestürzt. Lösung: Erhöhen Sie den Arbeitsspeicher des Executors oder des Mappers.
  • Der Spark-Shuffle-Dienst ist möglicherweise überlastet. Lösung: Reduzieren Sie die Anzahl der Jobpartitionen.

YARN-Knoten sind FEHLERHAFT

Beispiele (aus YARN-Logs):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Häufig mit unzureichendem Speicherplatz für Shuffle-Daten verbunden. Diagnose durch Aufrufen der Logdateien:

  • Öffnen Sie in der Google Cloud Console die Seite Cluster für Ihr Projekt und klicken Sie auf den Namen des Clusters.
  • Klicken Sie auf LOGS ANSEHEN.
  • Logs nach hadoop-yarn-nodemanager filtern.
  • Suchen Sie nach "FEHLERHAFT".

Lösung: Wenn das Log den Status "FEHLERHAFT" meldet, erstellen Sie den Cluster mit größerem Speicherplatz neu. Dadurch wird die Durchsatzobergrenze erhöht.

Weitere Informationen