Le pipeline vengono eseguite su cluster di macchine. Raggiungono una velocità effettiva elevata suddividere il lavoro da svolgere e poi eseguirlo in in parallelo ai vari esecutori distribuiti nel cluster. In generale, maggiore è il numero di suddivisioni (dette anche partizioni), più velocemente eseguire la pipeline. Il livello di parallelismo nella pipeline è determinato dalle origini e dalle fasi di mescolamento della pipeline.
Fonti
All'inizio di ogni esecuzione della pipeline, ogni origine della pipeline calcola quali dati devono essere letti e come possono essere suddivisi. Ad esempio, considera una pipeline di base che legge da Cloud Storage, esegue alcune trasformazioni di Wrangler e poi scrive nuovamente in Cloud Storage.
Quando la pipeline viene avviata, l'origine Cloud Storage esamina i file di input e li suddivide in base alle dimensioni. Ad esempio, un singolo gigabyte può essere suddiviso in 100 divisioni, ogni 10 MB dimensioni. Ogni esecutore legge i dati per quella suddivisione, quindi esegue Wrangler trasformazioni e scrive l'output in un file part.
Se la pipeline funziona lentamente, una delle prime cose da controllare è se le origini creano suddivisioni sufficienti per sfruttare appieno il parallelismo. Ad esempio, alcuni tipi di compressione rendono i file di testo non frazionabili. Se leggi file sottoposti a compressione GZIP, potresti notare che la pipeline funziona molto più lentamente rispetto alla lettura di file non compressi o file compressi con BZIP (che è suddividibile). Analogamente, se utilizzi l'origine database e la hai configurata per utilizzare una sola suddivisione, il report viene eseguito molto più lentamente rispetto a quando la configuri per utilizzare più suddivisioni.
Mescola
Alcuni tipi di plug-in causano lo shuffling dei dati nel cluster. Questo accade quando i record elaborati da un'eseguitrice devono essere inviati a un'altra per eseguire il calcolo. Le operazioni di mescolamento sono operazioni costose perché richiedono molto I/O. I plug-in che causano il rimescolamento dei dati vengono visualizzati tutti nella sezione Analytics di Pipeline Studio. Sono inclusi plug-in come Raggruppa per, Deduplica, Distinzione e Partecipa. Ad esempio, supponiamo che un campo Raggruppa per di Compute Engine viene aggiunta alla pipeline nell'esempio precedente.
Supponiamo inoltre che i dati letti rappresentino gli acquisti effettuati in un supermercato.
Ogni record contiene un campo item
e un campo num_purchased
. Nella sezione Gruppo
Per fase, configuriamo la pipeline per raggruppare i record nel campo item
e
calcolando la somma del campo num_purchased
.
Quando viene eseguita la pipeline, i file di input vengono suddivisi come descritto in precedenza. Dopo il giorno ogni record viene eseguito lo shuffling nel cluster in modo che ogni record lo stesso elemento appartiene allo stesso esecutore.
Come illustrato nell'esempio precedente, i record per gli acquisti di mele erano inizialmente suddivisi tra diversi esecutori. Per eseguire l'aggregazione, tutti di questi record dovevano essere inviati nel cluster allo stesso esecutore.
La maggior parte dei plug-in che richiedono un'operazione di mescolamento ti consente di specificare il numero di partizioni da utilizzare per mescolare i dati. Controlla il numero di esecutori utilizzati per elaborare i dati rimescolati.
Nell'esempio precedente, se il numero di partizioni è impostato su 2
, ogni esecutore calcola i dati aggregati per due elementi anziché uno.
Tieni presente che è possibile ridurre il parallelismo della pipeline dopo questa fase. Ad esempio, prendi in considerazione la visualizzazione logica della pipeline:
Se l'origine suddivide i dati in 500 partizioni, ma il gruppo per mescola utilizzando 200 partizioni, il livello massimo di parallelismo dopo il gruppo per cala da 500 a 200. Invece di 500 diversi file part scritti in Cloud Storage, ne hai solo 200.
Scegliere le partizioni
Se il numero di partizioni è troppo basso, non utilizzerai l'intera capacità del tuo cluster per eseguire in parallelo il maggior numero possibile di attività. Imposto anche le partizioni un valore elevato aumenta l'overhead non necessario. In generale, è meglio usare troppe partizioni che troppo poche. L'overhead aggiuntivo è qualcosa di cui preoccuparsi se l'esecuzione della tua pipeline richiede qualche minuto e vuoi ridurre un paio di minuti. Se l'esecuzione della pipeline richiede ore, in genere non devi preoccuparti del sovraccarico.
Un modo utile ma eccessivamente semplicistico per determinare il numero di partizioni
è di impostarlo su max(cluster CPUs, input records / 500,000)
. In altre
dividiamo il numero di record di input per 500.000. Se il numero è
maggiore del numero di CPU del cluster, usalo per il numero di partizioni.
In caso contrario, utilizza il numero di CPU del cluster. Ad esempio, se il tuo cluster ha 100 CPU e si prevede che la fase di smistamento abbia 100 milioni di record di input, utilizza 200 partizioni.
Una risposta più completa è che lo shuffling ha un rendimento migliore quando lo shuffling dei dati per ogni partizione può rientrare completamente nella memoria di un esecutore che nulla debba essere versato su disco. Spark riserva poco meno del 30% della memoria di un Executor per l'archiviazione dei dati di smistamento. Il numero esatto è (memoria totale - 300 MB) * 30%. Se supponiamo che ogni executor sia impostato per utilizzare 2 GB di memoria, significa che ogni partizione non deve contenere più di (2 GB - 300 MB) * 30% = circa 500 MB di record. Supponendo che ogni record venga compresso a 1 kB, ciò significa (500 MB / partizione) / (1 kB / ) = 500.000 record per partizione. Se gli esecutori stanno utilizzando più memoria o i record sono più piccoli, puoi modificare questo numero di conseguenza.
Distorsione dei dati
Tieni presente che, nell'esempio precedente, gli acquisti di vari articoli erano uniforme distribuiti in tempo reale. In altre parole, ci sono stati tre acquisti ciascuno per mele, banane, carote e uova. La permutazione in base a una chiave distribuita in modo uniforme è il tipo di permutazione con il rendimento migliore, ma molti set di dati non dispongono di questa proprietà. Se continuiamo con l'esempio dell'acquisto al supermercato, ci aspetteremmo molti più acquisti di uova rispetto a quelli di inviti di matrimonio. In caso di qualche riproduzione casuale che sono molto più comuni di altre chiavi, hai a che fare con chiavi e i dati di Google Cloud. I dati disallineati possono funzionare molto peggiori di quelli non disallineati perché una quantità sproporzionata di lavoro viene svolta da una piccola manciata esecutori. Ciò fa sì che un piccolo sottoinsieme di partizioni sia molto più grande di tutte le altre.
In questo esempio, gli acquisti di uova sono cinque volte superiori agli acquisti di carte, il che significa che il calcolo dell'aggregato delle uova richiede circa cinque volte più tempo. it non importa molto quando si ha a che fare con solo 10 record, anziché due, fa un'enorme differenza quando ha a che fare con cinque miliardi di record invece che uno miliardi di dollari. Quando si verifica uno scostamento dei dati, il numero di partizioni utilizzate in uno shuffle non ha un grande impatto sul rendimento della pipeline.
Puoi riconoscere il disallineamento dei dati esaminando il grafico per individuare i record di output nel tempo. Se la fase restituisce i record a un ritmo molto più elevato all'inizio del dell'esecuzione della pipeline e poi rallenta improvvisamente, ciò potrebbe significare che i dati sono distorti.
Puoi anche riconoscere il disallineamento dei dati esaminando l'utilizzo della memoria del cluster nel tempo. Se il cluster è alla capacità da un po' di tempo, ma improvvisamente ha ridotto l'utilizzo della memoria per un periodo di tempo, questo è anche un segnale che ci sono dei disallineamenti dei dati.
I dati distorti influiscono in modo più significativo sul rendimento quando viene eseguito un join. Esistono alcune tecniche che possono essere utilizzate per migliorare il rendimento
per i join disallineati. Per ulteriori informazioni, consulta
Elaborazione parallela per le operazioni JOIN
.
Ottimizzazione adattiva per l'esecuzione
Per ottimizzare in modo adattivo l'esecuzione, specifica l'intervallo di partizioni da utilizzare, non il numero di partizione esatto. Il numero esatto della partizione, anche se impostato nella configurazione della pipeline, viene ignorato quando è attiva l'esecuzione adattiva.
Se utilizzi un cluster Dataproc temporaneo, Cloud Data Fusion imposta automaticamente la configurazione adeguata, ma per i di Dataproc o Hadoop, le due configurazioni successive è possibile impostare i seguenti parametri:
spark.default.parallelism
: impostalo sul numero totale di vCore disponibili nel cluster. Ciò garantisce che il cluster non venga sottocaricato e definisce limite inferiore per il numero di partizioni.spark.sql.adaptive.coalescePartitions.initialPartitionNum
: impostalo su 32 volte il numero di vCore disponibili nel cluster. Questo definisce il valore per il numero di partizioni.Spark.sql.adaptive.enabled
: per attivare le ottimizzazioni, imposta questo valore sutrue
. Dataproc lo imposta automaticamente, ma se utilizzi di cluster Hadoop generici, devi assicurarti che sia abilitato .
Questi parametri possono essere impostati nella configurazione dell'engine di una pipeline specifica o nelle proprietà del cluster di un cluster Dataproc statico.
Passaggi successivi
- Scopri di più sull'elaborazione parallela per le operazioni
JOIN
.