Modalità di flessibilità avanzata di Dataproc

La modalità di flessibilità avanzata (EFM) di Dataproc consente di gestire i dati elaborati in ordine casuale per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. La modalità EFM trasferisce i dati di shuffling in uno dei due modelli selezionabili dall'utente modalità:

  1. Riproduzione casuale casuale di worker principali. I mappatori scrivono dati ai lavoratori principali. Pull worker dai nodi remoti durante la fase di riduzione. Questa modalità è disponibile soltanto e l'opzione è consigliata per i job Spark.

  2. Riproduzione casuale HCFS (Hadoop Compatible File System). I mappatori scrivono dati in un Implementazione di HCFS (HDFS per impostazione predefinita). Come per la modalità worker principale, partecipano solo i worker principali nelle implementazioni HDFS e HCFS (se Lo shuffle di HCFS utilizza il connettore Cloud Storage, vengono archiviati fuori dal cluster). Questa modalità può essere utile per i job con piccole quantità di dati, ma a causa delle limitazioni di scalabilità, non è consigliata per i job di grandi dimensioni.

Poiché entrambe le modalità EFM non memorizzano i dati elaborati in ordine casuale intermedi sui worker secondari, EFM è molto adatta ai cluster che utilizzano VM prerilasciabili o che applicano la scalabilità automatica solo al gruppo di worker secondari.

Limitazioni:

  • I job Apache Hadoop YARN che non supportano il trasferimento di AppMaster possono non riuscire in modalità di maggiore flessibilità (consulta Quando attendere il completamento degli AppMaster).
  • La modalità di flessibilità avanzata non è consigliata:
    • su un cluster che ha solo worker principali
    • sui job di flussi di dati poiché possono richiedere fino a 30 minuti dopo il completamento per la pulizia dei dati intermedi di shuffling.
  • La modalità di flessibilità avanzata non è supportata:
    • quando è attiva la scalabilità automatica del worker principale. Nella maggior parte dei casi, i worker principali continueranno a memorizzare i dati di smistamento di cui non viene eseguita la migrazione automatica. Lo scale down del gruppo di worker principale annulla i vantaggi della funzionalità EFM.
    • quando i job Spark vengono eseguiti su un cluster in cui è abilitato rimozione controllata. Il ritiro controllato e l'EFM possono essere in conflitto poiché il meccanismo di ritiro controllato YARN mantiene i nodi DECOMMISSIONING fino al completamento di tutte le applicazioni coinvolte.

Utilizzo della modalità di flessibilità avanzata

La modalità di flessibilità avanzata è configurata per ogni motore di esecuzione e deve essere configurato durante la creazione del cluster.

  • L'implementazione di Spark EFM è configurata con dataproc:efm.spark.shuffle proprietà cluster. Valori validi della proprietà:

    • primary-worker per lo shuffling del worker principale (consigliato)
    • hcfs per lo shuffling basato su HCFS. Questa modalità è ritirata ed è disponibile solo sui cluster che eseguono l'immagine versione 1.5. Opzione sconsigliata per i nuovi flussi di lavoro.
  • L'implementazione di Hadoop MapReduce è configurata con il dataproc:efm.mapreduce.shuffle proprietà cluster. Valori di proprietà validi:

    • hcfs

Esempio: crea un cluster con lo shuffling del worker principale per Spark e lo shuffling HCFS per MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Esempio di Apache Spark

  1. Esegui un job WordCount sul testo pubblico di Shakespeare utilizzando il jar di esempi Spark sul cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Esempio di Apache Hadoop MapReduce

  1. Esegui un piccolo job teragen per generare i dati di input in Cloud Storage per un job terasort successivo utilizzando il file jar degli esempi mapreduce sul cluster EFM.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Esegui un job terasort sui dati.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Configurazione delle unità SSD locali per lo shuffling del worker principale

Le implementazioni di shuffling del worker principale e HDFS scrivono i dati intermedi sottoposti a shuffling su dischi collegati alla VM e sfruttano il throughput aggiuntivo e le IOPS offerte dagli SSD locali. Per facilitare l'allocazione delle risorse, scegli come obiettivo circa 1 partizione SSD locale per 4 vCPU quando configuri le macchine worker principali.

Per collegare le unità SSD locali, passa il flag --num-worker-local-ssds al comando gcloud Dataproc clusters create.

In genere, non hai bisogno di SSD locali sui worker secondari. Aggiunta di SSD locali ai worker secondari di un cluster (utilizzando --num-secondary-worker-local-ssds flag) è spesso meno importante perché i worker secondari non scrivono i dati di shuffling localmente. Tuttavia, poiché gli SSD locali migliorano le prestazioni del disco locale, puoi decidere di aggiungere SSD locali ai worker secondari, se prevedi job a essere vincolati all'I/O a causa dell'uso del disco locale: il job utilizza un disco locale significativo per spazio temporaneo o le partizioni troppo grandi per essere contenuti in memoria e sovraccaricati su disco.

