Parallelverarbeitung

Pipelines werden auf Clustern von Maschinen ausgeführt. Sie erzielen einen hohen Durchsatz, indem sie die zu erledigende Arbeit aufteilen und dann die Arbeit parallel auf den mehreren Executors ausführen, die über den Cluster verteilt sind. Im Allgemeinen gilt: Je größer die Anzahl der Splits (auch als Partitionen bezeichnet) ist, desto schneller kann die Pipeline ausgeführt werden. Der Grad der Parallelität in der Pipeline wird durch die Quellen und Shuffle-Phasen in der Pipeline bestimmt.

Quellen

Zu Beginn jeder Pipelineausführung berechnet jede Quelle in Ihrer Pipeline, welche Daten gelesen werden müssen und wie diese Daten in Aufteilungen aufgeteilt werden können. Stellen Sie sich beispielsweise eine einfache Pipeline vor, die aus Cloud Storage liest, einige Wrangler-Transformationen durchführt und dann zurück in Cloud Storage schreibt.

Einfache Pipeline mit Cloud Storage-Quelle, Wrangler-Transformation und Cloud Storage-Senke

Beim Start der Pipeline prüft die Cloud Storage-Quelle die Eingabedateien und teilt sie anhand der Dateigrößen in Splits auf. Beispielsweise kann eine einzelne Gigabyte-Datei in 100 Splits mit jeweils 10 MB Größe aufgeteilt werden. Jeder Executor liest die Daten für diesen Split, führt die Wrangler-Transformationen aus und schreibt dann die Ausgabe in eine part-Datei.

Partitionierte Daten in Cloud Storage in parallele Wrangler-Transformationen in Teildateien

Wenn Ihre Pipeline langsam ausgeführt wird, sollten Sie zuerst prüfen, ob Ihre Quellen genügend Aufteilungen erstellen, um die Parallelität optimal zu nutzen. Zum Beispiel machen einige Komprimierungstypen Klartextdateien nicht teilbar. Beim Lesen von mit gzip komprimierten Dateien stellen Sie möglicherweise fest, dass die Pipeline viel langsamer läuft als unkomprimierte Dateien oder mit BZIP komprimierte (teilbare) Dateien. Wenn Sie die Datenbankquelle verwenden und sie so konfiguriert haben, dass nur ein einziger Split verwendet wird, läuft der Vorgang viel langsamer, als wenn Sie sie für die Verwendung weiterer Splits konfigurieren würden.

Zufallsmix

Bestimmte Plug-in-Typen führen dazu, dass Daten im Cluster nach dem Zufallsprinzip angeordnet werden. Dies geschieht, wenn Datensätze, die von einem Executor verarbeitet werden, an einen anderen Executor gesendet werden müssen, um die Berechnung durchzuführen. Shuffle-Vorgänge sind teuer, da sie viel E/A erfordern. Plug-ins, die ein Zufallsmix von Daten verursachen, werden alle im Abschnitt Analytics von Pipeline Studio angezeigt. Dazu gehören Plug-ins wie Group By, DeDuplicate, Distinct und Joiner. Angenommen, der Pipeline im vorherigen Beispiel wird eine Group By-Phase hinzugefügt.

Nehmen wir außerdem an, dass die gelesenen Daten Käufe in einem Lebensmittelgeschäft darstellen. Jeder Eintrag enthält die Felder item und num_purchased. In der Phase Gruppieren nach konfigurieren wir die Pipeline so, dass Datensätze im Feld item gruppiert werden, und berechnen die Summe des Felds num_purchased.

Wenn die Pipeline ausgeführt wird, werden die Eingabedateien wie oben beschrieben aufgeteilt. Danach wird jeder Datensatz im Cluster nach dem Zufallsprinzip verteilt, sodass jeder Datensatz mit demselben Element zum selben Executor gehört.

Wie im vorherigen Beispiel dargestellt, wurden Datensätze für Apfelkäufe ursprünglich auf mehrere Executors verteilt. Für die Aggregation mussten alle diese Datensätze über den Cluster an denselben Executor gesendet werden.

