La modalità di flessibilità avanzata (EFM) di Dataproc gestisce i dati in ordine casuale per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. EFM scarica i dati in ordine casuale in una delle due modalità selezionabili dall'utente:
Riproduzione casuale del lavoratore principale. I mappatori scrivono i dati ai worker principali. I worker estraeno da quei nodi remoti durante la fase di riduzione. Questa modalità è disponibile solo per i job Spark ed è consigliata per questi job.
Riproduzione casuale del file system HCFS (Hadoop Compatible File System). I mappatori scrivono i dati in un'implementazione HCFS (HDFS per impostazione predefinita). Come nella modalità worker principale, solo i worker principali partecipano alle implementazioni HDFS e HCFS (se shuffle di HCFS utilizza il connettore Cloud Storage, i dati vengono archiviati all'esterno del cluster). Questa modalità può trarre vantaggio dai job con piccole quantità di dati, ma a causa di limitazioni di scalabilità, non è consigliata per job di grandi dimensioni.
Dal momento che entrambe le modalità EFM non archiviano i dati di shuffling intermedi sui worker secondari, il protocollo EFM è adatto ai cluster che utilizzano VM prerilasciabili o scalano automaticamente solo il gruppo di worker secondari.
- I job Apache Hadoop YARN che non supportano il trasferimento della proprietà AppMaster possono non riuscire in modalità di flessibilità avanzata (consulta la sezione Quando attendere il completamento di AppMaster).
- Modalità di flessibilità avanzata sconsigliato:
- su un cluster che ha solo worker principali.
- La modalità di flessibilità avanzata non è supportata:
- Quando è abilitata la scalabilità automatica dei worker principali. Nella maggior parte dei casi, i worker principali continueranno ad archiviare i dati in ordine casuale di cui non viene eseguita la migrazione automatica. La riduzione del gruppo di worker principale annulla i vantaggi dell'EFM.
- Quando i job Spark vengono eseguiti su un cluster in cui è abilitato la rimozione controllata. Il ritiro raffinato e l'EFM possono funzionare a scopi incrociati, poiché il meccanismo di ritiro gestito da YARN mantiene i nodi per la CESSAZIONE fino al completamento di tutte le applicazioni interessate.
Utilizzo della modalità di flessibilità avanzata
La modalità di flessibilità avanzata viene configurata per ogni motore di esecuzione e deve essere configurata durante la creazione del cluster.
L'implementazione di Spark EFM è configurata con la proprietà cluster
dataproc:efm.spark.shuffle
. Valori validi della proprietà:primary-worker
per il worker casuale del worker principale (consigliato)hcfs
per la riproduzione casuale basata su HCFS. Questa modalità è ritirata ed è disponibile solo sui cluster che eseguono la versione immagine 1.5. Non consigliato per i nuovi flussi di lavoro.
L'implementazione di MapReduce di Hadoop è configurata con la proprietà del cluster
dataproc:efm.mapreduce.shuffle
. Valori validi della proprietà:hcfs
Esempio: crea un cluster con shuffling del worker principale per Spark e shuffle di HCFS per MapReduce:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --worker-machine-type=n1-highmem-8 \ --num-workers=25 \ --num-worker-local-ssds=2 \ --secondary-worker-type=preemptible \ --secondary-worker-boot-disk-size=500GB \ --num-secondary-workers=25
Esempio di Apache Spark
- Eseguire un job WordCount sul testo di Shakespeare pubblico utilizzando il jar di esempio Spark nel cluster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Esempio di MapReduce di Apache Hadoop
Esegui un piccolo job teragen per generare dati di input in Cloud Storage per un job terasort successivo utilizzando il jar di esempio mapreduce sul cluster EFM.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
Eseguire un job terasort sui dati.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- terasort gs://terasort/input gs://terasort/output
Configurazione degli SSD locali per lo shuffling del worker principale
Le implementazioni shuffling del personale e HDFS scrivono i dati di shuffling intermedi nei dischi collegati a VM e traggono vantaggio dalla velocità effettiva aggiuntiva e dalle IOPS offerte dagli SSD locali. Per facilitare l'allocazione delle risorse, scegli come target un obiettivo di circa 1 partizione SSD locale per 4 vCPU durante la configurazione delle macchine worker principali.
Per collegare gli SSD locali, passa il flag --num-worker-local-ssds
al comando
gcloud dataproc clusters create.
Rapporto worker secondario
Poiché i worker secondari scrivono i dati in ordine casuale presso i worker principali, il cluster deve contenere un numero sufficiente di worker principali con risorse di CPU, memoria e disco sufficienti per supportare il carico shuffling del job. Per
i cluster con scalabilità automatica, per impedire la scalabilità e il comportamento indesiderato del gruppo principale, imposta minInstances
sul valore maxInstances
nel
criterio di scalabilità automatica
per il gruppo di worker principale.
Se hai un elevato rapporto worker secondario-principale (ad esempio 10:1), monitora l'utilizzo della CPU, della rete e del disco dei worker principali per determinare se sono sovraccarichi. Per farlo:
Vai alla pagina Istanze VM nella console Google Cloud.
Fai clic sulla casella di controllo sul lato sinistro del worker principale.
Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU del worker primario, i IOPS del disco, i byte di rete e altre metriche.
Se i worker principali sono sovraccarichi, valuta la possibilità di scalare manualmente i worker principali.
Ridimensionamento del gruppo di worker principale
Lo scale up del gruppo di worker principale può essere fatto in sicurezza, ma la scalabilità del gruppo di worker principale può influire negativamente sull'avanzamento dei job. Le operazioni che eseguono lo scale down del gruppo di worker principale devono utilizzare il ritiro elegante, che viene abilitato impostando il flag --graceful-decommission-timeout
.
Cluster con scalabilità automatica: la scalabilità del gruppo di worker principale è disabilitata sui cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale su un cluster con scalabilità automatica:
Disabilita scalabilità automatica.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Scalare il gruppo principale.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Riattiva la scalabilità automatica:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Monitoraggio dell'utilizzo del disco worker principale
I worker principali devono avere spazio su disco sufficiente per i dati di shuffle del cluster.
Puoi monitorare questa metrica indirettamente tramite la metrica remaining HDFS capacity
.
Man mano che il disco locale si riempie, lo spazio diventa non disponibile per HDFS e la capacità rimanente diminuisce.
Per impostazione predefinita, quando il disco locale di un worker principale utilizza superamento del 90% della capacità, il nodo viene contrassegnato come NON SALUTE nell'interfaccia utente dei nodi YARN. Se riscontri problemi di capacità del disco, puoi eliminare i dati inutilizzati dall'HDFS o fare lo scale up del pool di worker principale.
Tieni presente che i dati di shuffle intermedi non vengono generalmente eliminati fino alla fine di un job. Quando utilizzi il shuffling del worker principale con Spark, possono essere necessari fino a 30 minuti dopo il completamento di un job.
Configurazione avanzata
Partizionamento e parallelismo
Quando invii un job MapReduce o Spark, configura un livello di partizionamento appropriato. Decidere il numero di partizioni di input e di output per una fase di shuffle richiede un compromesso tra diverse caratteristiche di prestazioni. È preferibile sperimentare valori che funzionano bene per le forme dei job.
Partizioni di input
Il partizionamento degli input MapReduce e Spark è determinato dal set di dati di input. Durante la lettura dei file da Cloud Storage, ogni attività elabora approssimativamente un valore di "dimensioni blocco".
Per i job Spark SQL, la dimensione massima della partizione è controllata da
spark.sql.files.maxPartitionBytes
. Valuta la possibilità di portarlo a 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Per i job MapReduce e gli RDD di Spark, le dimensioni delle partizioni sono generalmente controllate da
fs.gs.block.size
, il cui valore predefinito è 128 MB. Ti consigliamo di aumentarla a 1 GB. Puoi anche impostareInputFormat
proprietà specifiche, comemapreduce.input.fileinputformat.split.minsize
emapreduce.input.fileinputformat.split.maxsize
- Per i job MapReduce:
--properties fs.gs.block.size=1073741824
- Per gli RDD di Spark:
--properties spark.hadoop.fs.gs.block.size=1073741824
- Per i job MapReduce:
Partizioni di output
Il numero di attività nelle fasi successive è controllato da diverse proprietà. Per i job più grandi che elaborano più di 1 TB, valuta la possibilità di avere almeno 1 GB per partizione.
Per i job MapReduce, il numero di partizioni di output è controllato dal criterio
mapreduce.job.reduces
.Per Spark SQL, il numero di partizioni di output è controllato da
spark.sql.shuffle.partitions
.Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare
spark.default.parallelism
.
Messa a punto casuale per riproduzione casuale del worker principale
La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Tieni presente che si tratta di una proprietà YARN a livello di cluster perché il server shuffling viene eseguito come parte di Node Manager. Il valore predefinito è due volte (2x) numero di core sulla macchina (ad esempio, 16 thread su n1-highmem-8). Se "Tempo di lettura casuale bloccato" è
superiore a 1 secondo e i worker principali non hanno raggiunto i limiti di rete, CPU
o disco, valuta la possibilità di aumentare il numero di thread del server di riproduzione casuale.
Sui tipi di macchine più grandi, ti consigliamo di aumentare spark.shuffle.io.numConnectionsPerPeer
, il cui valore predefinito è 1. Ad esempio, impostala su 5 connessioni per coppia di host.
Aumento dei tentativi
È possibile configurare il numero massimo di tentativi consentiti per master, app e fasi dell'app impostando le seguenti proprietà:
yarn:yarn.resourcemanager.am.max-attempts mapred:mapreduce.map.maxattempts mapred:mapreduce.reduce.maxattempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Poiché i master e le attività delle app vengono terminati più spesso nei cluster che utilizzano molte VM prerilasciabili o la scalabilità automatica senza rimozione controllata, può essere utile aumentare i valori delle proprietà precedenti in questi cluster (tieni presente che l'utilizzo di EFM con Spark e il rimozione controllata controllato non è supportato).
Configurazione di HDFS per la riproduzione casuale di HCFS
Per migliorare le prestazioni dei shuffle di grandi dimensioni, puoi ridurre la contesa del blocco nel NameNode impostando dfs.namenode.fslock.fair=false
. Tieni presente che questo riduce il rischio di una singola richiesta, ma potrebbe migliorare la velocità effettiva a livello di cluster.
Per migliorare ulteriormente le prestazioni di NameNode, puoi collegare gli SSD locali al nodo master impostando --num-master-local-ssds
. Puoi anche aggiungere SSD locali ai worker principali per migliorare le prestazioni di DataNode impostando --num-worker-local-ssds
.
Altri file system compatibili con Hadoop per HCFS in ordine casuale
Per impostazione predefinita, i dati di FMFS HCFS di EFM vengono scritti in HDFS, ma puoi utilizzare qualsiasi
file system compatibile con HAFS (HCFS).
Ad esempio, potresti decidere di scrivere in Shuffle su Cloud Storage o in un HDFS di un altro cluster. Per specificare un file system, puoi puntare fs.defaultFS
al file system di destinazione quando invii un job al cluster.
Ritiro YARN dei controlli relativi ai cluster EFM
YARN Graceful Decommissioning può essere utilizzato per rimuovere rapidamente i nodi con un impatto minimo sull'esecuzione delle applicazioni. Per i cluster con scalabilità automatica, il timeout con ritiro gestito può essere impostato in un AutoscalingPolicy collegato al cluster EFM.
Miglioramenti del servizio EFM MapReduce al rimozione controllata
Poiché i dati intermedi sono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i container in esecuzione su quei nodi sono terminati. In confronto, i nodi vengono rimossi sui cluster Dataproc standard solo al termine dell'applicazione.
La rimozione del nodo non attende il completamento dei master per app in esecuzione su un nodo. Quando il container master per app viene terminato, viene ripianificato su un altro nodo che non viene ritirato. L'avanzamento del job non viene perso: il nuovo master per app recupera rapidamente lo stato del master dell'app precedente leggendo la cronologia dei job.
Utilizzo di rimozione controllata su un cluster EFM con MapReduce
Crea un cluster EFM con un numero uguale di worker principali e secondari.
gcloud dataproc clusters create cluster-name \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --region=region \ --num-workers=5 \ --num-secondary-workers=5
Eseguire un job mappareduce che calcola il valore di pi greco utilizzando l'jar di esempio mapreduce sul cluster.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- pi 1000 10000000
Mentre il job è in esecuzione, fare lo scale down del cluster utilizzando la rimozione controllata.
gcloud dataproc clusters update cluster-name \ --region=region \ --num-secondary-workers=0 \ --graceful-decommission-timeout=1h
I nodi verranno rimossi dal cluster rapidamente prima del termine del job, riducendo al minimo la perdita dell'avanzamento dei job. Possono verificarsi pause temporanee nell'avanzamento dei job a causa di:
- Failover dell'app master. Se l'avanzamento del job scende a 0% e poi passa immediatamente al valore pre-drop, il master dell'app potrebbe essere terminato e un nuovo master dell'app ha recuperato il proprio stato. Ciò non dovrebbe influire in modo significativo sull'avanzamento del job poiché il failover avviene rapidamente.
- Prerilascio della VM. Poiché HDFS conserva solo gli output completi, non parziali, delle attività sulla mappa, possono verificarsi pause temporanee nell'avanzamento dei job quando una VM viene prerilasciata durante l'esecuzione di un'attività sulla mappa.