Rapporto worker secondario

Poiché i worker secondari scrivono i dati di smistamento nei worker principali, il tuo cluster deve contenere un numero sufficiente di worker principali con risorse CPU, memoria e disco sufficienti per gestire il carico di smistamento del job. Per i cluster con scalabilità automatica, per impedire il ridimensionamento del gruppo principale e causare comportamenti indesiderati, imposta minInstances sul valore maxInstances nel criterio di scalabilità automatica per il gruppo di worker principale.

Se hai un rapporto elevato tra worker secondari e principali (ad esempio 10:1), monitora l'utilizzo della CPU, della rete e del disco dei worker principali per determinare se sono sovraccaricati. Per farlo:

  1. Vai alla pagina Istanze VM nel nella console Google Cloud.

  2. Fai clic sulla casella di controllo sul lato sinistro del worker principale.

  3. Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU, IOPS del disco, byte di rete e altre metriche del worker principale.

Se i worker principali sono sovraccarichi, valuta la possibilità di aumentare il livello manualmente.

Ridimensionamento del gruppo di worker principale

Il gruppo di worker principale può essere aumentato in sicurezza, ma la riduzione del gruppo di worker principale può influire negativamente sull'avanzamento del job. Operazioni che eseguono il downgrade gruppo di worker principale deve utilizzare il ritiro gestito, che viene abilitato impostando il flag --graceful-decommission-timeout.

Cluster con scalabilità automatica: la scalabilità del gruppo di worker principale è disabilitata nei cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale su una cluster con scalabilità automatica:

  1. Disattiva la scalabilità automatica.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Scala il gruppo principale.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Riattiva la scalabilità automatica:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Monitoraggio dell'utilizzo del disco del worker principale

I worker principali devono disporre di spazio su disco sufficiente per i dati di shuffling del cluster. Puoi monitorarlo indirettamente tramite la metrica remaining HDFS capacity. Quando il disco locale si riempie, lo spazio diventa non disponibile per HDFS. la capacità rimanente diminuisce.

Per impostazione predefinita, quando il disco locale di un worker principale usa supera il 90% della capacità, il nodo verrà contrassegnato come UNHEALTHY nella UI del nodo YARN. Se riscontri problemi di capacità, puoi eliminare i dati inutilizzati da HDFS o fare lo scale up pool di worker principale.

Configurazione avanzata

Partizionamento e parallelismo

Quando invii un job MapReduce o Spark, configura un livello appropriato di il partizionamento orizzontale. La scelta del numero di partizioni di input e output per una fase di ordinamento comporta un compromesso tra diverse caratteristiche di prestazioni. È meglio sperimentare i valori più adatti alle tue forme di lavoro.

Partizioni di input

La partizione dell'input di MapReduce e Spark è determinata dal set di dati di input. Quando leggi i file da Cloud Storage, ogni attività elabora circa un "blocco di dimensioni" di dati.

  • Per i job Spark SQL, la dimensione massima della partizione è controllata da spark.sql.files.maxPartitionBytes. Valuta la possibilità di aumentarlo a 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Per i job MapReduce e gli RDD Spark, la dimensione della partizione viene generalmente controllata con fs.gs.block.size, che per impostazione predefinita è 128 MB. Valuta la possibilità di aumentarlo a 1 GB. Puoi anche impostare proprietà InputFormat specifiche come mapreduce.input.fileinputformat.split.minsize e mapreduce.input.fileinputformat.split.maxsize

    • Per i job MapReduce: --properties fs.gs.block.size=1073741824
    • Per gli RDD di Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Partizioni di output

Il numero di attività nelle fasi successive è controllato da diverse proprietà. Per job di dimensioni maggiori che elaborano più di 1 TB, valuta la possibilità di avere almeno 1 GB per partizione.

  • Per i job MapReduce, il numero di partizioni di output è controllato da mapreduce.job.reduces.

  • Per Spark SQL, il numero di partizioni di output è controllato da spark.sql.shuffle.partitions.

  • Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare spark.default.parallelism.

Ottimizzazione dello shuffling per lo shuffling del worker principale

