Parallele Verarbeitung für JOIN-Vorgänge

Auf dieser Seite wird die Leistungsoptimierung für JOIN-Vorgänge in Cloud Data Fusion erläutert.

JOIN-Vorgänge können der teuerste Teil einer Pipeline sein. Wie alles andere in einer Pipeline werden Vorgänge parallel ausgeführt. Im ersten Schritt einer JOIN werden die Daten so gemischt, dass jeder Datensatz mit demselben JOIN-Schlüssel an denselben Executor gesendet wird. Nachdem alle Daten nach dem Zufallsprinzip umverteilt wurden, werden sie zusammengeführt und die Ausgabe durch die Pipeline weiterlaufen.

Beispiel für die parallele Verarbeitung in JOIN-Vorgängen

Angenommen, Sie führen einen JOIN-Vorgang auf den Datasets Purchases und Items aus. Jeder Kaufeintrag enthält den Namen des Artikels und die Anzahl der gekauften Artikel. Jeder Artikeldatensatz enthält den Namen und den Preis des Artikels. A JOIN wird für den Artikelnamen ausgeführt, um den Gesamtpreis der einzelnen Artikel zu berechnen kaufen. Beim Zusammenführen der Daten werden die Daten im Cluster nach dem Zufallsprinzip verteilt, sodass Datensätze mit derselben ID enden zum selben Executor.

Wenn die JOIN-Schlüssel relativ gleichmäßig verteilt sind, werden JOIN-Vorgänge ausgeführt weil sie parallel ausgeführt werden können.

Wie bei jedem Shuffle wirkt sich die Datenverzerrung negativ auf die Leistung aus. Im vorherigen Beispiel werden Eier viel häufiger gekauft als Hühner oder Milch. Das bedeutet, dass der Ausführende, der an den Eierkäufen teilnimmt, mehr Arbeit hat als die anderen Ausführenden. Wenn Sie feststellen, dass ein JOIN verzerrt ist, gibt es zwei Möglichkeiten, die Leistung zu verbessern.

Schiefe Partitionen automatisch aufteilen

Mit der adaptiven Abfrageausführung werden wirklich starke Abweichungen automatisch verarbeitet. Sobald bei einer JOIN einige Partitionen viel größer als andere sind, werden sie in kleinere aufgeteilt. Informationen dazu, ob die adaptive Abfrageausführung aktiviert ist, finden Sie unter Autotuning.

Speicherinterne JOIN verwenden

Eine speicherinterne JOIN kann ausgeführt werden, wenn eine Seite von JOIN klein genug ist. ins Gedächtnis zu passen. In diesem Fall wird das kleine Dataset in den Arbeitsspeicher geladen, und dann an jeden Executor gesendet. Das große Dataset wird nicht Alle ungleichmäßigen Partitionen werden entfernt, die beim Zufallsmix generiert werden. JOIN-Schlüssel.

Im vorherigen Beispiel wird das Dataset „items“ zuerst in den Arbeitsspeicher des Spark-Treiber. Sie wird dann an jeden Executor gesendet. Testamentsvollstrecker können jetzt die Daten zusammenführen, ohne den Kaufdatensatz zu mischen.

Bei diesem Ansatz müssen Sie sowohl dem Spark-Treiber als auch dem Executors, mit denen sie das Broadcast-Dataset im Arbeitsspeicher speichern können. Standardmäßig Spark reserviert etwas weniger als 30% seines Arbeitsspeichers für die Speicherung dieser Art von Daten. Wenn Sie In-Memory-JOINs verwenden, multiplizieren Sie die Größe des Datasets mit vier und als Executor und Treiberspeicher festlegen. Wenn z. B. das Element-Dataset 1 GB groß war, müssten wir den Executor und den Treiberspeicher auf mindestens 4 GB Datasets, die größer als 8 GB sind, können nicht in den Arbeitsspeicher geladen werden.

Schlüsselverteilung

Wenn beide Seiten des JOIN zu groß sind, um in den Arbeitsspeicher zu passen, kann ein anderes Verfahren verwendet werden, um jeden JOIN-Schlüssel in mehrere Schlüssel aufzuteilen, um die Parallelität zu erhöhen. Dieses Verfahren kann auf INNER JOIN und LEFT OUTER JOINBetriebsabläufe. Sie kann nicht für FULL OUTER JOIN-Vorgänge verwendet werden.

Bei diesem Ansatz wird die schiefe Seite mit einer neuen Ganzzahlspalte mit einem Zufallszahl von 1 bis N. Die nicht gedrehte Seite wird explodiert, wobei jede vorhandene Zeile N neue Zeilen werden generiert. Der explodierten Seite wird eine neue Spalte hinzugefügt, mit jeder Zahl von 1 bis N. Anschließend wird eine normale JOIN ausgeführt, mit der Ausnahme, dass die neue Spalte als Teil des JOIN-Schlüssels hinzugefügt wird. So werden alle Daten, die zuvor in eine einzelne Partition geschrieben wurden, jetzt auf bis zu N verschiedene Partitionen verteilt.

Im vorherigen Beispiel ist der Verteilungsfaktor N auf 3 festgelegt. Die ursprünglichen Datensätze sind links zu sehen. Die gesalzene und die explodierte Version des Datensatzes werden in der Mitte angezeigt. Die zufälligen Daten sind rechts zu sehen. Hier sind drei statt nur einer Ausführender für Eierkäufe verantwortlich.

Eine größere Parallelität wird durch eine Erhöhung der Verteilungen erreicht. Dies hat jedoch zur Folge, dass eine Seite der JOIN explodiert, was dazu führt, dass mehr Daten im Cluster verschoben werden. Daher nimmt der Vorteil mit zunehmender Verbreitung ab. Legen Sie in den meisten Fällen einen Wert von 20 oder weniger fest.

Nächste Schritte