La modalità di flessibilità avanzata (EFM) di Dataproc consente di gestire i dati elaborati in ordine casuale per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. La modalità EFM trasferisce i dati di shuffling in uno dei due modelli selezionabili dall'utente modalità:
Riproduzione casuale casuale di worker principali. I mappatori scrivono dati ai lavoratori principali. Pull worker dai nodi remoti durante la fase di riduzione. Questa modalità è disponibile soltanto e l'opzione è consigliata per i job Spark.
Riproduzione casuale HCFS (Hadoop Compatible File System). I mappatori scrivono dati in un Implementazione di HCFS (HDFS per impostazione predefinita). Come per la modalità worker principale, partecipano solo i worker principali nelle implementazioni HDFS e HCFS (se Lo shuffle di HCFS utilizza il connettore Cloud Storage, vengono archiviati fuori dal cluster). Questa modalità può essere utile per i job con piccole quantità di dati, ma a causa delle limitazioni di scalabilità, non è consigliata per i job di grandi dimensioni.
Poiché entrambe le modalità EFM non memorizzano i dati elaborati in ordine casuale intermedi sui worker secondari, EFM è molto adatta ai cluster che utilizzano VM prerilasciabili o che applicano la scalabilità automatica solo al gruppo di worker secondari.
- I job Apache Hadoop YARN che non supportano il trasferimento di AppMaster possono non riuscire in modalità di maggiore flessibilità (consulta Quando attendere il completamento degli AppMaster).
- La modalità di flessibilità avanzata non è consigliata:
- su un cluster che ha solo worker principali
- sui job di flussi di dati poiché possono richiedere fino a 30 minuti dopo il completamento per la pulizia dei dati intermedi di shuffling.
- La modalità di flessibilità avanzata non è supportata:
- quando è attiva la scalabilità automatica del worker principale. Nella maggior parte dei casi, i worker principali continueranno a memorizzare i dati di smistamento di cui non viene eseguita la migrazione automatica. Lo scale down del gruppo di worker principale annulla i vantaggi della funzionalità EFM.
- quando i job Spark vengono eseguiti su un cluster in cui è abilitato rimozione controllata. Il ritiro controllato e l'EFM possono essere in conflitto poiché il meccanismo di ritiro controllato YARN mantiene i nodi DECOMMISSIONING fino al completamento di tutte le applicazioni coinvolte.
Utilizzo della modalità di flessibilità avanzata
La modalità di flessibilità avanzata è configurata per ogni motore di esecuzione e deve essere configurato durante la creazione del cluster.
L'implementazione di Spark EFM è configurata con
dataproc:efm.spark.shuffle
proprietà cluster. Valori validi della proprietà:primary-worker
per lo shuffling del worker principale (consigliato)hcfs
per lo shuffling basato su HCFS. Questa modalità è ritirata ed è disponibile solo sui cluster che eseguono l'immagine versione 1.5. Opzione sconsigliata per i nuovi flussi di lavoro.
L'implementazione di Hadoop MapReduce è configurata con il
dataproc:efm.mapreduce.shuffle
proprietà cluster. Valori di proprietà validi:hcfs
Esempio: crea un cluster con lo shuffling del worker principale per Spark e lo shuffling HCFS per MapReduce:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --worker-machine-type=n1-highmem-8 \ --num-workers=25 \ --num-worker-local-ssds=2 \ --secondary-worker-type=preemptible \ --secondary-worker-boot-disk-size=500GB \ --num-secondary-workers=25
Esempio di Apache Spark
- Esegui un job WordCount sul testo pubblico di Shakespeare utilizzando il jar di esempi Spark sul cluster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Esempio di Apache Hadoop MapReduce
Esegui un piccolo job teragen per generare i dati di input in Cloud Storage per un job terasort successivo utilizzando il file jar degli esempi mapreduce sul cluster EFM.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
Esegui un job terasort sui dati.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- terasort gs://terasort/input gs://terasort/output
Configurazione delle unità SSD locali per lo shuffling del worker principale
Le implementazioni di shuffling del worker principale e HDFS scrivono i dati intermedi sottoposti a shuffling su dischi collegati alla VM e sfruttano il throughput aggiuntivo e le IOPS offerte dagli SSD locali. Per facilitare l'allocazione delle risorse, scegli come obiettivo circa 1 partizione SSD locale per 4 vCPU quando configuri le macchine worker principali.
Per collegare le unità SSD locali, passa il flag --num-worker-local-ssds
al comando
gcloud Dataproc clusters create.
In genere, non hai bisogno di SSD locali sui worker secondari.
Aggiunta di SSD locali ai worker secondari di un cluster (utilizzando
--num-secondary-worker-local-ssds
flag)
è spesso meno importante perché i worker secondari non scrivono i dati di shuffling localmente.
Tuttavia, poiché gli SSD locali migliorano le prestazioni del disco locale, puoi decidere di aggiungere
SSD locali ai worker secondari, se prevedi
job a essere vincolati all'I/O
a causa dell'uso del disco locale: il job utilizza un disco locale significativo per
spazio temporaneo o le partizioni
troppo grandi per essere contenuti in memoria e sovraccaricati su disco.
Rapporto worker secondario
Poiché i worker secondari scrivono i dati di smistamento nei worker principali, il tuo cluster deve contenere un numero sufficiente di worker principali con risorse CPU, memoria e disco sufficienti per gestire il carico di smistamento del job. Per i cluster con scalabilità automatica, per impedire il ridimensionamento del gruppo principale e causare comportamenti indesiderati, imposta minInstances
sul valore maxInstances
nel criterio di scalabilità automatica per il gruppo di worker principale.
Se hai un rapporto elevato tra worker secondari e principali (ad esempio 10:1), monitora l'utilizzo della CPU, della rete e del disco dei worker principali per determinare se sono sovraccaricati. Per farlo:
Vai alla pagina Istanze VM nel nella console Google Cloud.
Fai clic sulla casella di controllo sul lato sinistro del worker principale.
Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU, IOPS del disco, byte di rete e altre metriche del worker principale.
Se i worker principali sono sovraccarichi, valuta la possibilità di aumentare il livello manualmente.
Ridimensionamento del gruppo di worker principale
Il gruppo di worker principale può essere aumentato in sicurezza, ma la riduzione del gruppo di worker principale può influire negativamente sull'avanzamento del job. Operazioni che eseguono il downgrade
gruppo di worker principale deve utilizzare
il ritiro gestito, che viene abilitato impostando il flag --graceful-decommission-timeout
.
Cluster con scalabilità automatica: la scalabilità del gruppo di worker principale è disabilitata nei cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale su una cluster con scalabilità automatica:
Disattiva la scalabilità automatica.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Scala il gruppo principale.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Riattiva la scalabilità automatica:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Monitoraggio dell'utilizzo del disco del worker principale
I worker principali devono disporre di spazio su disco sufficiente per i dati di shuffling del cluster.
Puoi monitorarlo indirettamente tramite la metrica remaining HDFS capacity
.
Quando il disco locale si riempie, lo spazio diventa non disponibile per HDFS.
la capacità rimanente diminuisce.
Per impostazione predefinita, quando il disco locale di un worker principale usa supera il 90% della capacità, il nodo verrà contrassegnato come UNHEALTHY nella UI del nodo YARN. Se riscontri problemi di capacità, puoi eliminare i dati inutilizzati da HDFS o fare lo scale up pool di worker principale.
Configurazione avanzata
Partizionamento e parallelismo
Quando invii un job MapReduce o Spark, configura un livello appropriato di il partizionamento orizzontale. La scelta del numero di partizioni di input e output per una fase di ordinamento comporta un compromesso tra diverse caratteristiche di prestazioni. È meglio sperimentare i valori più adatti alle tue forme di lavoro.
Partizioni di input
La partizione dell'input di MapReduce e Spark è determinata dal set di dati di input. Quando leggi i file da Cloud Storage, ogni attività elabora circa un "blocco di dimensioni" di dati.
Per i job Spark SQL, la dimensione massima della partizione è controllata da
spark.sql.files.maxPartitionBytes
. Valuta la possibilità di aumentarlo a 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Per i job MapReduce e gli RDD Spark, la dimensione della partizione viene generalmente controllata con
fs.gs.block.size
, che per impostazione predefinita è 128 MB. Valuta la possibilità di aumentarlo a 1 GB. Puoi anche impostare proprietàInputFormat
specifiche comemapreduce.input.fileinputformat.split.minsize
emapreduce.input.fileinputformat.split.maxsize
- Per i job MapReduce:
--properties fs.gs.block.size=1073741824
- Per gli RDD di Spark:
--properties spark.hadoop.fs.gs.block.size=1073741824
- Per i job MapReduce:
Partizioni di output
Il numero di attività nelle fasi successive è controllato da diverse proprietà. Per job di dimensioni maggiori che elaborano più di 1 TB, valuta la possibilità di avere almeno 1 GB per partizione.
Per i job MapReduce, il numero di partizioni di output è controllato da
mapreduce.job.reduces
.Per Spark SQL, il numero di partizioni di output è controllato da
spark.sql.shuffle.partitions
.Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare
spark.default.parallelism
.
Ottimizzazione dello shuffling per lo shuffling del worker principale
La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Tieni presente che si tratta di una proprietà YARN a livello di cluster perché il server di smistamento Spark viene eseguito nell'ambito del gestore dei nodi. Per impostazione predefinita, il numero di core è doppio (2x)
(ad esempio, 16 thread su un n1-highmem-8). Se viene visualizzato il messaggio "Visualizzazione casuale del tempo di blocco della lettura" sono
più di 1 secondo e i worker principali non hanno raggiunto la rete, la CPU o il disco
limiti, valuta la possibilità di aumentare il numero di thread del server di shuffling.
Sui tipi di macchina più grandi, ti consigliamo di aumentare spark.shuffle.io.numConnectionsPerPeer
,
che per impostazione predefinita è 1. Ad esempio, impostalo su 5 connessioni per coppia di host.
Nuovi tentativi
È consentito il numero massimo di tentativi consentiti per i master dell'app, le attività e le fasi configurarli impostando le seguenti proprietà:
yarn:yarn.resourcemanager.am.max-attempts mapred:mapreduce.map.maxattempts mapred:mapreduce.reduce.maxattempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Poiché i master e le attività delle app vengono terminati più spesso in cluster che utilizzano molte VM prerilasciabili o scalabilità automatica senza rimozione controllata, aumentando i valori delle proprietà sopra indicate in questi cluster può essere d'aiuto (tieni presente che l'utilizzo di EFM con Spark e rimozione controllata non supportati).
Configurazione di HDFS per shuffle HCFS
Per migliorare le prestazioni degli shuffling di grandi dimensioni, puoi ridurre il conflitto di blocchi in
NameNode impostando dfs.namenode.fslock.fair=false
. Tieni presente che questo rischio
esaurire le singole richieste, ma potrebbe migliorare la velocità effettiva a livello di cluster.
Per migliorare ulteriormente le prestazioni di NameNode, puoi collegare SSD locali al
nodo principale impostando --num-master-local-ssds
. Puoi anche aggiungere SSD locali ai worker principali per migliorare le prestazioni di DataNode impostando --num-worker-local-ssds
.
Altri file system compatibili con Hadoop per l'ordinamento casuale HCFS
Per impostazione predefinita, i dati elaborati in ordine casuale EFM HCFS vengono scritti in HDFS, ma puoi utilizzare qualsiasi
file system compatibile con Hadoop (HCFS).
Ad esempio, potresti decidere di scrivere lo shuffle in Cloud Storage
all'HDFS di un altro cluster. Per specificare un file system, puoi puntare
fs.defaultFS
al file system di destinazione quando invii un job al cluster.
Rimozione controllata di YARN sui cluster EFM
Ritiro gestito YARN consente di rimuovere i nodi rapidamente e con l'impatto sull'esecuzione delle applicazioni. Per i cluster con scalabilità automatica, il timeout di ritiro graduale può essere impostato in un AutoscalingPolicy collegato al cluster EFM.
Miglioramenti di MapReduce EFM alla rimozione controllata
Poiché i dati intermedi vengono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i contenitori in esecuzione su questi nodi sono stati completati. In confronto, i nodi non vengono rimossi nei cluster Dataproc standard fino al termine dell'applicazione.
La rimozione del nodo non attende il completamento dei master dell'app in esecuzione su un nodo. Quando il contenitore principale dell'app viene terminato, viene riprogrammato su un altro nodo che non verrà ritirato. L'avanzamento del job non viene perso: il nuovo master dell'app recupera rapidamente lo stato dal master dell'app precedente leggendo la cronologia dei job.
Utilizzo del ritiro gestito automaticamente in un cluster EFM con MapReduce
Crea un cluster EFM con un numero uguale di worker principali e secondari.
gcloud dataproc clusters create cluster-name \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --region=region \ --num-workers=5 \ --num-secondary-workers=5
Esegui un job mapreduce che calcola il valore di pi utilizzando il file JAR di esempi mapreduce sul cluster.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- pi 1000 10000000
Mentre il job è in esecuzione, esegui il ridimensionamento del cluster utilizzando il ritiro gestito automaticamente.
I nodi verranno rimossi rapidamente dal cluster prima del termine del job, minimizzando al contempo la perdita di avanzamento del job. Potrebbero verificarsi pause temporanee nell'avanzamento del job a causa di:gcloud dataproc clusters update cluster-name \ --region=region \ --num-secondary-workers=0 \ --graceful-decommission-timeout=1h
- Failover del master dell'app. Se l'avanzamento del job scende allo 0% e poi salta immediatamente fino al valore pre-drop, il master dell'app potrebbe essere mentre un nuovo master dell'app ha recuperato il suo stato. Ciò non dovrebbe influire significativamente l'avanzamento del job, dato che il failover avviene rapidamente.
- Prerilascio delle VM. Poiché HDFS conserva solo gli output completi, non parziali, delle attività di mappatura, possono verificarsi interruzioni temporanee nell'avanzamento del job quando una VM viene prelevata durante il lavoro su un'attività di mappatura.
Per velocizzare la rimozione dei nodi, puoi ridurre il cluster senza il ritiro graduale omettendo il flag --graceful-decommission-timeout
nell'esempio di comando gcloud
precedente. L'avanzamento del job dalle attività di mappa completate:
l'output dell'attività della mappa parzialmente completata andrà perso
(le attività della mappa verranno eseguite nuovamente).