Spark SQL nutzen
Die Spark SQL DataFrame API ist eine erhebliche Optimierung der RDD API. Wenn Sie mit Code interagieren, der RDDs verwendet, sollten Sie Daten als DataFrame lesen, bevor Sie einen RDD im Code übergeben. In Java- oder Scala-Code ist eine sinnvolle Verwendung der Dataset API Spark SQL als Obermenge von RDDs und DataFrames.
Apache Spark 3 nutzen
Dataproc 2.0 installiert Spark 3, das die folgenden Funktionen und Leistungsverbesserungen enthält:
- GPU-Unterstützung
- Möglichkeit, Binärdateien zu lesen
- Leistungsverbesserungen
- Dynamische Partitionsbereinigung
- Adaptive Abfrageausführung: Spark-Jobs werden in Echtzeit optimiert
Dynamische Zuordnung nutzen
Apache Spark enthält ein Feature zur dynamischen Zuordnung, das die Anzahl der Spark-Executors auf Workern in einem Cluster skaliert. Mit diesem Feature kann ein Job den vollständigen Dataproc-Cluster auch dann verwenden, wenn der Cluster vertikal skaliert wird. Dieses Feature ist in Dataproc standardmäßig aktiviert (spark.dynamicAllocation.enabled
ist auf true
festgelegt). Weitere Informationen finden Sie unter Spark-dynamische Zuordnung.
Dataproc-Autoscaling nutzen
Durch das Autoscaling von Dataproc werden Dataproc-Worker dynamisch zu einem Cluster hinzugefügt und daraus entfernt, damit Spark-Jobs die erforderlichen Ressourcen haben, um schnell zu arbeiten.
Als Best Practice wird empfohlen, die Autoscaling-Richtlinie so zu konfigurieren, dass nur sekundäre Worker skaliert werden.
Erweiterten Flexibilitätsmodus von Dataproc nutzen
Cluster mit VMs auf Abruf oder einer Autoscaling-Richtlinie können FetchFailed-Ausnahmen erhalten, wenn Worker vorzeitig beendet oder entfernt werden, bevor sie die Bereitstellung von Shuffle-Daten an Reducer beenden. Diese Ausnahme kann zu Wiederholungen von Aufgaben und längeren Abschlusszeiten führen.
Empfehlung: Verwenden Sie den Dataproc-Modus Erweiterten Flexibilitätsmodus, in dem keine Zwischen-Shuffle-Daten auf sekundären Workern gespeichert werden, damit sekundäre Worker sicher vorzeitig beendet oder herunterskaliert werden können.
Partitionierung und Daten nach dem Zufallsprinzip konfigurieren
Spark speichert Daten in temporären Partitionen im Cluster. Wenn Ihre Anwendung DataFrames gruppiert oder zusammenführt, werden die Daten gemäß der Gruppierung und Konfiguration auf niedriger Ebene in neuen Partitionen nach dem Zufallsprinzip verteilt.
Die Datenpartitionierung beeinträchtigt die Anwendungsleistung erheblich: Zu wenige Partitionen begrenzen die Jobparallelität und die Nutzung von Clusterressourcen. Zu viele Partitionen verlangsamen den Job aufgrund zusätzlicher Partitionsverarbeitung und Shuffling.
Partitionen konfigurieren
Die folgenden Attribute bestimmen die Anzahl und Größe Ihrer Partitionen:
spark.sqlfiles.maxPartitionBytes
: Die maximale Größe der Partitionen, wenn Sie Daten aus Cloud Storage lesen. Der Standardwert ist 128 MB. Dieser ist für die meisten Anwendungen, die weniger als 100 TB verarbeiten, ausreichend.spark.sql.shuffle.partitions
: Die Anzahl der Partitionen nach dem Shuffle-Vorgang. Der Standardwert ist 200, was für Cluster mit insgesamt weniger als 100 vCPUs geeignet ist. Empfehlung: Legen Sie dafür den Wert auf das Dreifache der Anzahl von vCPUs in Ihrem Cluster fest.spark.default.parallelism
: Die Anzahl der Partitionen, die nach der Ausführung von RDD-Transformationen zurückgegeben werden, die Shuffles erfordern, z. B.join
,reduceByKey
undparallelize
. Der Standardwert ist die Gesamtzahl der vCPUs in Ihrem Cluster. Wenn Sie RDDs in Spark-Jobs verwenden, können Sie diesen Wert auf das Dreifache Ihrer vCPUs festlegen.
Anzahl der Dateien begrenzen
Wenn Spark eine große Anzahl kleiner Dateien liest, kommt es zu einem Leistungsverlust. Speichern Sie Daten in Dateien, die größer als 100 MB sind. Begrenzen Sie die Anzahl der Ausgabedateien, um einen Shuffle zu erzwingen. Weitere Informationen finden Sie unter unnötige Shuffles vermeiden.
Adaptive Abfrageausführung konfigurieren (Spark 3)
Adaptive Abfrageausführung (standardmäßig in Dataproc-Image-Version 2.0 aktiviert) bietet Verbesserungen der Spark-Jobleistung, darunter:
- Partitionen nach Shuffles zusammenfassen
- Joins der Sortierzusammenführung in Broadcast-Joins umwandeln
- Optimierungen für verzerrte Joins.
Obwohl die Standardkonfigurationseinstellungen für die meisten Anwendungsfälle klingen, kann es sinnvoll sein, spark.sql.adaptive.advisoryPartitionSizeInBytes
auf spark.sqlfiles.maxPartitionBytes
(Standard: 128 MB) zu setzen.
Unnötige Shuffles vermeiden
Mit Spark können Nutzer ein Shuffle manuell auslösen, um ihre Daten mit der Funktion repartition
neu auszubalancieren. Shuffle sind teuer, daher sollten Daten neu gemischt werden. Die entsprechende Konfiguration der Partitionen sollte ausreichen, damit Spark die Daten automatisch partitionieren kann.
Ausnahme: Beim Schreiben von spaltenpartitionierten Daten in Cloud Storage wird beim Neupartitionieren einer bestimmten Spalte vermieden, dass viele kleine Dateien geschrieben werden, um schnellere Schreibzeiten zu erzielen.
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
Daten in Parquet oder Avro speichern
Spark SQL verwendet standardmäßig Lese- und Schreibzugriff auf Daten in komprimierten Snappy-Parquet-Dateien. Parquet hat ein effizientes Dateiformat, das es Spark ermöglicht, nur die Daten zu lesen, die zum Ausführen einer Anwendung benötigt werden. Dies ist ein wichtiger Vorteil bei der Arbeit mit großen Datasets. Andere Spaltenformate wie Apache ORC sind ebenfalls gut geeignet.
Für nicht spaltenorientierte Daten bietet Apache Avro ein effizientes Dateiformat für binäre Zeilen. Obwohl Avro normalerweise langsamer als Parquet ist, ist die Leistung von Avro-Formaten besser als textbasierte Formate wie CSV oder JSON.
Laufwerkgröße optimieren
Der Durchsatz nichtflüchtiger Speicher hängt von der Laufwerkgröße ab, was sich auf die Leistung des Spark-Jobs auswirken kann, da Jobs Metadaten schreiben und Daten auf das Laufwerk verschieben. Bei Verwendung von nichtflüchtigen Standardspeichern sollte die Laufwerkgröße mindestens 1 Terabyte pro Worker betragen (siehe Leistung nach Größe des nichtflüchtigen Speichers).
So überwachen Sie den Durchsatz der Worker-Laufwerke in der Google Cloud Console:
- Klicken Sie auf der Seite Cluster auf den Clusternamen.
- Klicken Sie auf den Tab VM-INSTANZEN.
- Klicken Sie auf einen beliebigen Worker-Namen.
- Klicken Sie auf den Tab MONITORING und scrollen Sie dann nach unten zum Laufwerkdurchsatz, um den Worker-Durchsatz aufzurufen.
Hinweise zu Laufwerken
Sitzungsspezifische Dataproc-Cluster, die nicht von nichtflüchtigem Speicher profitieren, können lokale SSDs verwenden. Lokale SSDs sind physisch mit dem Cluster verbunden und bieten einen höheren Durchsatz als nichtflüchtige Speicher (siehe Leistungstabelle). Lokale SSDs sind mit einer festen Größe von 375 Gigabyte verfügbar. Sie können jedoch mehrere SSDs hinzufügen, um die Leistung zu erhöhen.
Lokale SSDs speichern keine Daten nach dem Herunterfahren eines Clusters. Wenn nichtflüchtiger Speicher erwünscht ist, können Sie nichtflüchtige SSD-Speicher verwenden, die einen höheren Durchsatz für ihre Größe als nichtflüchtige Standardspeicher bieten. Nichtflüchtige SSD-Speicher sind auch eine gute Wahl, wenn die Partitionsgröße kleiner als 8 KB ist. Vermeiden Sie jedoch kleine Partitionen..
GPUs Ihrem Cluster hinzufügen
Spark 3 unterstützt GPUs. Verwenden Sie GPUs mit der RAPIDS-Initialisierungsaktion, um Spark-Jobs mit dem RAPIDS SQL Accelerator zu beschleunigen. Die GPU-Treiberinitialisierungsaktion zum Konfigurieren eines Clusters mit GPUs.
Häufige Jobfehler und -korrekturen
Nicht genügend Arbeitsspeicher
Beispiele:
- "Lost Executor"
- "java.lang.OutOfMemoryError: GC-Overhead überschritten"
- "Container, der von YARN beendet wurde, um Speicherlimits zu überschreiten"
Mögliche Korrekturen:
- Wenn Sie PySpark verwenden, erhöhen Sie
spark.executor.memoryOverhead
und senken Sie den Wertspark.executor.memory
. - Verwenden Sie Maschinentypen mit großem Arbeitsspeicher.
- Verwenden Sie kleinere Partitionen.
Shuffle-Abruffehler
Beispiele:
- "FetchFailedException" (Spark-Fehler)
- "Verbindung konnte nicht hergestellt werden..." (Spark-Fehler)
- "Fehler beim Abrufen" (MapReduce-Fehler)
Normalerweise verursacht durch das vorzeitige Entfernen von Workern, die noch Shuffle-Daten für die Bereitstellung haben.
Mögliche Ursachen und Lösungen:
- Worker-VMs auf Abruf wurden zurückgefordert oder nicht auf Abruf verwendete Worker-VMs wurden vom Autoscaling entfernt. Lösung: Verwenden Sie den Enhanced Flexibility Mode, um sekundäre Worker sicher auf Abruf oder skalierbar zu machen.
- Der Executor oder Mapper ist aufgrund eines OutOfMemory-Fehlers abgestürzt. Lösung: Erhöhen Sie den Arbeitsspeicher des Executors oder des Mappers.
- Der Spark-Shuffle-Dienst ist möglicherweise überlastet. Lösung: Reduzieren Sie die Anzahl der Jobpartitionen.
YARN-Knoten sind FEHLERHAFT
Beispiele (aus YARN-Logs):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
Häufig mit unzureichendem Speicherplatz für Shuffle-Daten verbunden. Diagnose durch Aufrufen der Logdateien:
- Öffnen Sie in der Google Cloud Console die Seite Cluster für Ihr Projekt und klicken Sie auf den Namen des Clusters.
- Klicken Sie auf LOGS ANSEHEN.
- Logs nach
hadoop-yarn-nodemanager
filtern. - Suchen Sie nach "FEHLERHAFT".
Lösung: Wenn das Log den Status "FEHLERHAFT" meldet, erstellen Sie den Cluster mit größerem Speicherplatz neu. Dadurch wird die Durchsatzobergrenze erhöht.