Questa pagina spiega l'ottimizzazione delle prestazioni per le operazioni JOIN
in Cloud Data Fusion.
Le operazioni JOIN
possono essere la parte più costosa di una pipeline. Come tutto il resto in una pipeline, le operazioni vengono eseguite in parallelo. Il primo passaggio di un
JOIN
consiste nello smistare i dati in modo che ogni record con la stessa chiave JOIN
venga inviato
allo stesso executor. Dopo che tutti i dati sono stati mischiati, vengono uniti e l'output continua attraverso la pipeline.
Esempio di elaborazione parallela nelle operazioni JOIN
Ad esempio, supponiamo di eseguire un'operazione JOIN
su set di dati denominati
Purchases
e Items
. Ogni record di acquisto contiene il nome e il numero di un articolo acquistato. Ogni record dell'articolo contiene il nome e il prezzo dell'articolo. Viene eseguito un
JOIN
sul nome dell'articolo per calcolare il prezzo totale di ogni
acquisto. Quando i dati vengono uniti, vengono mischiati nel cluster in modo che i record con lo stesso ID finiscano nello stesso executor.
Quando le chiavi JOIN
sono distribuite in modo abbastanza uniforme, le operazioni JOIN
hanno un buon rendimento perché possono essere eseguite in parallelo.
Come qualsiasi ordinamento casuale, la distorsione dei dati influisce negativamente sul rendimento. Nell'esempio precedente, le uova vengono acquistate molto più spesso rispetto al pollo o al latte, il che significa che l'executor che si occupa degli acquisti di uova lavora di più rispetto agli altri executor. Se noti che un valore JOIN
è distorto, esistono due modi per migliorare il rendimento.
Suddividere automaticamente le partizioni distorte
Con l'esecuzione di query adattiva, gli scostamenti molto elevati verranno gestiti automaticamente.
Non appena un JOIN
produce partizioni molto più grandi di altre, queste vengono suddivise in partizioni più piccole. Per verificare di aver attivato l'esecuzione di query adattiva, consulta Ottimizzazione automatica.
Utilizzare un JOIN
in memoria
È possibile eseguire un JOIN
in memoria se un lato del JOIN
è abbastanza piccolo da rientrare nella memoria. In questa situazione, il piccolo set di dati viene caricato in memoria e poi trasmesso a ogni utente. Il set di dati di grandi dimensioni non viene affatto mescolato, rimuovendo le partizioni non uniformi generate durante il rimescolamento in base alla chiave JOIN
.
Nell'esempio precedente, il set di dati degli articoli viene prima caricato nella memoria del driver Spark. che viene poi trasmessa a ogni esecutore. Ora gli esecutori possono unire i dati senza mescolare alcun set di dati di acquisto.
Questo approccio richiede di fornire memoria sufficiente sia al driver Spark sia agli executor per consentire loro di memorizzare il set di dati di trasmissione in memoria. Per impostazione predefinita,
Spark riserva poco meno del 30% della memoria per l'archiviazione di questo tipo di
dati. Quando utilizzi JOIN
in memoria, moltiplica le dimensioni del set di dati per quattro e impostale come memoria dell'executor e del driver. Ad esempio, se il set di dati degli articoli avesse una dimensione di 1 GB, dovremmo impostare la memoria dell'executor e del driver su almeno 4 GB. I set di dati più grandi di 8 GB non possono essere caricati in memoria.
Distribuzione delle chiavi
Quando entrambi i lati di JOIN
sono troppo grandi per essere memorizzati, è possibile utilizzare una tecnica diversa per suddividere ogni chiave JOIN
in più chiavi per aumentare il livello di parallelismo. Questa tecnica può essere applicata alle operazioni INNER JOIN
e
LEFT OUTER JOIN
. Non può essere utilizzato per operazioni FULL OUTER JOIN
.
In questo approccio, il lato distorto viene aggiunto con una nuova colonna di interi con un
numero casuale compreso tra 1 e N. Il lato non inclinato viene espanso, con ogni riga esistente che genera N
nuove righe. Al lato esploso viene aggiunta una nuova colonna, compilata con ogni numero da 1 a N. Viene quindi eseguita una normale JOIN
, tranne che la nuova colonna viene aggiunta come parte della chiave JOIN
. In questo modo, tutti i dati che in precedenza andavano in una singola partizione ora vengono distribuiti in un massimo di N
partizioni diverse.
Nell'esempio precedente, il fattore di distribuzione N
è impostato su 3
. I set di dati originali sono mostrati a sinistra. Le versioni con valori aggiunti e espanse del
set di dati sono mostrate al centro. I dati rimescolati vengono mostrati a destra, con tre diversi esecutori che partecipano agli acquisti di uova anziché uno.
Un maggiore parallelismo si ottiene aumentando le distribuzioni. Tuttavia, questo avviene
a costo dell'esplosione di un lato del JOIN
, con conseguente rimescolamento di più dati
nel cluster. Di conseguenza, il vantaggio diminuisce con l'aumento della distribuzione. Nella maggior parte dei casi, impostalo su 20 o meno.
Passaggi successivi
- Scopri di più sull'elaborazione parallela in Cloud Data Fusion.