Questa pagina spiega l'ottimizzazione delle prestazioni per le operazioni JOIN
in Cloud Data Fusion.
Le operazioni JOIN
possono essere la parte più costosa di una pipeline. Come tutto il resto di una pipeline, le operazioni vengono eseguite in parallelo. Il primo passaggio di un JOIN
è lo shuffling dei dati in modo che ogni record con la stessa chiave JOIN
venga inviato allo stesso esecutore. Dopo aver eseguito lo shuffling di tutti i dati, questi vengono uniti e l'output continua attraverso la pipeline.
Esempio di elaborazione parallela nelle operazioni di JOIN
Ad esempio, supponi di eseguire un'operazione JOIN
su set di dati chiamati
Purchases
e Items
. Ogni record di acquisto contiene un nome dell'articolo e il numero
acquistato. Ogni record di articolo contiene il nome e il prezzo dell'articolo. Un JOIN
viene assegnato al nome dell'articolo per calcolare il prezzo totale di ogni acquisto. Quando i dati vengono uniti, i dati vengono sottoposti a shuffling nel cluster in modo che
i record con lo stesso ID finiscano sullo stesso esecutore.
Quando le chiavi JOIN
sono distribuite in modo uniforme, le operazioni JOIN
funzionano bene
perché possono essere eseguite in parallelo.
Come in qualsiasi shuffling, il disallineamento dei dati influisce negativamente sulle prestazioni. Nell'esempio precedente, le uova vengono acquistate molto più spesso del pollo o il latte, il che significa che l'esecutore che unisce gli acquisti delle uova fa più lavoro degli altri esecutori. Se noti un'inclinazione di JOIN
, ci sono due modi per migliorare
le prestazioni.
Suddivide automaticamente le partizioni disallineate
Con l'esecuzione adattiva delle query, verranno gestiti automaticamente disallineamenti molto gravosi.
Non appena un JOIN
produce alcune partizioni molto più grandi di altre, queste vengono
suddivise in parti più piccole. Per verificare che l'esecuzione adattiva delle query sia abilitata,
consulta Ottimizzazione automatica.
Usa un JOIN
in memoria
È possibile eseguire un JOIN
in memoria se un lato dell'JOIN
è sufficientemente piccolo da
essere in memoria. In questo caso, il set di dati di piccole dimensioni viene
caricato in memoria e poi trasmesso a ogni esecutore. Il set di dati di grandi dimensioni non viene eseguito affatto, rimuovendo le partizioni irregolari che vengono generate durante lo shuffling sulla
chiave JOIN
.
Nell'esempio precedente, il set di dati degli elementi viene caricato per la prima volta nella memoria del driver Spark. Viene quindi trasmesso a ciascun esecutore. Ora gli esecutori possono unire i dati senza eseguire lo shuffling del set di dati di acquisto.
Questo approccio richiede di concedere memoria sufficiente sia al driver Spark sia agli esecutori per consentire loro di archiviare il set di dati di broadcast. Per impostazione predefinita,
Spark riserva poco meno del 30% della sua memoria per l'archiviazione di questo tipo di dati. Quando utilizzi JOIN
in memoria, moltiplica la dimensione del set di dati per quattro e impostala come memoria di esecutore e driver. Ad esempio, se il set di dati degli elementi
aveva una dimensione di 1 GB, avremmo dovuto impostare la memoria di esecutore e driver su
almeno 4 GB. I set di dati di dimensioni superiori a 8 GB non possono essere caricati in memoria.
Distribuzione delle chiavi
Quando entrambi i lati dell'elemento JOIN
sono troppo grandi per essere contenuti nella memoria, è possibile utilizzare una tecnica diversa per suddividere ogni chiave JOIN
in più chiavi per aumentare il livello di parallelismo. Questa tecnica può essere applicata alle operazioni INNER JOIN
e
LEFT OUTER JOIN
. Non può essere utilizzato per le operazioni
FULL OUTER JOIN
.
In questo approccio, il lato inclinato viene salato con una nuova colonna di numeri interi con un
numero casuale compreso tra 1 e N. Il lato senza inclinazione è esploso e ogni riga esistente
genera N
nuove righe. Viene aggiunta una nuova colonna al lato esploso, compilata con ogni numero da 1 a N. Viene quindi eseguito un JOIN
normale, ma la nuova colonna viene aggiunta come parte della chiave JOIN
. In questo modo, tutti i dati utilizzati per andare a una singola partizione ora vengono distribuiti fino a un massimo di N
partizioni diverse.
Nell'esempio precedente, il fattore di distribuzione N
è impostato su 3
. I
set di dati originali sono mostrati a sinistra. Al centro sono mostrate le versioni con sale ed esplose del set di dati. I dati sottoposti a shuffling vengono mostrati sulla destra, con
tre diversi esecutori che uniscono gli acquisti di uova, anziché uno.
Un maggiore parallelismo si ottiene aumentando le distribuzioni. Tuttavia, ciò comporta l'esplosione di un lato di JOIN
, con conseguente shuffling di più dati nel cluster. Per questo motivo, il vantaggio diminuisce con l'aumento
della distribuzione. Nella maggior parte dei casi, imposta un valore pari o inferiore a 20.
Passaggi successivi
- Scopri di più sull'elaborazione parallela in Cloud Data Fusion.