Parallele Verarbeitung für JOIN-Vorgänge

Auf dieser Seite wird die Leistungsoptimierung für JOIN-Vorgänge in Cloud Data Fusion erläutert.

JOIN-Vorgänge können der teuerste Teil einer Pipeline sein. Wie alles andere in einer Pipeline werden auch Vorgänge parallel ausgeführt. Der erste Schritt einer JOIN besteht darin, die Daten so zu mischen, dass jeder Datensatz mit demselben JOIN-Schlüssel an denselben Executor gesendet wird. Nachdem alle Daten nach dem Zufallsprinzip umverteilt wurden, werden sie zusammengeführt und die Ausgabe durch die Pipeline fortgesetzt.

Beispiel für die parallele Verarbeitung in JOIN-Vorgängen

Angenommen, Sie führen einen JOIN-Vorgang für Datasets mit den Namen Purchases und Items aus. Jeder Kaufdatensatz enthält einen Artikelnamen und eine gekaufte Nummer. Jeder Artikeleintrag enthält den Artikelnamen und den Preis dieses Artikels. JOIN wird für den Artikelnamen ausgeführt, um den Gesamtpreis jedes Kaufs zu berechnen. Beim Zusammenführen der Daten werden die Daten im Cluster nach dem Zufallsprinzip verteilt, sodass Datensätze mit derselben ID im selben Executor landen.

Wenn die JOIN-Schlüssel relativ gleichmäßig verteilt sind, erzielen JOIN-Vorgänge eine gute Leistung, da sie parallel ausgeführt werden können.

Wie bei jedem Shuffle wirkt sich die Datenverzerrung negativ auf die Leistung aus. Im obigen Beispiel werden Eier viel häufiger gekauft als Hühnchen oder Milch. Das bedeutet, dass der Executor, der den Eierkäufen hinzufügt, mehr Arbeit erledigt als die anderen Executors. Wenn Sie feststellen, dass ein JOIN verzerrt ist, gibt es zwei Möglichkeiten, die Leistung zu verbessern.

Schiefe Partitionen automatisch aufteilen

Mit der adaptiven Abfrageausführung werden wirklich starke Abweichungen automatisch verarbeitet. Sobald ein JOIN einige Partitionen erzeugt, die viel größer als andere sind, werden sie in kleinere Partitionen aufgeteilt. Unter Automatische Abstimmung können Sie prüfen, ob die adaptive Abfrageausführung aktiviert ist.

Speicherinterne JOIN verwenden

Ein speicherinterner JOIN kann ausgeführt werden, wenn eine Seite von JOIN klein genug ist, um in den Speicher zu passen. In diesem Fall wird das kleine Dataset in den Arbeitsspeicher geladen und dann an jeden Executor gesendet. Das große Dataset wird überhaupt nicht nach dem Zufallsprinzip umverteilt. Dadurch werden die ungleichmäßigen Partitionen entfernt, die beim Verschieben des Schlüssels JOIN nach dem Zufallsprinzip generiert werden.

Im vorherigen Beispiel wird das Element-Dataset zuerst in den Arbeitsspeicher des Spark-Treibers geladen. Sie wird dann an jeden Executor gesendet. Executors können die Daten jetzt zusammenführen, ohne das Kauf-Dataset nach dem Zufallsprinzip zu verteilen.

Bei diesem Ansatz müssen Sie sowohl dem Spark-Treiber als auch den Executors genügend Arbeitsspeicher zur Verfügung stellen, damit diese das Broadcast-Dataset im Arbeitsspeicher speichern können. Standardmäßig reserviert Spark etwas weniger als 30% seines Arbeitsspeichers für die Speicherung dieser Art von Daten. Wenn Sie speicherinterne JOINs verwenden, multiplizieren Sie die Größe des Datasets mit vier und legen Sie diese als Executor und Treiberspeicher fest. Wenn das Element-Dataset beispielsweise 1 GB groß war, müssten wir den Executor und den Treiberarbeitsspeicher auf mindestens 4 GB festlegen. Datasets, die größer als 8 GB sind, können nicht in den Arbeitsspeicher geladen werden.

Schlüsselverteilung

Wenn beide Seiten von JOIN zu groß sind, um in den Speicher zu passen, kann jeder JOIN-Schlüssel mit einer anderen Methode in mehrere Schlüssel aufgeteilt werden, um den Grad der Parallelität zu erhöhen. Dieses Verfahren kann auf INNER JOIN- und LEFT OUTER JOIN-Vorgänge angewendet werden. Es kann nicht für FULL OUTER JOIN-Vorgänge verwendet werden.

Bei diesem Ansatz wird ein Salt für die schiefe Seite mit einer neuen Ganzzahlspalte mit einer Zufallszahl von 1 bis N eingefügt. Die nicht geneigte Seite wird explodiert, wobei jede vorhandene Zeile N neue Zeilen generiert. Auf der explodierten Seite wird eine neue Spalte hinzugefügt, die jede Zahl von 1 bis N enthält. Anschließend wird ein normaler JOIN-Vorgang ausgeführt, mit dem Unterschied, dass die neue Spalte als Teil des JOIN-Schlüssels hinzugefügt wird. Auf diese Weise werden alle Daten, die früher in eine Partition gingen, auf bis zu N verschiedene Partitionen verteilt.

Im vorherigen Beispiel ist der Verteilungsfaktor N auf 3 festgelegt. Die ursprünglichen Datasets werden links angezeigt. In der Mitte sind die Versionen des Datasets mit Salt und Explodierung dargestellt. Rechts sind die gemischten Daten zu sehen, wobei statt einer die Eierkäufe von drei verschiedenen Executors zusammengeführt werden.

Eine größere Parallelität wird durch zunehmende Verteilungen erreicht. Allerdings wird dadurch eine Seite von JOIN explodiert, was dazu führt, dass mehr Daten im Cluster nach dem Zufallsprinzip umverteilt werden. Daher sinkt der Nutzen mit zunehmender Verteilung. In den meisten Fällen sollten Sie den Wert auf 20 oder weniger einstellen.

Nächste Schritte