Elaborazione parallela

Le pipeline vengono eseguite su cluster di macchine. Raggiungono una velocità effettiva elevata dividendo il lavoro da svolgere ed eseguendolo in parallelo su più esecutori distribuiti nel cluster. In generale, quanto maggiore è il numero di suddivisioni (chiamate anche partizioni), più velocemente può essere eseguita la pipeline. Il livello di parallelismo nella pipeline è determinato dalle origini e dalle fasi di mescolamento della pipeline.

Fonti

All'inizio di ogni esecuzione della pipeline, ogni origine della pipeline calcola quali dati devono essere letti e come possono essere suddivisi. Ad esempio, considera una pipeline di base che legge da Cloud Storage, esegue alcune trasformazioni di Wrangler e poi scrive nuovamente in Cloud Storage.

Pipeline di base che mostra l'origine Cloud Storage, la trasformazione Wrangler e lo scopo Cloud Storage

Quando la pipeline viene avviata, l'origine Cloud Storage esamina i file di input e li suddivide in base alle dimensioni. Ad esempio, un singolo file di un gigabyte può essere suddiviso in 100 parti, ciascuna di 10 MB. Ogni executor legge i dati relativi alla suddivisione, esegue le trasformazioni di Wrangler e poi scrive l'output in un file part.

Dati partizionati in Cloud Storage in trasformazioni Wrangler parallele in file part

Se la pipeline funziona lentamente, una delle prime cose da controllare è se le origini creano suddivisioni sufficienti per sfruttare appieno il parallelismo. Ad esempio, alcuni tipi di compressione rendono i file di testo non frazionabili. Se leggi file sottoposti a compressione GZIP, potresti notare che la pipeline funziona molto più lentamente rispetto alla lettura di file non compressi o file compressi con BZIP (che è suddividibile). Analogamente, se utilizzi l'origine database e la hai configurata per utilizzare una sola suddivisione, il report viene eseguito molto più lentamente rispetto a quando la configuri per utilizzare più suddivisioni.

Mescola

Alcuni tipi di plug-in causano il rimescolamento dei dati nel cluster. Questo accade quando i record elaborati da un'eseguitrice devono essere inviati a un'altra per eseguire il calcolo. Le operazioni di mescolamento sono costose perché richiedono molto I/O. I plug-in che causano il rimescolamento dei dati vengono visualizzati tutti nella sezione Analytics di Pipeline Studio. Sono inclusi i plug-in, come Gruppa per, Deduplica, Distinto e Joiner. Ad esempio, supponiamo che alla pipeline nell'esempio precedente sia stato aggiunto un passaggio Raggruppa per.

Supponiamo inoltre che i dati letti rappresentino gli acquisti effettuati in un supermercato. Ogni record contiene un campo item e un campo num_purchased. Nella fase Raggruppa per, configuriamo la pipeline in modo da raggruppare i record in base al campo item e calcolare la somma del campo num_purchased.

Quando viene eseguita la pipeline, i file di input vengono suddivisi come descritto in precedenza. Dopodiché, ogni record viene mescolato nel cluster in modo che ogni record con lo stesso elemento appartenga allo stesso executor.

Come illustrato nell'esempio precedente, i record per gli acquisti di mele erano inizialmente suddivisi tra diversi esecutori. Per eseguire l'aggregazione, tutti i record dovevano essere inviati al cluster allo stesso executor.

La maggior parte dei plug-in che richiedono un'operazione di mescolamento ti consente di specificare il numero di partizioni da utilizzare per mescolare i dati. Controlla il numero di esecuzioni utilizzate per elaborare i dati rimescolati.

Nell'esempio precedente, se il numero di partizioni è impostato su 2, ogni executor calcola gli aggregati per due elementi anziché uno.

Tieni presente che è possibile ridurre il parallelismo della pipeline dopo questa fase. Ad esempio, prendi in considerazione la visualizzazione logica della pipeline:

Se l'origine suddivide i dati in 500 partizioni, ma il gruppo per mescola utilizzando 200 partizioni, il livello massimo di parallelismo dopo il gruppo per cala da 500 a 200. Invece di 500 diversi file part scritti in Cloud Storage, ne hai solo 200.

Scegliere le partizioni

Se il numero di partizioni è troppo basso, non utilizzerai l'intera capacità del tuo cluster per eseguire in parallelo il maggior numero possibile di attività. Se le partizioni sono impostate su un valore troppo elevato, aumenta l'overhead non necessario. In generale, è meglio usare troppe partizioni che troppo poche. L'overhead aggiuntivo è un problema se la pipeline richiede alcuni minuti per essere eseguita e stai cercando di risparmiare qualche minuto. Se l'esecuzione della pipeline richiede ore, in genere non devi preoccuparti del sovraccarico.

