In den folgenden Abschnitten finden Sie Tipps zur Optimierung Ihrer Dataproc-Spark-Anwendungen.
Sitzungsspezifische Cluster verwenden
Wenn Sie das Dataproc-Clustermodell „sitzungsspezifisch“ verwenden, erstellen Sie für jeden Job einen eigenen Cluster. Nach Abschluss des Jobs wird der Cluster gelöscht. Mit dem sitzungsspezifischen Modell können Sie Speicher und Rechenleistung getrennt behandeln, Job-Eingabe- und -Ausgabedaten in Cloud Storage oder BigQuery speichern und den Cluster nur für die Rechenleistung und den temporären Datenspeicher verwenden.
Stolperfallen bei persistenten Clustern
Mit sitzungsspezifischen Clustern für einen einzelnen Job lassen sich die folgenden Fallstricke und potenziellen Probleme vermeiden, die mit der Verwendung gemeinsam genutzter und langlebiger „persistenter“ Cluster verbunden sind:
- Single Point of Failure: Ein Fehlerstatus eines freigegebenen Clusters kann dazu führen, dass alle Jobs fehlschlagen und eine gesamte Datenpipeline blockiert wird. Die Untersuchung und Behebung eines Fehlers kann Stunden dauern. Da in sitzungsspezifischen Clustern nur temporäre Clusterstatus beibehalten werden, können sie bei einem Fehler schnell gelöscht und neu erstellt werden.
- Schwierigkeiten bei der Verwaltung und Migration von Clusterstatus in HDFS, MySQL oder lokalen Dateisystemen
- Ressourcenkonflikte zwischen Jobs, die sich negativ auf SLOs auswirken
- Nicht reagierende Dienstdaemons aufgrund von Speichermangel
- Ansammlung von Protokollen und temporären Dateien, die die Laufwerkkapazität überschreiten können
- Hochskalierung fehlgeschlagen, da die Clusterzone aufgebraucht ist
- Es wird keine Unterstützung für veraltete Cluster-Image-Versionen angeboten.
Vorteile von Sitzungsspezifischen Clustern
Ephemeral Cluster bieten folgende Vorteile:
- Konfigurieren Sie unterschiedliche IAM-Berechtigungen für verschiedene Jobs mit unterschiedlichen Dataproc-VM-Dienstkonten.
- Optimieren Sie die Hardware- und Softwarekonfigurationen eines Clusters für jeden Job und ändern Sie die Clusterkonfigurationen nach Bedarf.
- Führen Sie ein Upgrade der Image-Versionen in neuen Clustern durch, um die neuesten Sicherheitspatches, Fehlerkorrekturen und Optimierungen zu erhalten.
- Probleme in einem isolierten Cluster mit einem einzelnen Job schneller beheben.
- Sie sparen Kosten, da Sie nur für die Laufzeit von Sitzungsspezifischen Clustern und nicht für die Inaktivitätszeiten zwischen Jobs in einem freigegebenen Cluster bezahlen.
Spark SQL nutzen
Die Spark SQL DataFrame API ist eine erhebliche Optimierung der RDD API. Wenn Sie mit Code interagieren, der RDDs verwendet, sollten Sie Daten als DataFrame lesen, bevor Sie einen RDD im Code übergeben. In Java- oder Scala-Code ist eine sinnvolle Verwendung der Dataset API Spark SQL als Obermenge von RDDs und DataFrames.
Apache Spark 3 nutzen
Dataproc 2.0 installiert Spark 3, das die folgenden Funktionen und Leistungsverbesserungen enthält:
- GPU-Unterstützung
- Möglichkeit, Binärdateien zu lesen
- Leistungsverbesserungen
- Dynamische Partitionsbereinigung
- Adaptive Abfrageausführung: Spark-Jobs werden in Echtzeit optimiert
Dynamische Zuordnung nutzen
Apache Spark enthält ein Feature zur dynamischen Zuordnung, das die Anzahl der Spark-Executors auf Workern in einem Cluster skaliert. Mit diesem Feature kann ein Job den vollständigen Dataproc-Cluster auch dann verwenden, wenn der Cluster vertikal skaliert wird. Dieses Feature ist in Dataproc standardmäßig aktiviert (spark.dynamicAllocation.enabled
ist auf true
festgelegt). Weitere Informationen finden Sie unter Spark-dynamische Zuordnung.
Dataproc-Autoscaling nutzen
Durch das Autoscaling von Dataproc werden Dataproc-Worker dynamisch zu einem Cluster hinzugefügt und daraus entfernt, damit Spark-Jobs die erforderlichen Ressourcen haben, um schnell zu arbeiten.
Als Best Practice wird empfohlen, die Autoscaling-Richtlinie so zu konfigurieren, dass nur sekundäre Worker skaliert werden.
Erweiterten Flexibilitätsmodus von Dataproc nutzen
Cluster mit VMs auf Abruf oder einer Autoscaling-Richtlinie können FetchFailed-Ausnahmen erhalten, wenn Worker vorzeitig beendet oder entfernt werden, bevor sie die Bereitstellung von Shuffle-Daten an Reducer beenden. Diese Ausnahme kann zu Wiederholungen von Aufgaben und längeren Abschlusszeiten führen.
Empfehlung: Verwenden Sie den Dataproc-Modus Erweiterten Flexibilitätsmodus, in dem keine Zwischen-Shuffle-Daten auf sekundären Workern gespeichert werden, damit sekundäre Worker sicher vorzeitig beendet oder herunterskaliert werden können.
Partitionierung und Daten nach dem Zufallsprinzip konfigurieren
Spark speichert Daten in temporären Partitionen im Cluster. Wenn Ihre Anwendung DataFrames gruppiert oder zusammenführt, werden die Daten gemäß der Gruppierung und Konfiguration auf niedriger Ebene in neuen Partitionen nach dem Zufallsprinzip verteilt.
Die Datenpartitionierung beeinträchtigt die Anwendungsleistung erheblich: Zu wenige Partitionen begrenzen die Jobparallelität und die Nutzung von Clusterressourcen. Zu viele Partitionen verlangsamen den Job aufgrund zusätzlicher Partitionsverarbeitung und Shuffling.
Partitionen konfigurieren
Die folgenden Attribute bestimmen die Anzahl und Größe Ihrer Partitionen:
spark.sql.files.maxPartitionBytes
: Die maximale Größe der Partitionen, wenn Sie Daten aus Cloud Storage lesen. Der Standardwert ist 128 MB. Dieser ist für die meisten Anwendungen, die weniger als 100 TB verarbeiten, ausreichend.spark.sql.shuffle.partitions
: Die Anzahl der Partitionen nach dem Shuffle-Vorgang. Der Standardwert ist 200, was für Cluster mit insgesamt weniger als 100 vCPUs geeignet ist. Empfehlung: Legen Sie dafür den Wert auf das Dreifache der Anzahl von vCPUs in Ihrem Cluster fest.spark.default.parallelism
: Die Anzahl der Partitionen, die nach der Ausführung von RDD-Transformationen zurückgegeben werden, die Shuffles erfordern, z. B.join
,reduceByKey
undparallelize
. Der Standardwert ist die Gesamtzahl der vCPUs in Ihrem Cluster. Wenn Sie RDDs in Spark-Jobs verwenden, können Sie diesen Wert auf das Dreifache Ihrer vCPUs festlegen.
Anzahl der Dateien begrenzen
Wenn Spark eine große Anzahl kleiner Dateien liest, kommt es zu einem Leistungsverlust. Speichern Sie Daten in größeren Dateien, z. B. mit einer Größe von 256 MB bis 512 MB. Begrenzen Sie die Anzahl der Ausgabedateien, um einen Shuffle zu erzwingen. Weitere Informationen finden Sie unter unnötige Shuffles vermeiden.
Adaptive Abfrageausführung konfigurieren (Spark 3)
Adaptive Abfrageausführung (standardmäßig in Dataproc-Image-Version 2.0 aktiviert) bietet Verbesserungen der Spark-Jobleistung, darunter:
- Partitionen nach Shuffles zusammenfassen
- Joins der Sortierzusammenführung in Broadcast-Joins umwandeln
- Optimierungen für verzerrte Joins.
Obwohl die Standardkonfigurationseinstellungen für die meisten Anwendungsfälle klingen, kann es sinnvoll sein, spark.sql.adaptive.advisoryPartitionSizeInBytes
auf spark.sqlfiles.maxPartitionBytes
(Standard: 128 MB) zu setzen.
Unnötige Shuffles vermeiden
Mit Spark können Nutzer ein Shuffle manuell auslösen, um ihre Daten mit der Funktion repartition
neu auszubalancieren. Shuffle sind teuer, daher sollten Daten neu gemischt werden. Die entsprechende Konfiguration der Partitionen sollte ausreichen, damit Spark die Daten automatisch partitionieren kann.
Ausnahme: Beim Schreiben von spaltenpartitionierten Daten in Cloud Storage wird beim Neupartitionieren einer bestimmten Spalte vermieden, dass viele kleine Dateien geschrieben werden, um schnellere Schreibzeiten zu erzielen.
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
Daten in Parquet oder Avro speichern
Spark SQL verwendet standardmäßig Lese- und Schreibzugriff auf Daten in komprimierten Snappy-Parquet-Dateien. Parquet hat ein effizientes Dateiformat, das es Spark ermöglicht, nur die Daten zu lesen, die zum Ausführen einer Anwendung benötigt werden. Dies ist ein wichtiger Vorteil bei der Arbeit mit großen Datasets. Andere Spaltenformate wie Apache ORC sind ebenfalls gut geeignet.
Für nicht spaltenorientierte Daten bietet Apache Avro ein effizientes Dateiformat für binäre Zeilen. Obwohl Avro normalerweise langsamer als Parquet ist, ist die Leistung von Avro-Formaten besser als textbasierte Formate wie CSV oder JSON.
Laufwerkgröße optimieren
Der Durchsatz nichtflüchtiger Speicher hängt von der Laufwerkgröße ab, was sich auf die Leistung des Spark-Jobs auswirken kann, da Jobs Metadaten schreiben und Daten auf das Laufwerk verschieben. Bei Verwendung von nichtflüchtigen Standardspeichern sollte die Laufwerkgröße mindestens 1 Terabyte pro Worker betragen (siehe Leistung nach Größe des nichtflüchtigen Speichers).
So überwachen Sie den Worker-Laufwerkdurchsatz in der Google Cloud Console:
- Klicken Sie auf der Seite Cluster auf den Clusternamen.
- Klicken Sie auf den Tab VM-INSTANZEN.
- Klicken Sie auf einen beliebigen Worker-Namen.
- Klicken Sie auf den Tab MONITORING und scrollen Sie dann nach unten zum Laufwerkdurchsatz, um den Worker-Durchsatz aufzurufen.
Hinweise zu Laufwerken
Sitzungsspezifische Dataproc-Cluster, die nicht von nichtflüchtigem Speicher profitieren, können lokale SSDs verwenden. Lokale SSDs sind physisch mit dem Cluster verbunden und bieten einen höheren Durchsatz als nichtflüchtige Speicher (siehe Leistungstabelle). Lokale SSDs sind mit einer festen Größe von 375 Gigabyte verfügbar. Sie können jedoch mehrere SSDs hinzufügen, um die Leistung zu erhöhen.
Lokale SSDs speichern keine Daten nach dem Herunterfahren eines Clusters. Wenn Sie nichtflüchtigen Speicher benötigen, können Sie nichtflüchtige SSD-Speicher verwenden, die einen höheren Durchsatz für ihre Größe als nichtflüchtige Standardspeicher bieten. Nichtflüchtige SSD-Speicher sind auch eine gute Wahl, wenn die Partitionsgröße kleiner als 8 KB ist. Vermeiden Sie jedoch kleine Partitionen..
GPUs Ihrem Cluster hinzufügen
Spark 3 unterstützt GPUs. Verwenden Sie GPUs mit der RAPIDS-Initialisierungsaktion, um Spark-Jobs mit dem RAPIDS SQL Accelerator zu beschleunigen. Die GPU-Treiberinitialisierungsaktion zum Konfigurieren eines Clusters mit GPUs.
Häufige Jobfehler und -korrekturen
Nicht genügend Arbeitsspeicher
Beispiele:
- "Lost Executor"
- "java.lang.OutOfMemoryError: GC-Overhead überschritten"
- "Container, der von YARN beendet wurde, um Speicherlimits zu überschreiten"
Mögliche Korrekturen:
- Wenn Sie PySpark verwenden, erhöhen Sie
spark.executor.memoryOverhead
und senken Sie den Wertspark.executor.memory
. - Verwenden Sie Maschinentypen mit großem Arbeitsspeicher.
- Verwenden Sie kleinere Partitionen.
Shuffle-Abruffehler
Beispiele:
- "FetchFailedException" (Spark-Fehler)
- "Verbindung konnte nicht hergestellt werden..." (Spark-Fehler)
- "Fehler beim Abrufen" (MapReduce-Fehler)
Normalerweise verursacht durch das vorzeitige Entfernen von Workern, die noch Shuffle-Daten für die Bereitstellung haben.
Mögliche Ursachen und Lösungen:
- Worker-VMs auf Abruf wurden zurückgefordert oder nicht auf Abruf verwendete Worker-VMs wurden vom Autoscaling entfernt. Lösung: Verwenden Sie den Enhanced Flexibility Mode, um sekundäre Worker sicher auf Abruf oder skalierbar zu machen.
- Der Executor oder Mapper ist aufgrund eines OutOfMemory-Fehlers abgestürzt. Lösung: Erhöhen Sie den Arbeitsspeicher des Executors oder des Mappers.
- Der Spark-Shuffle-Dienst ist möglicherweise überlastet. Lösung: Reduzieren Sie die Anzahl der Jobpartitionen.
YARN-Knoten sind FEHLERHAFT
Beispiele (aus YARN-Logs):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
Häufig mit unzureichendem Speicherplatz für Shuffle-Daten verbunden. Diagnose durch Aufrufen der Logdateien:
- Öffnen Sie die Seite Cluster Ihres Projekts in der Google Cloud Console und klicken Sie auf den Namen des Clusters.
- Klicken Sie auf LOGS ANSEHEN.
- Logs nach
hadoop-yarn-nodemanager
filtern. - Suchen Sie nach "FEHLERHAFT".
Mögliche Lösungen:
- Der Nutzercache wird im Verzeichnis gespeichert, das durch die Eigenschaft
yarn.nodemanager.local-dirs
inyarn-site.xml file
angegeben ist. Diese Datei befindet sich unter/etc/hadoop/conf/yarn-site.xml
. Sie können den freien Speicherplatz im Pfad/hadoop/yarn/nm-local-dir
prüfen und Speicherplatz freigeben, indem Sie den Cache-Ordner des Nutzers/hadoop/yarn/nm-local-dir/usercache
löschen. - Wenn das Log den Status „FEHLERHAFT“ meldet, erstellen Sie den Cluster mit größerem Speicherplatz neu. Dadurch wird die Durchsatzobergrenze erhöht.
Job fehlgeschlagen aufgrund unzureichenden Treiberspeichers
Wenn Jobs im Clustermodus ausgeführt werden, schlägt der Job fehl, wenn die Arbeitsspeichergröße des Masterknotens deutlich größer als die Arbeitsspeichergröße des Workerknotens ist.
Beispiel aus Fahrerprotokollen:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
Mögliche Lösungen:
- Legen Sie
spark:spark.driver.memory
auf einen Wert fest, der kleiner alsyarn:yarn.scheduler.maximum-allocation-mb
ist. - Verwenden Sie für Master- und Worker-Knoten denselben Maschinentyp.