Elaborazione parallela

Le pipeline vengono eseguite su cluster di macchine. Raggiungono una velocità effettiva elevata mediante la suddivisione del lavoro da completare per poi eseguire il lavoro in parallelo su più esecutori distribuiti nel cluster. In generale, maggiore è il numero di suddivisioni (chiamate anche partizioni), più rapidamente può essere eseguita la pipeline. Il livello di parallelismo nella pipeline è determinato dalle origini e dalle fasi di shuffling nella pipeline.

Origini

All'inizio di ogni esecuzione della pipeline, ogni origine nella pipeline calcola quali dati devono essere letti e in che modo possono essere suddivisi in suddivisioni. Considera ad esempio una pipeline di base che legge da Cloud Storage, esegue alcune trasformazioni di Wrangler e quindi scrive di nuovo in Cloud Storage.

Pipeline di base che mostra l'origine di Cloud Storage, la trasformazione Wrangler e il sink di Cloud Storage

All'avvio della pipeline, l'origine Cloud Storage esamina i file di input e li suddivide in suddivisioni in base alle dimensioni dei file. Ad esempio, un singolo file da gigabyte può essere suddiviso in 100 divisioni, ogni 10 MB di dimensione. Ogni esecutore legge i dati per quella suddivisione, esegue le trasformazioni di Wrangler e quindi scrive l'output in un file part.

Dati partizionati in Cloud Storage in trasformazioni Wrangler parallele in file delle parti

Se la pipeline è lenta, una delle prime cose da verificare è se le origini stanno creando suddivisioni sufficienti per sfruttare appieno il parallelismo. Ad esempio, alcuni tipi di compressione rendono non separabili i file di testo non crittografato. Se stai leggendo file che sono stati compressi con gzip, potresti notare che la pipeline viene eseguita molto più lentamente rispetto a quando leggi file non compressi o file compressi con BZIP (che è suddivisibile). Allo stesso modo, se utilizzi l'origine del database e l'hai configurata per utilizzare una singola suddivisione, il processo è molto più lento rispetto a quando utilizzi più suddivisioni.

Riproduzione casuale

Alcuni tipi di plug-in causano lo shuffling dei dati nel cluster. Questo si verifica quando i record elaborati da un esecutore devono essere inviati a un altro esecutore per eseguire il calcolo. Gli shuffle sono operazioni costose perché implicano un elevato I/O. I plug-in che causano lo shuffling dei dati vengono visualizzati nella sezione Analytics di Pipeline Studio. tra cui plug-in come Raggruppa per, De deduplicata, Distinct e Joiner. Ad esempio, supponiamo che venga aggiunta alla pipeline una fase Raggruppa per nell'esempio precedente.

Inoltre, supponiamo che i dati letti rappresentino acquisti effettuati in un negozio di alimentari. Ogni record contiene un campo item e un campo num_purchased. Nella fase Raggruppa per, configuriamo la pipeline in modo da raggruppare i record nel campo item e calcoliamo la somma del campo num_purchased.

Quando viene eseguita la pipeline, i file di input vengono suddivisi come descritto in precedenza. In seguito, ogni record viene eseguito lo shuffling nel cluster in modo che ogni record con lo stesso elemento appartenga allo stesso esecutore.

Come illustrato nell'esempio precedente, i record relativi agli acquisti di mele originariamente erano distribuiti tra diversi esecutori. Per eseguire l'aggregazione, tutti i record dovevano essere inviati nel cluster allo stesso esecutore.

La maggior parte dei plug-in che richiedono uno shuffling consente di specificare il numero di partizioni da utilizzare durante lo shuffling dei dati. Consente di controllare quanti esecutori vengono utilizzati per elaborare i dati sottoposti a shuffling.

Nell'esempio precedente, se il numero di partizioni è impostato su 2, ogni esecutore calcola i dati aggregati per due elementi anziché uno.

Tieni presente che è possibile ridurre il parallelismo della pipeline dopo questa fase. Ad esempio, considera la visione logica della pipeline:

Se l'origine divide i dati in 500 partizioni, ma il raggruppamento per esegue lo shuffling utilizzando 200 partizioni, il livello massimo di parallelismo dopo il raggruppamento in Raggruppa per scende da 500 a 200. Invece di 500 diversi file di parte scritti in Cloud Storage, ne hai solo 200.

Scelta delle partizioni

Se il numero di partizioni è troppo basso, non utilizzerai la piena capacità del tuo cluster per caricare in contemporanea il più possibile il lavoro. L'impostazione di partizioni troppo elevate aumenta l'overhead non necessario. In generale, è preferibile utilizzare troppe partizioni. L'overhead aggiuntivo è una preoccupazione se l'esecuzione della pipeline richiede pochi minuti e tu provi a ridurne un paio di minuti. Se l'esecuzione della pipeline richiede ore, l'overhead di solito non è un aspetto di cui devi preoccuparti.

