Parallele Verarbeitung für JOIN-Vorgänge

Auf dieser Seite wird die Leistungsoptimierung für JOIN-Vorgänge in Cloud Data Fusion erläutert.

JOIN-Vorgänge können der teuerste Teil einer Pipeline sein. Wie alles andere in einer Pipeline werden Vorgänge parallel ausgeführt. Im ersten Schritt einer JOIN werden die Daten so gemischt, dass jeder Datensatz mit demselben JOIN-Schlüssel an denselben Executor gesendet wird. Nachdem alle Daten gemischt wurden, werden sie zusammengeführt und die Ausgabe wird durch die Pipeline weitergeleitet.

Beispiel für die parallele Verarbeitung bei JOIN-Vorgängen

Angenommen, Sie führen einen JOIN-Vorgang auf den Datasets Purchases und Items aus. Jeder Kaufeintrag enthält den Namen des Artikels und die Anzahl der gekauften Artikel. Jeder Artikeldatensatz enthält den Namen und den Preis des Artikels. Mit JOIN wird der Artikelname ausgewertet, um den Gesamtpreis jedes Kaufs zu berechnen. Beim Zusammenführen der Daten werden sie im Cluster so neu angeordnet, dass Datensätze mit derselben ID auf demselben Executor landen.

Wenn die JOIN-Schlüssel relativ gleichmäßig verteilt sind, sind JOIN-Vorgänge leistungsstark, da sie parallel ausgeführt werden können.

Wie bei jeder Zufallsmix-Funktion wirkt sich eine Datenverzerrung negativ auf die Leistung aus. Im vorherigen Beispiel werden Eier viel häufiger gekauft als Hühner oder Milch. Das bedeutet, dass der Ausführende, der an den Eierkäufen teilnimmt, mehr Arbeit hat als die anderen Ausführenden. Wenn Sie feststellen, dass ein JOIN verzerrt ist, gibt es zwei Möglichkeiten, die Leistung zu verbessern.

Schiefe Partitionen automatisch aufteilen

Bei der adaptiven Abfrageausführung werden sehr starke Abweichungen automatisch verarbeitet. Sobald bei einer JOIN einige Partitionen viel größer als andere sind, werden sie in kleinere aufgeteilt. Informationen dazu, ob die adaptive Abfrageausführung aktiviert ist, finden Sie unter Autotuning.

In-Memory-JOIN verwenden

Eine JOIN-Berechnung im Arbeitsspeicher ist möglich, wenn eine Seite der JOIN klein genug ist, um im Arbeitsspeicher zu passen. In diesem Fall wird der kleine Datensatz in den Arbeitsspeicher geladen und dann an alle Executor gesendet. Der große Datensatz wird gar nicht gemischt, sodass die ungleichmäßigen Partitionen, die beim Zufallsmix mit dem Schlüssel JOIN generiert werden, entfernt werden.

Im vorherigen Beispiel wird das Dataset „items“ zuerst in den Arbeitsspeicher des Spark-Treibers geladen. Sie wird dann an jeden Ausführenden gesendet. Testamentsvollstrecker können jetzt die Daten zusammenführen, ohne den Kaufdatensatz zu mischen.

Bei diesem Ansatz müssen Sie sowohl dem Spark-Treiber als auch den Executors genügend Arbeitsspeicher zuweisen, damit sie das Broadcast-Dataset im Arbeitsspeicher speichern können. Standardmäßig reserviert Spark etwas weniger als 30% seines Arbeitsspeichers für die Speicherung dieser Art von Daten. Wenn Sie In-Memory-JOINs verwenden, multiplizieren Sie die Größe des Datensatzes mit vier und legen Sie diesen Wert als Executor- und Treiberspeicher fest. Wenn der Datensatz „Artikel“ beispielsweise 1 GB groß ist, müssen wir den Arbeitsspeicher des Executors und des Treibers auf mindestens 4 GB festlegen. Datasets mit mehr als 8 GB können nicht in den Arbeitsspeicher geladen werden.

Schlüsselverteilung

Wenn beide Seiten des JOIN zu groß sind, um in den Arbeitsspeicher zu passen, kann ein anderes Verfahren verwendet werden, um jeden JOIN-Schlüssel in mehrere Schlüssel aufzuteilen, um die Parallelität zu erhöhen. Diese Technik kann auf INNER JOIN- und LEFT OUTER JOIN-Vorgänge angewendet werden. Sie kann nicht für FULL OUTER JOIN-Vorgänge verwendet werden.

Bei diesem Ansatz wird die verzerrte Seite mit einer neuen Ganzzahlspalte mit einer Zufallszahl zwischen 1 und N gesalzt. Die nicht verzerrte Seite wird explodiert, wobei für jede vorhandene Zeile N neue Zeilen generiert werden. Der explodierten Seite wird eine neue Spalte hinzugefügt, die mit den Zahlen 1 bis N ausgefüllt ist. Anschließend wird eine normale JOIN ausgeführt, mit der Ausnahme, dass die neue Spalte als Teil des JOIN-Schlüssels hinzugefügt wird. So werden alle Daten, die zuvor in eine einzelne Partition geschrieben wurden, jetzt auf bis zu N verschiedene Partitionen verteilt.

Im vorherigen Beispiel ist der Verteilungsfaktor N auf 3 festgelegt. Die ursprünglichen Datensätze sind links zu sehen. Die gesalzene und die explodierte Version des Datensatzes werden in der Mitte angezeigt. Die zufälligen Daten werden rechts angezeigt. Dabei sind an den Eierkäufen drei statt nur ein Ausführender beteiligt.

Eine größere Parallelität wird durch eine Erhöhung der Verteilungen erreicht. Dies hat jedoch zur Folge, dass eine Seite der JOIN explodiert, was dazu führt, dass mehr Daten im Cluster verschoben werden. Daher nimmt der Vorteil mit zunehmender Verbreitung ab. Legen Sie in den meisten Fällen einen Wert von 20 oder weniger fest.

Nächste Schritte