La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Tieni presente che si tratta di una proprietà YARN a livello di cluster perché il server di smistamento Spark viene eseguito nell'ambito del gestore dei nodi. Per impostazione predefinita, il numero di core è doppio (2x) (ad esempio, 16 thread su un n1-highmem-8). Se viene visualizzato il messaggio "Visualizzazione casuale del tempo di blocco della lettura" sono più di 1 secondo e i worker principali non hanno raggiunto la rete, la CPU o il disco limiti, valuta la possibilità di aumentare il numero di thread del server di shuffling.

Sui tipi di macchina più grandi, ti consigliamo di aumentare spark.shuffle.io.numConnectionsPerPeer, che per impostazione predefinita è 1. Ad esempio, impostalo su 5 connessioni per coppia di host.

Nuovi tentativi

È consentito il numero massimo di tentativi consentiti per i master dell'app, le attività e le fasi configurarli impostando le seguenti proprietà:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Poiché i master e le attività delle app vengono terminati più spesso in cluster che utilizzano molte VM prerilasciabili o scalabilità automatica senza rimozione controllata, aumentando i valori delle proprietà sopra indicate in questi cluster può essere d'aiuto (tieni presente che l'utilizzo di EFM con Spark e rimozione controllata non supportati).

Configurazione di HDFS per shuffle HCFS

Per migliorare le prestazioni degli shuffling di grandi dimensioni, puoi ridurre il conflitto di blocchi in NameNode impostando dfs.namenode.fslock.fair=false. Tieni presente che questo rischio esaurire le singole richieste, ma potrebbe migliorare la velocità effettiva a livello di cluster. Per migliorare ulteriormente le prestazioni di NameNode, puoi collegare SSD locali al nodo principale impostando --num-master-local-ssds. Puoi anche aggiungere SSD locali ai worker principali per migliorare le prestazioni di DataNode impostando --num-worker-local-ssds.

Altri file system compatibili con Hadoop per l'ordinamento casuale HCFS

Per impostazione predefinita, i dati elaborati in ordine casuale EFM HCFS vengono scritti in HDFS, ma puoi utilizzare qualsiasi file system compatibile con Hadoop (HCFS). Ad esempio, potresti decidere di scrivere lo shuffle in Cloud Storage all'HDFS di un altro cluster. Per specificare un file system, puoi puntare fs.defaultFS al file system di destinazione quando invii un job al cluster.

Rimozione controllata di YARN sui cluster EFM

Ritiro gestito YARN consente di rimuovere i nodi rapidamente e con l'impatto sull'esecuzione delle applicazioni. Per i cluster con scalabilità automatica, il timeout di ritiro graduale può essere impostato in un AutoscalingPolicy collegato al cluster EFM.

Miglioramenti di MapReduce EFM alla rimozione controllata

  1. Poiché i dati intermedi vengono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i contenitori in esecuzione su questi nodi sono stati completati. In confronto, i nodi non vengono rimossi nei cluster Dataproc standard fino al termine dell'applicazione.

  2. La rimozione del nodo non attende il completamento dei master dell'app in esecuzione su un nodo. Quando il contenitore principale dell'app viene terminato, viene riprogrammato su un altro nodo che non verrà ritirato. L'avanzamento del job non viene perso: il nuovo master dell'app recupera rapidamente lo stato dal master dell'app precedente leggendo la cronologia dei job.

Utilizzo del ritiro gestito automaticamente in un cluster EFM con MapReduce

  1. Crea un cluster EFM con un numero uguale di worker principali e secondari.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Esegui un job mapreduce che calcola il valore di pi utilizzando il file JAR di esempi mapreduce sul cluster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Mentre il job è in esecuzione, esegui il ridimensionamento del cluster utilizzando il ritiro gestito automaticamente.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    I nodi verranno rimossi rapidamente dal cluster prima del termine del job, minimizzando al contempo la perdita di avanzamento del job. Potrebbero verificarsi pause temporanee nell'avanzamento del job a causa di:

    • Failover del master dell'app. Se l'avanzamento del job scende allo 0% e poi salta immediatamente fino al valore pre-drop, il master dell'app potrebbe essere mentre un nuovo master dell'app ha recuperato il suo stato. Ciò non dovrebbe influire significativamente l'avanzamento del job, dato che il failover avviene rapidamente.
    • Prerilascio delle VM. Poiché HDFS conserva solo gli output completi, non parziali, delle attività di mappatura, possono verificarsi interruzioni temporanee nell'avanzamento del job quando una VM viene prelevata durante il lavoro su un'attività di mappatura.

Per velocizzare la rimozione dei nodi, puoi ridurre il cluster senza il ritiro graduale omettendo il flag --graceful-decommission-timeout nell'esempio di comando gcloud precedente. L'avanzamento del job dalle attività di mappa completate: l'output dell'attività della mappa parzialmente completata andrà perso (le attività della mappa verranno eseguite nuovamente).