Bei den meisten Plug-ins, für die ein Shuffle erforderlich ist, können Sie die Anzahl der Partitionen angeben, die beim Shuffle der Daten verwendet werden sollen. Damit wird gesteuert, wie viele Executors zum Verarbeiten der Shuffle-Daten verwendet werden.

Wenn im vorherigen Beispiel die Anzahl der Partitionen auf 2 gesetzt ist, berechnet jeder Executor Aggregatwerte für zwei Elemente anstelle von einem.

Beachten Sie, dass es möglich ist, die Parallelität Ihrer Pipeline nach dieser Phase zu verringern. Betrachten Sie zum Beispiel die logische Ansicht der Pipeline:

Wenn die Quelle Daten auf 500 Partitionen teilt, „Group By“ (Gruppieren nach) jedoch mit 200 Partitionen nach dem Zufallsprinzip verteilt, fällt die maximale Parallelität nach „Group By“ (Gruppieren nach) von 500 auf 200 ab. Anstelle von 500 verschiedenen Teildateien, die in Cloud Storage geschrieben werden, sind nur 200 vorhanden.

Partitionen auswählen

Wenn die Anzahl der Partitionen zu gering ist, nutzen Sie nicht die volle Kapazität Ihres Clusters, um so viel Arbeit wie möglich zu parallelisieren. Wenn Sie die Partitionen zu hoch festlegen, erhöht sich der unnötige Aufwand. Im Allgemeinen ist es besser, zu viele Partitionen zu verwenden. Wenn die Ausführung Ihrer Pipeline einige Minuten in Anspruch nimmt und Sie versuchen, ein paar Minuten Zeit zu sparen, ist zusätzlicher Aufwand ein Problem. Wenn die Ausführung Ihrer Pipeline mehrere Stunden dauert, müssen Sie sich im Allgemeinen keine Gedanken mehr um Aufwand machen.

Eine nützliche, aber zu vereinfachte Möglichkeit, die Anzahl der zu verwendenden Partitionen zu bestimmen, besteht darin, sie auf max(cluster CPUs, input records / 500,000) festzulegen. Mit anderen Worten: Nehmen Sie die Anzahl der Eingabedatensätze und dividieren Sie sie durch 500.000. Wenn diese Anzahl größer als die Anzahl der Cluster-CPUs ist, verwenden Sie diese für die Anzahl der Partitionen. Andernfalls verwenden Sie die Anzahl der Cluster-CPUs. Wenn Ihr Cluster beispielsweise 100 CPUs hat und die Shuffle-Phase voraussichtlich 100 Millionen Eingabedatensätze umfasst, verwenden Sie 200 Partitionen.

Eine umfassendere Antwort ist, dass Shuffle-Daten am besten funktionieren, wenn die Zwischen-Shuffle-Daten für jede Partition vollständig in den Arbeitsspeicher eines Executors passen können, sodass nichts an das Laufwerk übergeben werden muss. Spark reserviert knapp 30% des Arbeitsspeichers eines Executors für das Speichern von Shuffle-Daten. Die genaue Zahl lautet (Gesamtarbeitsspeicher - 300 MB) * 30%. Wenn wir davon ausgehen, dass jedes Executor auf die Verwendung von 2 GB Arbeitsspeicher eingestellt ist, bedeutet dies, dass jede Partition nicht mehr als (2 GB – 300 MB) × 30% = ca. 500 MB an Datensätzen enthalten sollte. Wenn wir davon ausgehen, dass jeder Datensatz auf eine Größe von 1 KB komprimiert wird,bedeutet dies (500 MB/Partition) ÷ (1 KB/Datensatz) = 500.000 Datensätze pro Partition. Wenn die Executors mehr Arbeitsspeicher verbrauchen oder Ihre Datensätze kleiner sind, können Sie diese Zahl entsprechend anpassen.

Datenverzerrung

