Modalità di flessibilità avanzata di Dataproc

La modalità di flessibilità avanzata (EFM) di Dataproc consente di gestire i dati elaborati in ordine casuale per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. La modalità EFM trasferisce i dati elaborati in ordine casuale in una di due modalità selezionabili dall'utente:

  1. Shuffling del worker principale. I mapper scrivono i dati nei worker principali. I worker estraggono da questi nodi remoti durante la fase di riduzione. Questa modalità è consigliata solo per i job Spark ed è disponibile solo per questi job.

  2. Shuffling del file system compatibile con Hadoop (HCFS). I mapper scrivono i dati in un'implementazione HCFS (HDFS per impostazione predefinita). Come per la modalità dei worker principali, solo i worker principali partecipano alle implementazioni di HDFS e HCFS (se lo shuffling HCFS utilizza il connettore Cloud Storage, i dati vengono archiviati fuori dal cluster). Questa modalità può essere utile per i job con piccole quantità di dati, ma a causa delle limitazioni di scalabilità, non è consigliata per i job di grandi dimensioni.

Poiché entrambe le modalità EFM non memorizzano i dati elaborati in ordine casuale intermedi sui worker secondari, EFM è particolarmente adatta ai cluster che utilizzano VM prerilasciabili o che applicano la scalabilità automatica solo al gruppo di worker secondari.

Limitazioni:

  • I job Apache Hadoop YARN che non supportano il trasferimento di AppMaster possono non riuscire in modalità di maggiore flessibilità (consulta Quando attendere il completamento degli AppMaster).
  • La modalità flessibilità avanzata non è consigliata:
    • su un cluster con solo worker principali
    • sui job di streaming, poiché possono essere necessari fino a 30 minuti dopo il completamento del job per pulire i dati di ordinamento intermedio.
  • La modalità flessibilità avanzata non è supportata:
    • quando è attiva la scalabilità automatica del worker principale. Nella maggior parte dei casi, i worker principali continueranno a memorizzare i dati di riproduzione casuale di cui non viene eseguita la migrazione automatica. La riduzione del gruppo di worker principale annulla i vantaggi dell'EFM.
    • quando i job Spark vengono eseguiti su un cluster con la rimozione controllata abilitata. Il ritiro controllato e l'EFM possono essere in conflitto poiché il meccanismo di ritiro controllato YARN mantiene i nodi DECOMMISSIONING fino al completamento di tutte le applicazioni coinvolte.

Utilizzo della modalità di flessibilità avanzata

La modalità Flessibilità avanzata è configurata per motore di esecuzione e deve essere configurata durante la creazione del cluster.

  • L'implementazione EFM di Spark è configurata con la dataproc:efm.spark.shuffle proprietà cluster. Valori validi della proprietà:

    • primary-worker per lo shuffling del worker principale (consigliato)
    • hcfs per lo shuffling basato su HCFS. Questa modalità è ritirata ed è disponibile solo sui cluster che eseguono la versione dell'immagine 1.5. Non consigliato per i nuovi flussi di lavoro.
  • L'implementazione di Hadoop MapReduce è configurata con la dataproc:efm.mapreduce.shuffle proprietà cluster. Valori validi della proprietà:

    • hcfs

Esempio: crea un cluster con lo shuffling del worker principale per Spark e lo shuffling HCFS per MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Esempio di Apache Spark

  1. Esegui un job WordCount sul testo pubblico di Shakespeare utilizzando il jar di esempi Spark sul cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Esempio di Apache Hadoop MapReduce

  1. Esegui un piccolo job teragen per generare i dati di input in Cloud Storage per un job terasort successivo utilizzando il file jar degli esempi mapreduce sul cluster EFM.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Esegui un job Terasort sui dati.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Configurazione delle unità SSD locali per lo shuffling del worker principale

Le implementazioni di shuffling del worker principale e HDFS scrivono i dati intermedi sottoposti a shuffling su dischi collegati alla VM e sfruttano il throughput aggiuntivo e le IOPS offerti dagli SSD locali. Per facilitare l'allocazione delle risorse, scegli come obiettivo circa 1 partizione SSD locale per 4 vCPU quando configuri le macchine worker principali.

Per collegare le unità SSD locali, passa il flag --num-worker-local-ssds al comando gcloud Dataproc clusters create.

In genere, non sono necessarie unità SSD locali sui worker secondari. L'aggiunta di unità SSD locali ai worker secondari di un cluster (utilizzando il flag --num-secondary-worker-local-ssds) è spesso meno importante perché i worker secondari non scrivono i dati di miscelazione localmente. Tuttavia, poiché le unità SSD locali migliorano le prestazioni del disco locale, puoi decidere di aggiungerle ai worker secondari se prevedi che i job siano limitati dall'I/O a causa dell'utilizzo del disco locale: il tuo job utilizza una quantità significativa di spazio su disco locale per la memorizzazione temporanea o le partizioni sono troppo grandi per essere memorizzate in memoria e verranno trasferite sul disco.

