Elaborazione parallela per le operazioni JOIN

Questa pagina spiega l'ottimizzazione delle prestazioni per le operazioni JOIN in Cloud Data Fusion.

Le operazioni JOIN possono essere la parte più costosa di una pipeline. Come tutto il resto in una pipeline, le operazioni vengono eseguite in parallelo. Il primo passaggio JOIN sta eseguendo lo shuffling dei dati in modo che venga inviato ogni record con la stessa chiave JOIN allo stesso esecutore. Dopo lo shuffling di tutti i dati, questi vengono uniti e continua attraverso la pipeline.

Esempio di elaborazione parallela nelle operazioni di JOIN

Ad esempio, supponi di eseguire un'operazione JOIN su set di dati chiamati Purchases e Items. Ogni record di acquisto contiene il nome e il numero di un articolo acquistato. Ogni record dell'articolo contiene il nome e il prezzo dell'articolo. Viene eseguito un JOIN sul nome dell'articolo per calcolare il prezzo totale di ogni acquisto. Quando i dati vengono uniti, viene eseguito lo shuffling dei dati nel cluster in modo che con lo stesso ID finiscono nello stesso esecutore.

Quando le chiavi JOIN sono distribuite in modo abbastanza uniforme, le operazioni JOIN hanno un buon rendimento perché possono essere eseguite in parallelo.

Come in qualsiasi shuffling, il disallineamento dei dati influisce negativamente sulle prestazioni. Nella precedente Ad esempio, le uova vengono acquistate molto più spesso del pollo o il latte, che significa che l'esecutore che si unisce agli acquisti di uova fa più lavoro dell'altro esecutori. Se noti che un valore JOIN è distorto, ci sono due modi per migliorare le prestazioni dei dispositivi.

Suddivide automaticamente le partizioni disallineate

Con l'esecuzione di query adattiva, gli scostamenti molto elevati verranno gestiti automaticamente. Non appena un JOIN genera alcune partizioni molto più grandi di altre, queste vengono suddivise in partizioni più piccole. Per verificare se l'esecuzione adattiva delle query è abilitata, consulta Ottimizzazione automatica.

Utilizzare un JOIN in memoria

È possibile eseguire un JOIN in memoria se un lato dell'JOIN è sufficientemente piccolo in memoria. In questa situazione, il piccolo set di dati viene caricato in memoria e poi trasmesso a ogni utente. Lo shuffling del set di dati di grandi dimensioni non viene eseguito tutte, rimuovendo le partizioni non uniformi generate durante lo shuffling sul JOIN chiave.

Nell'esempio precedente, il set di dati degli articoli viene prima caricato nella memoria del driver Spark. Viene quindi trasmesso a ciascun esecutore. Ora gli esecutori possono partecipare senza eseguire lo shuffling di alcun set di dati di acquisto.

Questo approccio richiede di dedicare memoria sufficiente sia al driver Spark esecutori per consentire loro di archiviare il set di dati di broadcast in memoria. Per impostazione predefinita, Spark riserva poco meno del 30% della memoria per l'archiviazione di questo tipo di dati. Quando utilizzi JOIN in memoria, moltiplica le dimensioni del set di dati per quattro e impostale come memoria dell'executor e del driver. Ad esempio, se il set di dati degli articoli avesse una dimensione di 1 GB, dovremmo impostare la memoria dell'executor e del driver su almeno 4 GB. I set di dati di dimensioni superiori a 8 GB non possono essere caricati in memoria.

Distribuzione delle chiavi

Quando entrambi i lati di JOIN sono troppo grandi per essere memorizzati, è possibile utilizzare una tecnica diversa per suddividere ogni chiave JOIN in più chiavi per aumentare il livello di parallelismo. Questa tecnica può essere applicata alle operazioni INNER JOIN e LEFT OUTER JOIN. Non può essere utilizzato per operazioni FULL OUTER JOIN.

In questo approccio, il lato distorto viene aggiunto con una nuova colonna di interi con un numero casuale compreso tra 1 e N. Il lato non inclinato viene espanso, con ogni riga esistente che genera N nuove righe. Viene aggiunta una nuova colonna compilata al lato esploso con ciascun numero da 1 a N. Viene quindi eseguito un JOIN normale, ad eccezione della nuova viene aggiunta come parte della chiave JOIN. In questo modo, tutti i dati che hanno utilizzato per andare a una singola partizione è ora distribuito su un massimo di N partizioni diverse.

Nell'esempio precedente, il fattore di distribuzione N è impostato su 3. La i set di dati originali sono mostrati a sinistra. Le versioni con valori aggiunti e espanse del set di dati sono mostrate al centro. I dati ordinati in ordine casuale vengono mostrati sulla destra, con tre diversi esecutori che si uniscono agli acquisti di uova, invece di uno.

Un maggiore parallelismo si ottiene aumentando le distribuzioni. Tuttavia, questo avviene a costo dell'esplosione di un lato del JOIN, con conseguente rimescolamento di più dati nel cluster. Per questo motivo, il vantaggio si riduce con la distribuzione aumenta. Nella maggior parte dei casi, impostalo su 20 o meno.

Passaggi successivi