Im vorherigen Beispiel wurden die Käufe verschiedener Artikel gleichmäßig verteilt. Das heißt, es gab jeweils drei Käufe für Äpfel, Bananen, Karotten und Eier. Die Shuffle-Funktion anhand eines gleichmäßig verteilten Schlüssels ist die leistungsstärkste Art des Shuffle. Allerdings haben viele Datasets dieses Attribut nicht. Wenn wir den Lebensmittelkauf im vorherigen Beispiel fortsetzen, müssten Sie davon ausgehen, dass für Eier viel mehr Käufe als für Hochzeitskarten erzielt werden. Wenn es einige Shuffle-Schlüssel gibt, die viel häufiger als andere Schlüssel verwendet werden, haben Sie es mit verzerrten Daten zu tun. Verzerrte Daten können eine erheblich schlechtere Leistung als nicht verzerrte Daten erzielen, da eine überproportionale Menge an Arbeit von einer kleinen Handvoll von Executors ausgeführt wird. Dadurch ist eine kleine Teilmenge von Partitionen viel größer als alle anderen.

In diesem Beispiel gibt es fünfmal so viele Eierkäufe als Kartenkäufe, was bedeutet, dass die Berechnung der Eiermenge etwa fünfmal länger dauert. Es spielt keine große Rolle, wenn es sich um nur 10 Datensätze anstelle von zwei handelt, macht aber einen großen Unterschied, wenn es um fünf Milliarden Datensätze statt mit einer Milliarde geht. Bei einer Datenverzerrung hat die Anzahl der Partitionen, die in einem Shuffle verwendet werden, keinen großen Einfluss auf die Pipelineleistung.

Sie können eine Datenverzerrung erkennen, indem Sie das Diagramm auf Ausgabedatensätze im Zeitverlauf untersuchen. Wenn die Phase zu Beginn der Pipelineausführung Datensätze mit einer viel höheren Geschwindigkeit ausgibt und dann plötzlich langsamer wird, kann dies auf verzerrte Daten hinweisen.

Sie können die Datenverzerrung auch erkennen, indem Sie die Arbeitsspeichernutzung des Clusters im Zeitverlauf untersuchen. Wenn Ihr Cluster eine Zeit lang ausgelastet ist, aber plötzlich für einen bestimmten Zeitraum nur eine geringe Arbeitsspeichernutzung aufweist, ist dies ebenfalls ein Zeichen für eine Datenverzerrung.

Verzerrte Daten wirken sich am stärksten auf die Leistung aus, wenn ein Join ausgeführt wird. Es gibt einige Techniken, mit denen die Leistung bei verzerrten Joins verbessert werden kann. Weitere Informationen finden Sie unter Parallele Verarbeitung für JOIN-Vorgänge.

Adaptive Abstimmung für die Ausführung

Wenn Sie die Ausführung anpassen möchten, geben Sie den zu verwendenden Partitionsbereich und nicht die genaue Partitionsnummer an. Die genaue Partitionsnummer wird ignoriert, auch wenn sie in der Pipelinekonfiguration festgelegt ist, wenn die adaptive Ausführung aktiviert ist.

Wenn Sie einen sitzungsspezifischen Dataproc-Cluster verwenden, legt Cloud Data Fusion automatisch die richtige Konfiguration fest. Für statische Dataproc- oder Hadoop-Cluster können Sie jedoch die nächsten beiden Konfigurationsparameter festlegen:

  • spark.default.parallelism: Legen Sie den Wert auf die Gesamtzahl der im Cluster verfügbaren vCores fest. Dadurch wird sichergestellt, dass Ihr Cluster nicht unterlastet ist, und definiert die Untergrenze für die Anzahl der Partitionen.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: Legen Sie den Wert auf das 32-Fache der Anzahl der im Cluster verfügbaren vCores fest. Dies definiert die Obergrenze für die Anzahl der Partitionen.
  • Spark.sql.adaptive.enabled: Setzen Sie diesen Wert auf true, um die Optimierungen zu aktivieren. Dataproc legt ihn automatisch fest. Wenn Sie jedoch generische Hadoop-Cluster verwenden, müssen Sie darauf achten, dass er aktiviert ist.

Diese Parameter können in der Engine-Konfiguration einer bestimmten Pipeline oder in den Clusterattributen eines statischen Dataproc-Clusters festgelegt werden.

Nächste Schritte