Rapporto worker secondari

Poiché i worker secondari scrivono i dati di ordinamento nei worker principali, il tuo cluster deve contenere un numero sufficiente di worker principali con risorse di CPU, memoria e disco sufficienti per gestire il carico di ordinamento del job. Per i cluster con scalabilità automatica, per impedire la scalabilità del gruppo principale e causare comportamenti indesiderati, imposta minInstances sul valore maxInstances nel criterio di scalabilità automatica per il gruppo di worker principale.

Se hai un rapporto elevato tra worker secondari e principali (ad esempio 10:1), monitora l'utilizzo della CPU, della rete e del disco dei worker principali per determinare se sono sovraccaricati. Per farlo:

  1. Vai alla pagina Istanze VM nella console Google Cloud.

  2. Fai clic sulla casella di controllo a sinistra del worker principale.

  3. Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU, gli IOPS del disco, i byte di rete e altre metriche del worker principale.

Se i worker principali sono sovraccaricati, valuta la possibilità di aumentare manualmente il numero di worker principali.

Ridimensionamento del gruppo di worker principale

Il gruppo di worker principale può essere aumentato in sicurezza, ma la riduzione del gruppo di worker principale può influire negativamente sull'avanzamento del job. Le operazioni che riducono il gruppo di lavoro principale devono utilizzare il rimozione controllata, che viene attivato impostando il flag --graceful-decommission-timeout.

Cluster con scalabilità automatica:la scalabilità del gruppo di worker principale è disabilitata nei cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale in un cluster con scalabilità automatica:

  1. Disattiva la scalabilità automatica.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Scala il gruppo principale.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Riattiva la scalabilità automatica:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Monitoraggio dell'utilizzo del disco del worker principale

I worker principali devono disporre di spazio su disco sufficiente per i dati di smistamento del cluster. Puoi monitorarlo indirettamente tramite la metrica remaining HDFS capacity. Man mano che il disco locale si riempie, lo spazio diventa non disponibile per HDFS e la capacità rimanente diminuisce.

Per impostazione predefinita, quando l'utilizzo del disco locale di un worker principale supera il 90% della capacità, il nodo verrà contrassegnato come NON INTEGRO nell'interfaccia utente del nodo YARN. Se riscontri problemi di capacità del disco, puoi eliminare i dati inutilizzati da HDFS o aumentare il pool di worker principale.

Configurazione avanzata

Partizionamento e parallelismo

Quando invii un job MapReduce o Spark, configura un livello appropriato di suddivisione. La scelta del numero di partizioni di input e output per una fase di ordinamento comporta un compromesso tra diverse caratteristiche di rendimento. È meglio fare esperimenti con i valori che funzionano per le forme dei tuoi job.

Partizioni di input

La partizione dell'input di MapReduce e Spark è determinata dal set di dati di input. Quando leggi i file da Cloud Storage, ogni attività elabora circa un "blocco di dimensioni" di dati.

  • Per i job Spark SQL, la dimensione massima della partizione è controllata da spark.sql.files.maxPartitionBytes. Valuta la possibilità di aumentarlo a 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Per i job MapReduce e gli RDD di Spark, le dimensioni della partizione vengono in genere controllate con fs.gs.block.size, il cui valore predefinito è 128 MB. Valuta la possibilità di aumentarlo a 1 GB. Puoi anche impostare proprietà InputFormat specifiche come mapreduce.input.fileinputformat.split.minsize e mapreduce.input.fileinputformat.split.maxsize

    • Per i job MapReduce: --properties fs.gs.block.size=1073741824
    • Per gli RDD di Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Partizioni di output

Il numero di attività nelle fasi successive è controllato da diverse proprietà. Per job di dimensioni maggiori che elaborano più di 1 TB, valuta la possibilità di avere almeno 1 GB per partizione.

  • Per i job MapReduce, il numero di partizioni di output è controllato da mapreduce.job.reduces.

  • Per Spark SQL, il numero di partizioni di output è controllato da spark.sql.shuffle.partitions.

  • Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare spark.default.parallelism.

Ottimizzazione dello shuffling per lo shuffling del worker principale

