Auf dieser Seite wird die Leistungsoptimierung für JOIN
-Vorgänge in Cloud Data Fusion erläutert.
JOIN
-Vorgänge können der teuerste Teil einer Pipeline sein. Wie alles andere in einer Pipeline werden Vorgänge parallel ausgeführt. Im ersten Schritt einer JOIN
werden die Daten so gemischt, dass jeder Datensatz mit demselben JOIN
-Schlüssel an denselben Executor gesendet wird. Nachdem alle Daten nach dem Zufallsprinzip umverteilt wurden, werden sie zusammengeführt und
die Ausgabe durch die Pipeline weiterlaufen.
Beispiel für die parallele Verarbeitung in JOIN
-Vorgängen
Angenommen, Sie führen einen JOIN
-Vorgang auf den Datasets Purchases
und Items
aus. Jeder Kaufeintrag enthält den Namen des Artikels und die Anzahl der gekauften Artikel. Jeder Artikeldatensatz enthält den Namen und den Preis des Artikels. A
JOIN
wird für den Artikelnamen ausgeführt, um den Gesamtpreis der einzelnen Artikel zu berechnen
kaufen. Beim Zusammenführen der Daten werden die Daten im Cluster nach dem Zufallsprinzip verteilt, sodass
Datensätze mit derselben ID enden zum selben Executor.
Wenn die JOIN
-Schlüssel relativ gleichmäßig verteilt sind, werden JOIN
-Vorgänge ausgeführt
weil sie parallel
ausgeführt werden können.
Wie bei jedem Shuffle wirkt sich die Datenverzerrung negativ auf die Leistung aus. Im vorherigen Beispiel werden Eier viel häufiger gekauft als Hühner oder Milch. Das bedeutet, dass der Ausführende, der an den Eierkäufen teilnimmt, mehr Arbeit hat als die anderen Ausführenden. Wenn Sie feststellen, dass ein JOIN
verzerrt ist, gibt es zwei Möglichkeiten, die Leistung zu verbessern.
Schiefe Partitionen automatisch aufteilen
Mit der adaptiven Abfrageausführung werden wirklich starke Abweichungen automatisch verarbeitet.
Sobald bei einer JOIN
einige Partitionen viel größer als andere sind, werden sie in kleinere aufgeteilt. Informationen dazu, ob die adaptive Abfrageausführung aktiviert ist, finden Sie unter Autotuning.
Speicherinterne JOIN
verwenden
Eine speicherinterne JOIN
kann ausgeführt werden, wenn eine Seite von JOIN
klein genug ist.
ins Gedächtnis zu passen. In diesem Fall wird das kleine Dataset
in den Arbeitsspeicher geladen,
und dann an jeden Executor gesendet. Das große Dataset wird nicht
Alle ungleichmäßigen Partitionen werden entfernt, die beim Zufallsmix generiert werden.
JOIN
-Schlüssel.
Im vorherigen Beispiel wird das Dataset „items“ zuerst in den Arbeitsspeicher des Spark-Treiber. Sie wird dann an jeden Executor gesendet. Testamentsvollstrecker können jetzt die Daten zusammenführen, ohne den Kaufdatensatz zu mischen.
Bei diesem Ansatz müssen Sie sowohl dem Spark-Treiber als auch dem
Executors, mit denen sie das Broadcast-Dataset im Arbeitsspeicher speichern können. Standardmäßig
Spark reserviert etwas weniger als 30% seines Arbeitsspeichers für die Speicherung dieser Art von
Daten. Wenn Sie In-Memory-JOIN
s verwenden, multiplizieren Sie die Größe des Datasets mit vier und
als Executor und Treiberspeicher festlegen. Wenn z. B. das Element-Dataset
1 GB groß war, müssten wir den Executor und den Treiberspeicher auf
mindestens 4 GB Datasets, die größer als 8 GB sind, können nicht in den Arbeitsspeicher geladen werden.
Schlüsselverteilung
Wenn beide Seiten des JOIN
zu groß sind, um in den Arbeitsspeicher zu passen, kann ein anderes Verfahren verwendet werden, um jeden JOIN
-Schlüssel in mehrere Schlüssel aufzuteilen, um die Parallelität zu erhöhen. Dieses Verfahren kann auf INNER JOIN
und
LEFT OUTER JOIN
Betriebsabläufe. Sie kann nicht für FULL OUTER JOIN
-Vorgänge verwendet werden.
Bei diesem Ansatz wird die schiefe Seite mit einer neuen Ganzzahlspalte mit einem
Zufallszahl von 1 bis N. Die nicht gedrehte Seite wird explodiert, wobei jede vorhandene Zeile
N
neue Zeilen werden generiert. Der explodierten Seite wird eine neue Spalte hinzugefügt,
mit jeder Zahl von 1 bis N. Anschließend wird eine normale JOIN
ausgeführt, mit der Ausnahme, dass die neue Spalte als Teil des JOIN
-Schlüssels hinzugefügt wird. So werden alle Daten, die zuvor in eine einzelne Partition geschrieben wurden, jetzt auf bis zu N
verschiedene Partitionen verteilt.
Im vorherigen Beispiel ist der Verteilungsfaktor N
auf 3
festgelegt. Die ursprünglichen Datensätze sind links zu sehen. Die gesalzene und die explodierte Version des Datensatzes werden in der Mitte angezeigt. Die zufälligen Daten sind rechts zu sehen. Hier sind drei statt nur einer Ausführender für Eierkäufe verantwortlich.
Eine größere Parallelität wird durch eine Erhöhung der Verteilungen erreicht. Dies hat jedoch zur Folge, dass eine Seite der JOIN
explodiert, was dazu führt, dass mehr Daten im Cluster verschoben werden. Daher nimmt der Vorteil mit zunehmender Verbreitung ab. Legen Sie in den meisten Fällen einen Wert von 20 oder weniger fest.