Un modo utile, ma eccessivamente semplicistico, per determinare il numero di partizioni da utilizzare è impostarlo su max(cluster CPUs, input records / 500,000). In altre parole, prendi il numero di record di input e dividilo per 500.000. Se questo numero è superiore al numero di CPU del cluster, utilizzalo per il numero di partizioni. In caso contrario, utilizza il numero di CPU del cluster. Ad esempio, se il tuo cluster ha 100 CPU e si prevede che la fase di smistamento abbia 100 milioni di record di input, utilizza 200 partizioni.

Una risposta più completa è che le operazioni di ordinamento casuale hanno il rendimento migliore quando i dati di ordinamento intermedio per ogni partizione possono essere inseriti completamente nella memoria di un'unità di esecuzione, in modo che non sia necessario trasferire nulla sul disco. Spark riserva poco meno del 30% della memoria di un Executor per l'archiviazione dei dati di smistamento. Il numero esatto è (memoria totale - 300 MB) * 30%. Se supponiamo che ogni executor sia impostato per utilizzare 2 GB di memoria, significa che ogni partizione non deve contenere più di (2 GB - 300 MB) * 30% = circa 500 MB di record. Se supponiamo che ogni record venga compresso fino a 1 KB, significa che (500 MB / partizione) / (1 KB/record) = 500.000 record per partizione. Se gli esecutori stanno utilizzando più memoria o i record sono più piccoli, puoi modificare questo numero di conseguenza.

Distorsione dei dati

Tieni presente che nell'esempio precedente, gli acquisti di vari articoli erano distribuiti in modo uniforme. Ciò significa che sono stati effettuati tre acquisti ciascuno per mele, banane, carote e uova. La permutazione in base a una chiave distribuita in modo uniforme è il tipo di permutazione con il rendimento migliore, ma molti set di dati non dispongono di questa proprietà. Se continuiamo con l'esempio dell'acquisto al supermercato, ci si aspetterebbe di avere molti più acquisti di uova rispetto a quelli di partecipazioni di matrimonio. Quando sono presenti alcune chiavi di ordinamento molto più comuni di altre, hai a che fare con dati distorti. I dati distorti possono avere un rendimento notevolmente peggiore rispetto a quelli non distorti perché un numero esiguo di persone esegue una quantità sproporzionata di lavoro. Ciò fa sì che un piccolo sottoinsieme di partizioni sia molto più grande di tutte le altre.

In questo esempio, gli acquisti di uova sono cinque volte superiori agli acquisti di carte, il che significa che il calcolo dell'aggregato delle uova richiede circa cinque volte più tempo. Non fa molto la differenza se si hanno a che fare con solo 10 record anziché due, ma fa una grande differenza se si hanno a che fare con cinque miliardi di record anziché un miliardo. Quando si verifica uno scostamento dei dati, il numero di partizioni utilizzate in uno shuffle non ha un grande impatto sul rendimento della pipeline.

Puoi riconoscere la distorsione dei dati esaminando i record di output nel grafico nel tempo. Se la fase genera record a un ritmo molto più elevato all'inizio dell'esecuzione della pipeline e poi rallenta improvvisamente, è possibile che tu abbia dati distorti.

Puoi anche riconoscere lo scostamento dei dati esaminando l'utilizzo della memoria del cluster nel tempo. Se il tuo cluster è al massimo della capacità per un po' di tempo, ma ha improvvisamente un utilizzo ridotto della memoria per un determinato periodo di tempo, è anche un segno che hai a che fare con uno scostamento dei dati.

I dati distorti influiscono in modo più significativo sul rendimento quando viene eseguito un join. Esistono alcune tecniche che possono essere utilizzate per migliorare le prestazioni per le unioni con distribuzione non uniforme. Per ulteriori informazioni, consulta Elaborazione parallela per le operazioni JOIN.

Ottimizzazione adattiva per l'esecuzione

Per ottimizzare l'esecuzione in modo adattivo, specifica l'intervallo di partizioni da utilizzare, non il numero esatto della partizione. Il numero esatto della partizione, anche se impostato nella configurazione della pipeline, viene ignorato quando è attiva l'esecuzione adattiva.

Se utilizzi un cluster Dataproc temporaneo, Cloud Data Fusion imposta automaticamente la configurazione corretta, ma per i cluster statici Dataproc o Hadoop, puoi impostare i due parametri di configurazione seguenti:

  • spark.default.parallelism: impostalo sul numero totale di vCore disponibili nel cluster. In questo modo, il cluster non è sottoutilizzato e viene definito il limite inferiore per il numero di partizioni.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: impostalo su 32 volte il numero di vCore disponibili nel cluster. Questo definisce il limite superiore per il numero di partizioni.
  • Spark.sql.adaptive.enabled: per attivare le ottimizzazioni, imposta questo valore su true. Dataproc lo imposta automaticamente, ma se utilizzi cluster Hadoop generici, devi assicurarti che sia abilitato .

Questi parametri possono essere impostati nella configurazione dell'engine di una pipeline specifica o nelle proprietà del cluster di un cluster Dataproc statico.

Passaggi successivi