La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Tieni presente che si tratta di una proprietà YARN a livello di cluster perché il server di smistamento Spark viene eseguito nell'ambito del gestore dei nodi. Il valore predefinito è il doppio (2x) del numero di core della macchina (ad esempio, 16 thread su una macchina n1-highmem-8). Se "Tempo di blocco lettura riproduzione casuale" è superiore a 1 secondo e i worker principali non hanno raggiunto i limiti di rete, CPU o disco, ti consigliamo di aumentare il numero di thread del server di riproduzione casuale.

Per i tipi di macchine più grandi, ti consigliamo di aumentare spark.shuffle.io.numConnectionsPerPeer, che per impostazione predefinita è 1. Ad esempio, impostalo su 5 connessioni per coppia di host.

Nuovi tentativi

Il numero massimo di tentativi consentiti per app master, attività e fasi può essere configurato impostando le seguenti proprietà:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Poiché i master dell'app e le attività vengono terminati più di frequente nei cluster che utilizzano molte VM preemptibili o la scalabilità automatica senza la rimozione controllata, può essere utile aumentare i valori delle proprietà sopra indicate in questi cluster (tieni presente che l'utilizzo di EFM con Spark e il rimozione controllata non è supportato).

Configurazione di HDFS per lo shuffling HCFS

Per migliorare le prestazioni degli schemi di ordinamento di grandi dimensioni, puoi ridurre la contesa dei blocchi nel NameNode impostando dfs.namenode.fslock.fair=false. Tieni presente che questo rischia di bloccare le singole richieste, ma potrebbe migliorare il throughput a livello di cluster. Per migliorare ulteriormente le prestazioni del NameNode, puoi collegare SSD locali al nodo principale impostando --num-master-local-ssds. Puoi anche aggiungere SSD locali ai worker principali per migliorare le prestazioni di DataNode impostando --num-worker-local-ssds.

Altri file system compatibili con Hadoop per l'ordinamento casuale HCFS

Per impostazione predefinita, i dati elaborati in ordine casuale EFM HCFS vengono scritti in HDFS, ma puoi utilizzare qualsiasi file system compatibile con Hadoop (HCFS). Ad esempio, puoi decidere di scrivere lo shuffling su Cloud Storage o su HDFS di un altro cluster. Per specificare un file system, puoi indicarefs.defaultFS il file system di destinazione quando invii un job al cluster.

Rimozione controllata di YARN sui cluster EFM

Il ritiro gestito automaticamente di YARN può essere utilizzato per rimuovere rapidamente i nodi con un impatto minimo sulle applicazioni in esecuzione. Per i cluster con scalabilità automatica, il timeout di ritiro graduale puoi essere impostato in un AutoscalingPolicy collegato al cluster EFM.

Miglioramenti all'EFM MapReduce per la rimozione controllata

  1. Poiché i dati intermedi vengono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i contenitori in esecuzione su questi nodi sono stati completati. In confronto, i nodi non vengono rimossi nei cluster Dataproc standard fino al termine dell'applicazione.

  2. La rimozione del nodo non attende il completamento dei master dell'app in esecuzione su un nodo. Quando il contenitore principale dell'app viene terminato, viene riprogrammato su un altro nodo che non verrà ritirato. L'avanzamento del job non viene perso: il nuovo app master recupera rapidamente lo stato dall'app master precedente leggendo la cronologia dei job.

Utilizzo della rimozione controllata in un cluster EFM con MapReduce

  1. Crea un cluster EFM con un numero uguale di worker principali e secondari.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Esegui un job mapreduce che calcola il valore di pi utilizzando il file JAR di esempi mapreduce sul cluster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Mentre il job è in esecuzione, fare lo scale down del cluster utilizzando la rimozione controllata.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    I nodi verranno rimossi rapidamente dal cluster prima del termine del job, minimizzando al contempo la perdita di avanzamento del job. Possono verificarsi interruzioni temporanee nell'avanzamento dei job per i seguenti motivi:

    • Failover dell'app master. Se l'avanzamento del job scende allo 0% e poi risale immediatamente al valore precedente al calo, l'app master potrebbe essere stato interrotto e un nuovo app master potrebbe aver recuperato il suo stato. Ciò non dovrebbe influire in modo significativo sul progresso del job, poiché il failover avviene rapidamente.
    • Preemption delle VM. Poiché HDFS conserva solo gli output completi, non parziali, delle attività di mappatura, possono verificarsi interruzioni temporanee nell'avanzamento del job quando una VM viene prelevata durante il lavoro su un'attività di mappatura.

Per velocizzare la rimozione dei nodi, puoi fare lo scale down il cluster senza il rimozione controllata omettendo il flag --graceful-decommission-timeout nell'esempio di comando gcloud precedente. L'avanzamento del job delle attività di mappatura completate verrà conservato, ma l'output delle attività di mappatura parzialmente completate andrà perso (le attività di mappatura verranno eseguite di nuovo).