Elaborazione parallela per operazioni di JOIN

Questa pagina spiega l'ottimizzazione delle prestazioni per le operazioni JOIN in Cloud Data Fusion.

Le operazioni JOIN possono essere la parte più costosa di una pipeline. Come tutto il resto di una pipeline, le operazioni vengono eseguite in parallelo. Il primo passaggio di un JOIN è lo shuffling dei dati in modo che ogni record con la stessa chiave JOIN venga inviato allo stesso esecutore. Dopo aver eseguito lo shuffling di tutti i dati, questi vengono uniti e l'output continua attraverso la pipeline.

Esempio di elaborazione parallela nelle operazioni di JOIN

Ad esempio, supponi di eseguire un'operazione JOIN su set di dati chiamati Purchases e Items. Ogni record di acquisto contiene un nome dell'articolo e il numero acquistato. Ogni record di articolo contiene il nome e il prezzo dell'articolo. Un JOIN viene assegnato al nome dell'articolo per calcolare il prezzo totale di ogni acquisto. Quando i dati vengono uniti, i dati vengono sottoposti a shuffling nel cluster in modo che i record con lo stesso ID finiscano sullo stesso esecutore.

Quando le chiavi JOIN sono distribuite in modo uniforme, le operazioni JOIN funzionano bene perché possono essere eseguite in parallelo.

Come in qualsiasi shuffling, il disallineamento dei dati influisce negativamente sulle prestazioni. Nell'esempio precedente, le uova vengono acquistate molto più spesso del pollo o il latte, il che significa che l'esecutore che unisce gli acquisti delle uova fa più lavoro degli altri esecutori. Se noti un'inclinazione di JOIN, ci sono due modi per migliorare le prestazioni.

Suddivide automaticamente le partizioni disallineate

Con l'esecuzione adattiva delle query, verranno gestiti automaticamente disallineamenti molto gravosi. Non appena un JOIN produce alcune partizioni molto più grandi di altre, queste vengono suddivise in parti più piccole. Per verificare che l'esecuzione adattiva delle query sia abilitata, consulta Ottimizzazione automatica.

Usa un JOIN in memoria

È possibile eseguire un JOIN in memoria se un lato dell'JOIN è sufficientemente piccolo da essere in memoria. In questo caso, il set di dati di piccole dimensioni viene caricato in memoria e poi trasmesso a ogni esecutore. Il set di dati di grandi dimensioni non viene eseguito affatto, rimuovendo le partizioni irregolari che vengono generate durante lo shuffling sulla chiave JOIN.

Nell'esempio precedente, il set di dati degli elementi viene caricato per la prima volta nella memoria del driver Spark. Viene quindi trasmesso a ciascun esecutore. Ora gli esecutori possono unire i dati senza eseguire lo shuffling del set di dati di acquisto.

Questo approccio richiede di concedere memoria sufficiente sia al driver Spark sia agli esecutori per consentire loro di archiviare il set di dati di broadcast. Per impostazione predefinita, Spark riserva poco meno del 30% della sua memoria per l'archiviazione di questo tipo di dati. Quando utilizzi JOIN in memoria, moltiplica la dimensione del set di dati per quattro e impostala come memoria di esecutore e driver. Ad esempio, se il set di dati degli elementi aveva una dimensione di 1 GB, avremmo dovuto impostare la memoria di esecutore e driver su almeno 4 GB. I set di dati di dimensioni superiori a 8 GB non possono essere caricati in memoria.

Distribuzione delle chiavi

Quando entrambi i lati dell'elemento JOIN sono troppo grandi per essere contenuti nella memoria, è possibile utilizzare una tecnica diversa per suddividere ogni chiave JOIN in più chiavi per aumentare il livello di parallelismo. Questa tecnica può essere applicata alle operazioni INNER JOIN e LEFT OUTER JOIN. Non può essere utilizzato per le operazioni FULL OUTER JOIN.

In questo approccio, il lato inclinato viene salato con una nuova colonna di numeri interi con un numero casuale compreso tra 1 e N. Il lato senza inclinazione è esploso e ogni riga esistente genera N nuove righe. Viene aggiunta una nuova colonna al lato esploso, compilata con ogni numero da 1 a N. Viene quindi eseguito un JOIN normale, ma la nuova colonna viene aggiunta come parte della chiave JOIN. In questo modo, tutti i dati utilizzati per andare a una singola partizione ora vengono distribuiti fino a un massimo di N partizioni diverse.

Nell'esempio precedente, il fattore di distribuzione N è impostato su 3. I set di dati originali sono mostrati a sinistra. Al centro sono mostrate le versioni con sale ed esplose del set di dati. I dati sottoposti a shuffling vengono mostrati sulla destra, con tre diversi esecutori che uniscono gli acquisti di uova, anziché uno.

Un maggiore parallelismo si ottiene aumentando le distribuzioni. Tuttavia, ciò comporta l'esplosione di un lato di JOIN, con conseguente shuffling di più dati nel cluster. Per questo motivo, il vantaggio diminuisce con l'aumento della distribuzione. Nella maggior parte dei casi, imposta un valore pari o inferiore a 20.

Passaggi successivi