Questa pagina spiega l'ottimizzazione delle prestazioni per le operazioni JOIN
in Cloud Data Fusion.
Le operazioni JOIN
possono essere la parte più costosa di una pipeline. Come tutto
il resto in una pipeline, le operazioni vengono eseguite in parallelo. Il primo passaggio
JOIN
sta eseguendo lo shuffling dei dati in modo che venga inviato ogni record con la stessa chiave JOIN
allo stesso esecutore. Dopo lo shuffling di tutti i dati, questi vengono uniti e
continua attraverso la pipeline.
Esempio di elaborazione parallela nelle operazioni di JOIN
Ad esempio, supponi di eseguire un'operazione JOIN
su set di dati chiamati
Purchases
e Items
. Ogni record di acquisto contiene il nome e il numero di un articolo acquistato. Ogni record dell'articolo contiene il nome e il prezzo dell'articolo. Viene eseguito un
JOIN
sul nome dell'articolo per calcolare il prezzo totale di ogni
acquisto. Quando i dati vengono uniti, viene eseguito lo shuffling dei dati nel cluster in modo che
con lo stesso ID finiscono nello stesso esecutore.
Quando le chiavi JOIN
sono distribuite in modo abbastanza uniforme, le operazioni JOIN
hanno un buon rendimento perché possono essere eseguite in parallelo.
Come in qualsiasi shuffling, il disallineamento dei dati influisce negativamente sulle prestazioni. Nella precedente
Ad esempio, le uova vengono acquistate molto più spesso del pollo o il latte, che
significa che l'esecutore che si unisce agli acquisti di uova fa più lavoro dell'altro
esecutori. Se noti che un valore JOIN
è distorto, ci sono due modi per migliorare
le prestazioni dei dispositivi.
Suddivide automaticamente le partizioni disallineate
Con l'esecuzione di query adattiva, gli scostamenti molto elevati verranno gestiti automaticamente.
Non appena un JOIN
genera alcune partizioni molto più grandi di altre, queste vengono suddivise in partizioni più piccole. Per verificare se l'esecuzione adattiva delle query è abilitata,
consulta Ottimizzazione automatica.
Utilizzare un JOIN
in memoria
È possibile eseguire un JOIN
in memoria se un lato dell'JOIN
è sufficientemente piccolo
in memoria. In questa situazione, il piccolo set di dati viene caricato in memoria e poi trasmesso a ogni utente. Lo shuffling del set di dati di grandi dimensioni non viene eseguito
tutte, rimuovendo le partizioni non uniformi generate durante lo shuffling sul
JOIN
chiave.
Nell'esempio precedente, il set di dati degli articoli viene prima caricato nella memoria del driver Spark. Viene quindi trasmesso a ciascun esecutore. Ora gli esecutori possono partecipare senza eseguire lo shuffling di alcun set di dati di acquisto.
Questo approccio richiede di dedicare memoria sufficiente sia al driver Spark
esecutori per consentire loro di archiviare
il set di dati di broadcast in memoria. Per impostazione predefinita,
Spark riserva poco meno del 30% della memoria per l'archiviazione di questo tipo di
dati. Quando utilizzi JOIN
in memoria, moltiplica le dimensioni del set di dati per quattro e impostale come memoria dell'executor e del driver. Ad esempio, se il set di dati degli articoli avesse una dimensione di 1 GB, dovremmo impostare la memoria dell'executor e del driver su almeno 4 GB. I set di dati di dimensioni superiori a 8 GB non possono essere caricati in memoria.
Distribuzione delle chiavi
Quando entrambi i lati di JOIN
sono troppo grandi per essere memorizzati, è possibile utilizzare una tecnica diversa per suddividere ogni chiave JOIN
in più chiavi per aumentare il livello di parallelismo. Questa tecnica può essere applicata alle operazioni INNER JOIN
e
LEFT OUTER JOIN
. Non può essere utilizzato per operazioni FULL OUTER JOIN
.
In questo approccio, il lato distorto viene aggiunto con una nuova colonna di interi con un
numero casuale compreso tra 1 e N. Il lato non inclinato viene espanso, con ogni riga esistente che genera N
nuove righe. Viene aggiunta una nuova colonna compilata al lato esploso
con ciascun numero da 1 a N. Viene quindi eseguito un JOIN
normale, ad eccezione della nuova
viene aggiunta come parte della chiave JOIN
. In questo modo, tutti i dati che hanno utilizzato
per andare a una singola partizione è ora distribuito su un massimo di N
partizioni diverse.
Nell'esempio precedente, il fattore di distribuzione N
è impostato su 3
. La
i set di dati originali sono mostrati a sinistra. Le versioni con valori aggiunti e espanse del
set di dati sono mostrate al centro. I dati ordinati in ordine casuale vengono mostrati sulla destra, con
tre diversi esecutori che si uniscono agli acquisti di uova, invece di uno.
Un maggiore parallelismo si ottiene aumentando le distribuzioni. Tuttavia, questo avviene
a costo dell'esplosione di un lato del JOIN
, con conseguente rimescolamento di più dati
nel cluster. Per questo motivo, il vantaggio si riduce con la distribuzione
aumenta. Nella maggior parte dei casi, impostalo su 20 o meno.
Passaggi successivi
- Scopri di più sull'elaborazione parallela in Cloud Data Fusion.