La modalità di flessibilità avanzata (EFM) di Dataproc consente di gestire i dati elaborati in ordine casuale per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. La modalità EFM trasferisce i dati elaborati in ordine casuale in una di due modalità selezionabili dall'utente:
Shuffling del worker principale. I mapper scrivono i dati nei worker principali. I worker estraggono da questi nodi remoti durante la fase di riduzione. Questa modalità è consigliata solo per i job Spark ed è disponibile solo per questi job.
Shuffling del file system compatibile con Hadoop (HCFS). I mapper scrivono i dati in un'implementazione HCFS (HDFS per impostazione predefinita). Come per la modalità dei worker principali, solo i worker principali partecipano alle implementazioni di HDFS e HCFS (se lo shuffling HCFS utilizza il connettore Cloud Storage, i dati vengono archiviati fuori dal cluster). Questa modalità può essere utile per i job con piccole quantità di dati, ma a causa delle limitazioni di scalabilità, non è consigliata per i job di grandi dimensioni.
Poiché entrambe le modalità EFM non memorizzano i dati elaborati in ordine casuale intermedi sui worker secondari, EFM è particolarmente adatta ai cluster che utilizzano VM prerilasciabili o che applicano la scalabilità automatica solo al gruppo di worker secondari.
- I job Apache Hadoop YARN che non supportano il trasferimento di AppMaster possono non riuscire in modalità di maggiore flessibilità (consulta Quando attendere il completamento degli AppMaster).
- La modalità flessibilità avanzata non è consigliata:
- su un cluster con solo worker principali
- sui job di streaming, poiché possono essere necessari fino a 30 minuti dopo il completamento del job per pulire i dati di ordinamento intermedio.
- La modalità flessibilità avanzata non è supportata:
- quando è attiva la scalabilità automatica del worker principale. Nella maggior parte dei casi, i worker principali continueranno a memorizzare i dati di riproduzione casuale di cui non viene eseguita la migrazione automatica. La riduzione del gruppo di worker principale annulla i vantaggi dell'EFM.
- quando i job Spark vengono eseguiti su un cluster con la rimozione controllata abilitata. Il ritiro controllato e l'EFM possono essere in conflitto poiché il meccanismo di ritiro controllato YARN mantiene i nodi DECOMMISSIONING fino al completamento di tutte le applicazioni coinvolte.
Utilizzo della modalità di flessibilità avanzata
La modalità Flessibilità avanzata è configurata per motore di esecuzione e deve essere configurata durante la creazione del cluster.
L'implementazione EFM di Spark è configurata con la
dataproc:efm.spark.shuffle
proprietà cluster. Valori validi della proprietà:primary-worker
per lo shuffling del worker principale (consigliato)hcfs
per lo shuffling basato su HCFS. Questa modalità è ritirata ed è disponibile solo sui cluster che eseguono la versione dell'immagine 1.5. Non consigliato per i nuovi flussi di lavoro.
L'implementazione di Hadoop MapReduce è configurata con la
dataproc:efm.mapreduce.shuffle
proprietà cluster. Valori validi della proprietà:hcfs
Esempio: crea un cluster con lo shuffling del worker principale per Spark e lo shuffling HCFS per MapReduce:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --worker-machine-type=n1-highmem-8 \ --num-workers=25 \ --num-worker-local-ssds=2 \ --secondary-worker-type=preemptible \ --secondary-worker-boot-disk-size=500GB \ --num-secondary-workers=25
Esempio di Apache Spark
- Esegui un job WordCount sul testo pubblico di Shakespeare utilizzando il jar di esempi Spark sul cluster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Esempio di Apache Hadoop MapReduce
Esegui un piccolo job teragen per generare i dati di input in Cloud Storage per un job terasort successivo utilizzando il file jar degli esempi mapreduce sul cluster EFM.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
Esegui un job Terasort sui dati.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- terasort gs://terasort/input gs://terasort/output
Configurazione delle unità SSD locali per lo shuffling del worker principale
Le implementazioni di shuffling del worker principale e HDFS scrivono i dati intermedi sottoposti a shuffling su dischi collegati alla VM e sfruttano il throughput aggiuntivo e le IOPS offerti dagli SSD locali. Per facilitare l'allocazione delle risorse, scegli come obiettivo circa 1 partizione SSD locale per 4 vCPU quando configuri le macchine worker principali.
Per collegare le unità SSD locali, passa il flag --num-worker-local-ssds
al comando
gcloud Dataproc clusters create.
In genere, non sono necessarie unità SSD locali sui worker secondari.
L'aggiunta di unità SSD locali ai worker secondari di un cluster (utilizzando il flag --num-secondary-worker-local-ssds
) è spesso meno importante perché i worker secondari non scrivono i dati di miscelazione localmente.
Tuttavia, poiché le unità SSD locali migliorano le prestazioni del disco locale, puoi decidere di aggiungerle ai worker secondari se prevedi che i job siano limitati dall'I/O a causa dell'utilizzo del disco locale: il tuo job utilizza una quantità significativa di spazio su disco locale per la memorizzazione temporanea o le partizioni sono troppo grandi per essere memorizzate in memoria e verranno trasferite sul disco.
Rapporto worker secondari
Poiché i worker secondari scrivono i dati di ordinamento nei worker principali, il tuo cluster deve contenere un numero sufficiente di worker principali con risorse di CPU, memoria e disco sufficienti per gestire il carico di ordinamento del job. Per i cluster con scalabilità automatica, per impedire la scalabilità del gruppo principale e causare comportamenti indesiderati, imposta minInstances
sul valore maxInstances
nel criterio di scalabilità automatica per il gruppo di worker principale.
Se hai un rapporto elevato tra worker secondari e principali (ad esempio 10:1), monitora l'utilizzo della CPU, della rete e del disco dei worker principali per determinare se sono sovraccaricati. Per farlo:
Vai alla pagina Istanze VM nella console Google Cloud.
Fai clic sulla casella di controllo a sinistra del worker principale.
Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU, gli IOPS del disco, i byte di rete e altre metriche del worker principale.
Se i worker principali sono sovraccaricati, valuta la possibilità di aumentare manualmente il numero di worker principali.
Ridimensionamento del gruppo di worker principale
Il gruppo di worker principale può essere aumentato in sicurezza, ma la riduzione del gruppo di worker principale può influire negativamente sull'avanzamento del job. Le operazioni che riducono il gruppo di lavoro principale devono utilizzare il rimozione controllata, che viene attivato impostando il flag --graceful-decommission-timeout
.
Cluster con scalabilità automatica:la scalabilità del gruppo di worker principale è disabilitata nei cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale in un cluster con scalabilità automatica:
Disattiva la scalabilità automatica.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Scala il gruppo principale.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Riattiva la scalabilità automatica:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Monitoraggio dell'utilizzo del disco del worker principale
I worker principali devono disporre di spazio su disco sufficiente per i dati di smistamento del cluster.
Puoi monitorarlo indirettamente tramite la metrica remaining HDFS capacity
.
Man mano che il disco locale si riempie, lo spazio diventa non disponibile per HDFS e la capacità rimanente diminuisce.
Per impostazione predefinita, quando l'utilizzo del disco locale di un worker principale supera il 90% della capacità, il nodo verrà contrassegnato come NON INTEGRO nell'interfaccia utente del nodo YARN. Se riscontri problemi di capacità del disco, puoi eliminare i dati inutilizzati da HDFS o aumentare il pool di worker principale.
Configurazione avanzata
Partizionamento e parallelismo
Quando invii un job MapReduce o Spark, configura un livello appropriato di suddivisione. La scelta del numero di partizioni di input e output per una fase di ordinamento comporta un compromesso tra diverse caratteristiche di rendimento. È meglio fare esperimenti con i valori che funzionano per le forme dei tuoi job.
Partizioni di input
La partizione dell'input di MapReduce e Spark è determinata dal set di dati di input. Quando leggi i file da Cloud Storage, ogni attività elabora circa un "blocco di dimensioni" di dati.
Per i job Spark SQL, la dimensione massima della partizione è controllata da
spark.sql.files.maxPartitionBytes
. Valuta la possibilità di aumentarlo a 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Per i job MapReduce e gli RDD di Spark, le dimensioni della partizione vengono in genere controllate con
fs.gs.block.size
, il cui valore predefinito è 128 MB. Valuta la possibilità di aumentarlo a 1 GB. Puoi anche impostare proprietàInputFormat
specifiche comemapreduce.input.fileinputformat.split.minsize
emapreduce.input.fileinputformat.split.maxsize
- Per i job MapReduce:
--properties fs.gs.block.size=1073741824
- Per gli RDD di Spark:
--properties spark.hadoop.fs.gs.block.size=1073741824
- Per i job MapReduce:
Partizioni di output
Il numero di attività nelle fasi successive è controllato da diverse proprietà. Per job di dimensioni maggiori che elaborano più di 1 TB, valuta la possibilità di avere almeno 1 GB per partizione.
Per i job MapReduce, il numero di partizioni di output è controllato da
mapreduce.job.reduces
.Per Spark SQL, il numero di partizioni di output è controllato da
spark.sql.shuffle.partitions
.Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare
spark.default.parallelism
.
Ottimizzazione dello shuffling per lo shuffling del worker principale
La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Tieni presente che si tratta di una proprietà YARN a livello di cluster perché il server di smistamento Spark viene eseguito nell'ambito del gestore dei nodi. Il valore predefinito è il doppio (2x) del numero di core della macchina (ad esempio, 16 thread su una macchina n1-highmem-8). Se "Tempo di blocco lettura riproduzione casuale" è superiore a 1 secondo e i worker principali non hanno raggiunto i limiti di rete, CPU o disco, ti consigliamo di aumentare il numero di thread del server di riproduzione casuale.
Per i tipi di macchine più grandi, ti consigliamo di aumentare spark.shuffle.io.numConnectionsPerPeer
,
che per impostazione predefinita è 1. Ad esempio, impostalo su 5 connessioni per coppia di host.
Nuovi tentativi
Il numero massimo di tentativi consentiti per app master, attività e fasi può essere configurato impostando le seguenti proprietà:
yarn:yarn.resourcemanager.am.max-attempts mapred:mapreduce.map.maxattempts mapred:mapreduce.reduce.maxattempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Poiché i master dell'app e le attività vengono terminati più di frequente nei cluster che utilizzano molte VM preemptibili o la scalabilità automatica senza la rimozione controllata, può essere utile aumentare i valori delle proprietà sopra indicate in questi cluster (tieni presente che l'utilizzo di EFM con Spark e il rimozione controllata non è supportato).
Configurazione di HDFS per lo shuffling HCFS
Per migliorare le prestazioni degli schemi di ordinamento di grandi dimensioni, puoi ridurre la contesa dei blocchi nel NameNode impostando dfs.namenode.fslock.fair=false
. Tieni presente che questo rischia di bloccare le singole richieste, ma potrebbe migliorare il throughput a livello di cluster.
Per migliorare ulteriormente le prestazioni del NameNode, puoi collegare SSD locali al
nodo principale impostando --num-master-local-ssds
. Puoi anche aggiungere SSD locali ai worker principali per migliorare le prestazioni di DataNode impostando --num-worker-local-ssds
.
Altri file system compatibili con Hadoop per l'ordinamento casuale HCFS
Per impostazione predefinita, i dati elaborati in ordine casuale EFM HCFS vengono scritti in HDFS, ma puoi utilizzare qualsiasi
file system compatibile con Hadoop (HCFS).
Ad esempio, puoi decidere di scrivere lo shuffling su Cloud Storage o su HDFS di un altro cluster. Per specificare un file system, puoi indicarefs.defaultFS
il file system di destinazione quando invii un job al cluster.
Rimozione controllata di YARN sui cluster EFM
Il ritiro gestito automaticamente di YARN può essere utilizzato per rimuovere rapidamente i nodi con un impatto minimo sulle applicazioni in esecuzione. Per i cluster con scalabilità automatica, il timeout di ritiro graduale puoi essere impostato in un AutoscalingPolicy collegato al cluster EFM.
Miglioramenti all'EFM MapReduce per la rimozione controllata
Poiché i dati intermedi vengono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i contenitori in esecuzione su questi nodi sono stati completati. In confronto, i nodi non vengono rimossi nei cluster Dataproc standard fino al termine dell'applicazione.
La rimozione del nodo non attende il completamento dei master dell'app in esecuzione su un nodo. Quando il contenitore principale dell'app viene terminato, viene riprogrammato su un altro nodo che non verrà ritirato. L'avanzamento del job non viene perso: il nuovo app master recupera rapidamente lo stato dall'app master precedente leggendo la cronologia dei job.
Utilizzo della rimozione controllata in un cluster EFM con MapReduce
Crea un cluster EFM con un numero uguale di worker principali e secondari.
gcloud dataproc clusters create cluster-name \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --region=region \ --num-workers=5 \ --num-secondary-workers=5
Esegui un job mapreduce che calcola il valore di pi utilizzando il file JAR di esempi mapreduce sul cluster.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- pi 1000 10000000
Mentre il job è in esecuzione, fare lo scale down del cluster utilizzando la rimozione controllata.
I nodi verranno rimossi rapidamente dal cluster prima del termine del job, minimizzando al contempo la perdita di avanzamento del job. Possono verificarsi interruzioni temporanee nell'avanzamento dei job per i seguenti motivi:gcloud dataproc clusters update cluster-name \ --region=region \ --num-secondary-workers=0 \ --graceful-decommission-timeout=1h
- Failover dell'app master. Se l'avanzamento del job scende allo 0% e poi risale immediatamente al valore precedente al calo, l'app master potrebbe essere stato interrotto e un nuovo app master potrebbe aver recuperato il suo stato. Ciò non dovrebbe influire in modo significativo sul progresso del job, poiché il failover avviene rapidamente.
- Preemption delle VM. Poiché HDFS conserva solo gli output completi, non parziali, delle attività di mappatura, possono verificarsi interruzioni temporanee nell'avanzamento del job quando una VM viene prelevata durante il lavoro su un'attività di mappatura.
Per velocizzare la rimozione dei nodi, puoi fare lo scale down il cluster senza il rimozione controllata omettendo il flag --graceful-decommission-timeout
nell'esempio di comando gcloud
precedente. L'avanzamento del job delle attività di mappatura completate verrà conservato, ma l'output delle attività di mappatura parzialmente completate andrà perso (le attività di mappatura verranno eseguite di nuovo).