Pipelines werden auf Clustern von Maschinen ausgeführt. Sie erzielen einen hohen Durchsatz, Zu erledigende Arbeit aufteilen und dann parallel auf den mehreren Executors im Cluster. Im Allgemeinen gilt: Je mehr Teilungen (auch Partitionen genannt) vorhanden sind, desto schneller kann die Pipeline ausgeführt werden. Der Grad der Parallelität in Ihrer Pipeline wird von den Quellen und Zufallsmix-Phasen in der Pipeline bestimmt.
Quellen
Zu Beginn jeder Pipelineausführung berechnet jede Quelle in Ihrer Pipeline, was Daten gelesen werden müssen und wie diese Daten in Aufteilungen aufgeteilt werden können. Angenommen, Sie haben eine einfache Pipeline, die Daten aus Cloud Storage liest, einige Wrangler-Transformationen durchführt und dann wieder in Cloud Storage schreibt.
Beim Start der Pipeline prüft die Cloud Storage-Quelle die Eingabe. Dateien und unterteilt sie je nach Größe in Splits. Beispiel: Eine einzelne Gigabyte-Datei kann in 100 Splits aufgeteilt werden, die jeweils 10 MB in Größe. Jeder Executor liest die Daten für diesen Split und führt den Wrangler aus. Transformationen und schreibt die Ausgabe dann in eine part-Datei.
Wenn Ihre Pipeline langsam läuft, sollten Sie als Erstes prüfen, ob Ihre Quellen genügend Verzweigungen erstellen, um die Parallelität voll auszuschöpfen. Zum Beispiel machen einige Komprimierungstypen Klartextdateien nicht teilbar. Wenn Sie GZIP-komprimierte Dateien lesen, ist Ihre Pipeline möglicherweise viel langsamer als beim Lesen von unkomprimierten Dateien oder von mit BZIP komprimierten Dateien (die sich aufteilen lassen). Wenn Sie die Methode und sie so konfiguriert haben, dass sie nur einen Split verwendet, als wenn Sie es für die Nutzung von mehr Splits konfigurieren.
Zufallsmixe
Bestimmte Arten von Plug-ins bewirken, dass Daten im Cluster nach dem Zufallsprinzip angeordnet werden. Das passiert, wenn Datensätze, die von einem Executor verarbeitet werden, an einen anderen Executor gesendet werden müssen, um die Berechnung auszuführen. Shuffles sind teure Vorgänge, Sie sind mit vielen E/A-Vorgängen verbunden. Plug-ins, die eine Shuffle-Verarbeitung von Daten verursachen, werden alle in im Abschnitt Analytics von Pipeline Studio. Dazu gehören Plug-ins wie „Gruppieren nach“, „Deduplizieren“, „Differenzen“ und „Zusammenfügen“. Beispiel: Ein Gruppieren nach- Phase wurde der Pipeline im vorherigen Beispiel hinzugefügt.
Nehmen wir außerdem an, dass die gelesenen Daten Käufe in einem Lebensmittelgeschäft darstellen.
Jeder Eintrag enthält die Felder item
und num_purchased
. In der Gruppe
Nach Phase konfigurieren wir die Pipeline so, dass Datensätze im Feld item
gruppiert werden und
Summe des Felds num_purchased
berechnen.
Wenn die Pipeline ausgeführt wird, werden die Eingabedateien wie oben beschrieben aufgeteilt. Anschließend werden alle Datensätze im Cluster neu angeordnet, sodass jeder Datensatz mit demselben Element demselben Executor zugewiesen ist.
Wie im vorherigen Beispiel veranschaulicht, waren die Datensätze für den Apfel-Kauf die ursprünglich auf mehrere Executors verteilt waren. Um die Aggregation durchzuführen, dieser Datensätze über den Cluster an denselben Executor gesendet werden müssen.
Bei den meisten Plugins, für die ein Zufallsmix erforderlich ist, kannst du die Anzahl der Partitionen angeben, die beim Zufallsmix verwendet werden sollen. Damit wird gesteuert, wie viele Executors verarbeitet werden.
Wenn im vorherigen Beispiel die Anzahl der Partitionen auf 2
gesetzt ist, berechnet jeder Executor Aggregatwerte für zwei Elemente anstelle von einem.
Beachten Sie, dass es danach möglich ist, die Parallelität Ihrer Pipeline zu verringern. . Betrachten Sie beispielsweise die logische Ansicht der Pipeline:
Wenn die Quelle die Daten auf 500 Partitionen verteilt, die Gruppierung nach aber mit 200 Partitionen gemischt wird, sinkt die maximale Parallelisierungsstufe nach der Gruppierung von 500 auf 200. Anstatt 500 verschiedene Teiledateien zu schreiben, Cloud Storage haben Sie nur 200.
Partitionen auswählen
Wenn die Anzahl der Partitionen zu gering ist, nutzen Sie nicht die volle Kapazität von um so viel Arbeit wie möglich zu parallelisieren. Partitionen auch festlegen erhöht den unnötigen Overhead. Im Allgemeinen ist es besser, zu viele Partitionen zu verwenden als zu wenige. Zusätzlicher Overhead ist ein Problem, wenn die Ausführung Ihrer Pipeline einige Minuten dauert und Sie versuchen, ein paar Minuten einzusparen. Wenn die Ausführung Ihrer Pipeline Stunden dauert, ist der Aufwand in der Regel nicht über den Sie sich Sorgen machen müssen.
Eine nützliche, aber zu vereinfachte Methode, die Anzahl der Partitionen zu bestimmen,
legen Sie sie auf max(cluster CPUs, input records / 500,000)
fest. In anderen
Wörter, nehmen wir die Anzahl der Eingabedatensätze und dividieren durch 500.000. Wenn diese Zahl gleich
die größer als die Anzahl der Cluster-CPUs ist, verwenden Sie diese für die Anzahl der Partitionen.
Andernfalls verwenden Sie die Anzahl der Cluster-CPUs. Wenn Ihr Cluster beispielsweise 100 CPUs hat und die Zufallsmixphase voraussichtlich 100 Millionen Eingabedatensätze enthält, verwenden Sie 200 Partitionen.
Eine umfassendere Antwort ist, dass die Zufallswiedergabe am besten funktioniert, wenn die Mittelstufe für jede Partition ganz in den Arbeitsspeicher eines Executors passen, dass nichts auf das Laufwerk übergegeben werden muss. Spark reserviert fast 30% eines Arbeitsspeicher des Executors für die Speicherung von Shuffle-Daten. Die genaue Zahl lautet (Gesamtspeicher – 300 MB) * 30 %. Angenommen, jeder Executor soll 2 GB Arbeitsspeicher verwenden, bedeutet das, dass jede Partition maximal (2 GB – 300 MB) * 30 % = etwa 500 MB Datensätze aufnehmen sollte. Wenn wir davon ausgehen, dass jeder Datensatz auf 1 KB groß ist, bedeutet dies (500 MB / Partition) / (1 KB / record) = 500.000 Datensätze pro Partition. Wenn Ihre Executors oder Ihre Datensätze kleiner sind, können Sie diese Zahl entsprechend anpassen.
Datenverzerrung
Beachten Sie, dass im vorherigen Beispiel die Käufe verschiedener Artikel gleichmäßig verteilt waren. verteilt sind. Das heißt, es gab jeweils drei Käufe für Äpfel, Bananen, Karotten und Eier. Die nach dem Shuffing mit einem gleichmäßig verteilten Schlüssel ist am leistungsfähigsten Shuffle-Typ, aber viele Datasets haben dieses Attribut nicht. Wenn Sie beim Beispiel mit dem Supermarkteinkauf im vorherigen Abschnitt bleiben, würden Sie erwarten, dass es viel mehr Käufe von Eiern als von Hochzeitskarten gibt. Bei einigen Zufallsmix die wesentlich häufiger als andere Schlüssel sind, Daten. Bei verzerrten Daten kann die Leistung deutlich schlechter sein als bei nicht verzerrten Daten, da ein unverhältnismäßig großer Arbeitsaufwand von einer Handvoll Ausführern erledigt wird. Dadurch sind einige Partitionen viel größer als alle anderen.
In diesem Beispiel gibt es fünfmal so viele Eierkäufe wie Kartenkäufe. Das bedeutet, dass die Berechnung des Eieraggregats etwa fünfmal länger dauert. Es spielt bei 10 statt zwei Datensätzen keine große Rolle, macht einen großen Unterschied, wenn es bei fünf Milliarden Datensätzen Milliarden. Bei Datenabweichungen hat die Anzahl der Partitionen, die in einem Shuffle verwendet werden, keinen großen Einfluss auf die Pipelineleistung.
Sie können Abweichungen erkennen, indem Sie im Diagramm nach Ausgabeeinträgen im Zeitverlauf suchen. Gibt die Phase zu Beginn des Ereignisses Datensätze mit viel höherer Geschwindigkeit aus? und dann plötzlich langsamer wird, kann dies auf verzerrte Daten hinweisen.
Sie können die Datenverzerrung auch erkennen, indem Sie die Arbeitsspeichernutzung des Clusters im Zeitverlauf untersuchen. Wenn Ihr Cluster ist für einige Zeit ausgelastet, aber die Arbeitsspeichernutzung ist plötzlich Zeiträume, ist dies auch ein Zeichen dafür, dass Sie mit Datenverzerrungen zu tun haben.
Verzerrte Daten wirken sich am stärksten auf die Leistung aus, wenn ein Join ausgeführt wird
durchgeführt wurde. Es gibt einige Techniken, mit denen Sie die Leistung verbessern können.
für verzerrte Joins. Weitere Informationen finden Sie unter Parallele Verarbeitung für JOIN
-Vorgänge.
Adaptive Abstimmung für die Ausführung
Wenn Sie die Ausführung anpassen möchten, geben Sie den Partitionsbereich an, der verwendet werden soll, nicht den exakte Partitionsnummer ein. Die genaue Partitionsnummer, auch wenn sie in der Pipeline festgelegt wurde Konfiguration, wird ignoriert, wenn die adaptive Ausführung aktiviert ist.
Wenn Sie einen sitzungsspezifischen Dataproc-Cluster verwenden, wird die richtige Konfiguration in Cloud Data Fusion automatisch festgelegt. Bei statischen Dataproc- oder Hadoop-Clustern können die folgenden beiden Konfigurationsparameter festgelegt werden:
spark.default.parallelism
: Legen Sie sie auf die Gesamtzahl der im Cluster verfügbaren vCores fest. So wird sichergestellt, dass Ihr Cluster nicht unterlastet ist, und die Untergrenze für die Anzahl der Partitionen wird definiert.spark.sql.adaptive.coalescePartitions.initialPartitionNum
: Legen Sie den Wert auf das 32-Fache der Anzahl der im Cluster verfügbaren vCores fest. Dies definiert die Obergrenze für die Anzahl der Partitionen.Spark.sql.adaptive.enabled
: Um die Optimierungen zu aktivieren, setzen Sie diesen Wert auftrue
. In Dataproc wird sie automatisch festgelegt. Wenn Sie jedoch generische Hadoop-Cluster verwenden, müssen Sie dafür sorgen, dass sie aktiviert ist.
Diese Parameter können in der Engine-Konfiguration einer bestimmten Pipeline oder in den Clustereigenschaften eines statischen Dataproc-Clusters festgelegt werden.