Pipelines werden auf Clustern von Maschinen ausgeführt. Sie erreichen einen hohen Durchsatz, indem sie die zu erledigende Arbeit aufteilen und dann parallel auf den verschiedenen Executoren im Cluster ausführen. Im Allgemeinen gilt: Je mehr Teilungen (auch Partitionen genannt) vorhanden sind, desto schneller kann die Pipeline ausgeführt werden. Das Maß der Parallelität in Ihrer Pipeline wird von den Quellen und Zufallsmix-Phasen in der Pipeline bestimmt.
Quellen
Zu Beginn jedes Pipelinelaufs wird für jede Quelle in Ihrer Pipeline berechnet, welche Daten gelesen werden müssen und wie diese Daten in Teilmengen aufgeteilt werden können. Angenommen, Sie haben eine einfache Pipeline, die Daten aus Cloud Storage liest, einige Wrangler-Transformationen durchführt und dann wieder in Cloud Storage schreibt.
Wenn die Pipeline gestartet wird, prüft die Cloud Storage-Quelle die Eingabedateien und teilt sie basierend auf der Dateigröße auf. So kann eine einzelne Gigabytedatei beispielsweise in 100 Teile mit jeweils 10 MB aufgeteilt werden. Jeder Executor liest die Daten für diese Teilung, führt die Wrangler-Transformationen aus und schreibt die Ausgabe in eine part-Datei.
Wenn Ihre Pipeline langsam läuft, sollten Sie als Erstes prüfen, ob Ihre Quellen genügend Verzweigungen erstellen, um die Parallelität voll auszuschöpfen. Beispielsweise können einige Komprimierungsarten dazu führen, dass Klartextdateien nicht aufgeteilt werden können. Wenn Sie GZIP-komprimierte Dateien lesen, ist Ihre Pipeline möglicherweise viel langsamer als beim Lesen unkomprimierter Dateien oder mit BZIP komprimierter Dateien (die sich aufteilen lassen). Wenn Sie die Datenbankquelle verwenden und sie so konfiguriert haben, dass nur eine einzige Aufteilung verwendet wird, ist sie viel langsamer als wenn Sie sie so konfigurieren, dass mehr Aufteilungen verwendet werden.
Zufallsmixe
Bestimmte Arten von Plug-ins führen dazu, dass Daten im Cluster neu angeordnet werden. Das passiert, wenn Datensätze, die von einem Executor verarbeitet werden, an einen anderen Executor gesendet werden müssen, um die Berechnung auszuführen. Zufallsmixe sind aufwendige Vorgänge, da sie viele E/A-Vorgänge erfordern. Plug-ins, die zu einer Datenmixtur führen, werden im Pipeline Studio im Bereich Analytics angezeigt. Dazu gehören Plug-ins wie „Nach Gruppierung“, „Entduplizieren“, „Eindeutig“ und „Joiner“. Angenommen, der Pipeline im vorherigen Beispiel wird die Phase Nach hinzugefügt.
Angenommen, die gelesenen Daten stellen Käufe in einem Lebensmittelgeschäft dar.
Jeder Datensatz enthält ein item
-Feld und ein num_purchased
-Feld. In der Phase Nach konfigurieren wir die Pipeline so, dass Einträge nach dem Feld item
gruppiert und die Summe des Felds num_purchased
berechnet wird.
Wenn die Pipeline ausgeführt wird, werden die Eingabedateien wie oben beschrieben aufgeteilt. Anschließend werden alle Datensätze im Cluster neu angeordnet, sodass jeder Datensatz mit demselben Element demselben Executor zugewiesen ist.
Wie im vorherigen Beispiel gezeigt, waren Einträge für Apfelkäufe ursprünglich auf mehrere Testamentsvollstrecker verteilt. Für die Aggregation mussten alle diese Datensätze über den Cluster an denselben Executor gesendet werden.
Bei den meisten Plugins, für die ein Zufallsmix erforderlich ist, kannst du die Anzahl der Partitionen angeben, die beim Zufallsmix verwendet werden sollen. Damit wird festgelegt, wie viele Executors zur Verarbeitung der zufällig vermischten Daten verwendet werden.
Wenn im Beispiel oben die Anzahl der Partitionen auf 2
festgelegt ist, berechnet jeder Executor Aggregate für zwei Elemente anstelle von einem.
Beachten Sie, dass Sie die Parallelität Ihrer Pipeline nach dieser Phase verringern können. Betrachten wir beispielsweise die logische Ansicht der Pipeline:
Wenn die Quelle die Daten auf 500 Partitionen verteilt, die Gruppierung nach aber mit 200 Partitionen gemischt wird, sinkt die maximale Parallelisierungsstufe nach der Gruppierung von 500 auf 200. Anstatt 500 verschiedener Teildateien, die in Cloud Storage geschrieben werden, haben Sie nur 200.
Partitionen auswählen
Wenn die Anzahl der Partitionen zu niedrig ist, wird nicht die volle Kapazität Ihres Clusters genutzt, um so viel Arbeit wie möglich parallel auszuführen. Wenn Sie die Partitionen zu groß festlegen, erhöht sich der unnötige Overhead. Im Allgemeinen ist es besser, zu viele Partitionen zu verwenden als zu wenige. Zusätzlicher Overhead ist ein Problem, wenn die Ausführung Ihrer Pipeline einige Minuten dauert und Sie versuchen, ein paar Minuten einzusparen. Wenn die Ausführung Ihrer Pipeline mehrere Stunden dauert, müssen Sie sich in der Regel keine Gedanken über den Overhead machen.
Eine nützliche, aber zu einfache Möglichkeit, die Anzahl der zu verwendenden Partitionen zu bestimmen, besteht darin, sie auf max(cluster CPUs, input records / 500,000)
festzulegen. Teilen Sie dazu die Anzahl der Eingabeeinträge durch 500.000. Wenn diese Zahl höher als die Anzahl der Cluster-CPUs ist, verwenden Sie diese für die Anzahl der Partitionen.
Andernfalls verwenden Sie die Anzahl der Cluster-CPUs. Wenn Ihr Cluster beispielsweise 100 CPUs hat und die Zufallsmixphase voraussichtlich 100 Millionen Eingabedatensätze enthält, verwenden Sie 200 Partitionen.
Eine vollständigere Antwort ist, dass Zufallsmixe die beste Leistung erzielen, wenn die Zwischendaten für jede Partition vollständig in den Arbeitsspeicher eines Executors passen, sodass nichts auf die Festplatte ausgelagert werden muss. Spark reserviert knapp 30% des Arbeitsspeichers eines Executors für das Speichern von Zufallsmix-Daten. Die genaue Zahl lautet (Gesamtspeicher – 300 MB) * 30%. Angenommen, jeder Executor soll 2 GB Arbeitsspeicher verwenden, bedeutet das, dass jede Partition maximal (2 GB – 300 MB) * 30% = etwa 500 MB Datensätze aufnehmen sollte. Angenommen, jeder Datensatz wird auf 1 KB komprimiert,bedeutet das (500 MB / Partition) ÷ (1 KB / Datensatz) = 500.000 Datensätze pro Partition. Wenn Ihre Executors mehr Arbeitsspeicher verbrauchen oder Ihre Datensätze kleiner sind, können Sie diese Zahl entsprechend anpassen.
Datenverzerrung
Beachten Sie, dass im vorherigen Beispiel die Käufe für verschiedene Artikel gleichmäßig verteilt waren. Das heißt, es wurden jeweils drei Äpfel, Bananen, Karotten und Eier gekauft. Das Zufallsmixen nach einem gleichmäßig verteilten Schlüssel ist die leistungsstärkste Art des Zufallsmixens, aber viele Datensätze haben diese Eigenschaft nicht. Wenn Sie beim Beispiel mit dem Supermarkteinkauf im vorherigen Abschnitt bleiben, würden Sie erwarten, dass Sie viel mehr Eier als Hochzeitskarten verkaufen. Wenn einige Zufallsschlüssel viel häufiger vorkommen als andere, handelt es sich um verzerrte Daten. Bei verzerrten Daten kann die Leistung deutlich schlechter sein als bei nicht verzerrten Daten, da ein unverhältnismäßig großer Arbeitsaufwand von einer Handvoll Ausführern erledigt wird. Dadurch sind einige Partitionen viel größer als alle anderen.
In diesem Beispiel gibt es fünfmal so viele Eierkäufe wie Kartenkäufe. Das bedeutet, dass die Berechnung des Eier-Aggregats etwa fünfmal länger dauert. Das spielt bei nur zehn statt zwei Einträgen keine große Rolle, aber bei fünf Milliarden statt einer Milliarde Einträgen macht es einen großen Unterschied. Bei Datenabweichungen hat die Anzahl der Partitionen, die in einem Shuffle verwendet werden, keinen großen Einfluss auf die Pipelineleistung.
Sie können Abweichungen in den Daten erkennen, indem Sie das Diagramm auf Ausgabeeinträge im Zeitverlauf untersuchen. Wenn die Phase zu Beginn des Pipelinelaufs Datensätze viel schneller ausgibt und dann plötzlich langsamer wird, kann das auf verzerrte Daten hinweisen.
Sie können Datenverzerrungen auch erkennen, indem Sie die Clusterspeichernutzung im Zeitverlauf untersuchen. Wenn Ihr Cluster eine Zeit lang ausgelastet ist, aber plötzlich über einen längeren Zeitraum eine geringe Speichernutzung aufweist, ist dies ebenfalls ein Anzeichen für einen Datenskew.
Verzerrte Daten wirken sich am stärksten auf die Leistung aus, wenn eine Zusammenführung durchgeführt wird. Es gibt einige Methoden, mit denen sich die Leistung bei ungleichmäßigen Zusammenführungen verbessern lässt. Weitere Informationen finden Sie unter Parallele Verarbeitung für JOIN
-Vorgänge.
Adaptive Optimierung für die Ausführung
Wenn Sie die Ausführung adaptiv anpassen möchten, geben Sie den Bereich der zu verwendenden Partitionen an, nicht die genaue Partitionsnummer. Die genaue Partitionsnummer wird ignoriert, wenn die adaptive Ausführung aktiviert ist, auch wenn sie in der Pipelinekonfiguration festgelegt wurde.
Wenn Sie einen sitzungsspezifischen Dataproc-Cluster verwenden, wird die richtige Konfiguration in Cloud Data Fusion automatisch festgelegt. Bei statischen Dataproc- oder Hadoop-Clustern können die folgenden beiden Konfigurationsparameter festgelegt werden:
spark.default.parallelism
: Legen Sie sie auf die Gesamtzahl der im Cluster verfügbaren vCores fest. So wird sichergestellt, dass Ihr Cluster nicht unterlastet ist, und die Untergrenze für die Anzahl der Partitionen wird definiert.spark.sql.adaptive.coalescePartitions.initialPartitionNum
: Legen Sie den Wert auf das 32-Fache der Anzahl der im Cluster verfügbaren vCores fest. Dies definiert die Obergrenze für die Anzahl der Partitionen.Spark.sql.adaptive.enabled
: Wenn Sie die Optimierungen aktivieren möchten, setzen Sie diesen Wert auftrue
. In Dataproc wird sie automatisch festgelegt. Wenn Sie jedoch generische Hadoop-Cluster verwenden, müssen Sie dafür sorgen, dass sie aktiviert ist.
Diese Parameter können in der Engine-Konfiguration einer bestimmten Pipeline oder in den Clustereigenschaften eines statischen Dataproc-Clusters festgelegt werden.