Un modo utile ma eccessivamente semplicistico per determinare il numero di partizioni da utilizzare è impostarlo su max(cluster CPUs, input records / 500,000). In altre parole, dividi il numero di record di input per 500.000. Se questo numero è maggiore del numero di CPU del cluster, utilizzalo per il numero di partizioni. In caso contrario, utilizza il numero di CPU del cluster. Ad esempio, se il cluster ha 100 CPU e la fase di shuffling deve avere 100 milioni di record di input, utilizza 200 partizioni.

Una risposta più completa è che lo shuffling funziona al meglio quando i dati intermedi di shuffling per ogni partizione possono essere inseriti completamente nella memoria di un esecutore, in modo che nulla debba essere versato sul disco. Spark riserva poco meno del 30% della memoria di un esecutore per i dati di shuffling. Il numero esatto è (memoria totale - 300 MB) * 30%. Se supponiamo che ogni esecutore sia impostato per utilizzare 2 GB di memoria, significa che ogni partizione non deve contenere più di (2 GB - 300 MB) * 30% = circa 500 MB di record. Partiamo dal presupposto che ogni record venga compresso fino a 1 kB, quindi (500 MB / partizione) / (1 kB/ record) = 500.000 record per partizione. Se gli esecutori utilizzano più memoria o se i tuoi record sono più piccoli, puoi modificare questo numero di conseguenza.

Disallineamento dati

Tieni presente che, nell'esempio precedente, gli acquisti di vari articoli sono stati distribuiti in modo uniforme. In altre parole, ci sono stati tre acquisti ciascuno per mele, banane, carote e uova. Lo shuffling su una chiave distribuita in modo uniforme è il tipo di shuffling più efficiente, ma molti set di dati non hanno questa proprietà. Proseguendo con l'acquisto nel negozio di alimentari nell'esempio precedente, ci si aspetterebbe di fare molti più acquisti per le uova rispetto a quelli per biglietti di nozze. Se alcune chiavi di shuffling sono molto più comuni di altre chiavi, devi gestire dati distorti. I dati disallineati possono avere prestazioni nettamente inferiori rispetto a quelli non disallineati perché una quantità sproporzionata di lavoro viene eseguita da una piccola manciata di esecutori. Questo fa sì che un piccolo sottoinsieme di partizioni sia molto più grande di tutte le altre.

In questo esempio, gli acquisti di uova sono cinque volte superiori rispetto agli acquisti con carta, il che significa che il calcolo del aggregato delle uova richiede circa cinque volte più tempo. Non importa molto quando si ha a che fare con solo 10 record, invece che due, ma fa una grande differenza quando si ha a che fare con cinque miliardi di record invece di un miliardo. In caso di disallineamento dei dati, il numero di partizioni utilizzate in uno shuffle non ha un grande impatto sulle prestazioni della pipeline.

Puoi riconoscere il disallineamento dei dati esaminando il grafico per individuare i record di output nel tempo. Se la fase restituisce i record a un ritmo molto più elevato all'inizio dell'esecuzione della pipeline e poi rallenta improvvisamente, è possibile che i dati siano disallineati.

Puoi anche riconoscere il disallineamento dei dati esaminando l'utilizzo della memoria del cluster nel tempo. Se il cluster è alla capacità da un po' di tempo, ma improvvisamente ha ridotto l'utilizzo della memoria per un determinato periodo di tempo, significa anche che stai riscontrando un disallineamento dei dati.

La parzialità dei dati influisce notevolmente sulle prestazioni durante l'esecuzione di un join. Esistono alcune tecniche che possono essere usate per migliorare le prestazioni dei join sbilanciati. Per maggiori informazioni, consulta Elaborazione parallela per le operazioni di JOIN.

Ottimizzazione adattiva per l'esecuzione

Per ottimizzare in modo adattivo l'esecuzione, specifica l'intervallo di partizioni da utilizzare, non il numero esatto di partizione. Il numero di partizione esatto, anche se impostato nella configurazione della pipeline, viene ignorato quando è abilitata l'esecuzione adattiva.

Se utilizzi un cluster Dataproc temporaneo, Cloud Data Fusion imposta automaticamente la configurazione adeguata, ma per i cluster Dataproc o Hadoop statici possono essere impostati i due parametri di configurazione successivi:

  • spark.default.parallelism: impostalo sul numero totale di vCore disponibili nel cluster. Ciò garantisce che il cluster non sia sotto carico e definisce il limite inferiore per il numero di partizioni.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: impostalo su 32x del numero di vCore disponibili nel cluster. Definisce il limite superiore per il numero di partizioni.
  • Spark.sql.adaptive.enabled: per attivare le ottimizzazioni, imposta questo valore su true. Dataproc lo imposta automaticamente, ma se usi cluster Hadoop generici, devi assicurarti che sia abilitato .

Questi parametri possono essere impostati nella configurazione del motore di una pipeline specifica o nelle proprietà del cluster di un cluster Dataproc statico.

Passaggi successivi