Modalità di flessibilità avanzata di Dataproc

La modalità di flessibilità avanzata (EFM) di Dataproc gestisce i dati in ordine casuale per ridurre al minimo i ritardi di avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. La modalità EFM trasferisce i dati di shuffling in una delle due modalità selezionabili dall'utente:

  1. Scambio casuale del worker principale. I mappatori scrivono dati per i worker principali. I worker eseguono il pull da questi nodi remoti durante la fase di riduzione. Questa modalità è disponibile ed è consigliata per i job Spark.

  2. Riproduzione casuale HCFS (Hadoop Compatible File System). I mappatori scrivono dati in un'implementazione HCFS (HDFS per impostazione predefinita). Come per la modalità worker principale, solo i worker principali partecipano alle implementazioni HDFS e HCFS (se lo shuffle di HCFS utilizza il connettore Cloud Storage, i dati vengono archiviati al di fuori del cluster). Questa modalità può essere utile per i job con piccole quantità di dati ma, a causa delle limitazioni di scalabilità, non è consigliata per i job di grandi dimensioni.

Poiché entrambe le modalità EFM non archiviano dati di shuffling intermedio sui worker secondari, la modalità EFM è particolarmente adatta ai cluster che utilizzano VM prerilasciabili o che scalano automaticamente solo il gruppo di worker secondari.

Limitazioni:

  • I job Apache Hadoop YARN che non supportano il trasferimento di AppMaster possono avere esito negativo in modalità di flessibilità avanzata (vedi Quando attendere il completamento di AppMasters).
  • La modalità di flessibilità avanzata non è consigliata:
    • in un cluster che ha solo worker principali.
  • La modalità di flessibilità avanzata non è supportata:
    • quando la scalabilità automatica del worker principale è abilitata. Nella maggior parte dei casi, i worker principali continueranno ad archiviare i dati di shuffling di cui non viene eseguita la migrazione automatica. Il ridimensionamento del gruppo di lavoratori principale nega i vantaggi EFM.
    • quando i job Spark vengono eseguiti su un cluster con la rimozione controllata abilitato. Il ritiro gestito automaticamente ed EFM possono funzionare a più scopi, poiché il meccanismo di dismissione controllato di YARN mantiene i nodi di DECOMMISSIONING fino al completamento di tutte le applicazioni coinvolte.

Utilizzo della modalità di flessibilità avanzata

La modalità di flessibilità avanzata viene configurata per ogni motore di esecuzione e deve essere configurata durante la creazione del cluster.

  • L'implementazione EFM di Spark è configurata con la proprietà cluster dataproc:efm.spark.shuffle. Valori della proprietà validi:

    • primary-worker per lo shuffling del worker principale (consigliato)
    • hcfs per la riproduzione casuale basata su HCFS. Questa modalità è obsoleta ed è disponibile solo nei cluster che eseguono immagini con versione 1.5. Opzione sconsigliata per i nuovi flussi di lavoro.
  • L'implementazione di Hadoop MapReduce è configurata con la dataproc:efm.mapreduce.shuffle proprietà cluster. Valori della proprietà validi:

    • hcfs

Esempio: crea un cluster con shuffle del worker principale per Spark e HCFS per MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Esempio di Apache Spark

  1. Esegui un job WordCount su testo Shakespeare pubblico utilizzando il jar degli esempi Spark sul cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Esempio di Apache Hadoop MapReduce

  1. Esegui un piccolo job di Teragen per generare dati di input in Cloud Storage per un job di destinazione successivo utilizzando il jar degli esempi di MapReduce nel cluster EFM.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Eseguire un job di targeting sui dati.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Configurazione delle unità SSD locali per lo shuffle del worker principale

Le implementazioni shuffle dei worker principali e HDFS scrivono dati di shuffle intermedia su dischi collegati alle VM e beneficiano della velocità effettiva e del numero di IOPS aggiuntivi offerti dagli SSD locali. Per facilitare l'allocazione delle risorse, scegli come target circa una partizione SSD locale per 4 vCPU durante la configurazione delle macchine worker principali.

Per collegare gli SSD locali, passa il flag --num-worker-local-ssds al comando gcloud dataproc clusters create.

Rapporto worker secondario

Poiché i worker secondari scrivono i propri dati di shuffle nei worker principali, il cluster deve contenere un numero sufficiente di worker principali con risorse di CPU, memoria e disco sufficienti per supportare il carico di shuffle del job. Per i cluster con scalabilità automatica, per impedire al gruppo principale di scalare e causare comportamenti indesiderati, imposta minInstances sul valore maxInstances nel criterio di scalabilità automatica per il gruppo di worker principale.

Se hai un rapporto alto tra worker secondari e principali (ad esempio 10:1), monitora l'utilizzo della CPU, della rete e del disco dei worker principali per determinare se sono sovraccaricati. Per farlo:

  1. Vai alla pagina Istanze VM nella console Google Cloud.

  2. Fai clic sulla casella di controllo a sinistra del worker principale.

  3. Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU, il numero di IOPS del disco, i byte di rete e altre metriche del worker principale.

Se i worker principali sono sovraccaricati, valuta la possibilità di scalare manualmente i worker principali.

Ridimensionamento del gruppo di worker principale

È possibile fare lo scale up del gruppo di worker principale in modo sicuro, ma lo scale down del gruppo di worker principale può influire negativamente sull'avanzamento del job. Le operazioni che eseguono il downgrade del gruppo di worker principale devono utilizzare il scommissione gestito automaticamente, che viene abilitato impostando il flag --graceful-decommission-timeout.

Cluster con scalabilità automatica:la scalabilità del gruppo di worker principale è disabilitata nei cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale su un cluster con scalabilità automatica:

  1. Disabilita la scalabilità automatica.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Scalare il gruppo principale.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Riattiva la scalabilità automatica:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Monitoraggio dell'utilizzo del disco worker primario

I worker principali devono avere spazio su disco sufficiente per i dati di shuffling del cluster. Puoi monitorare questo comportamento indirettamente tramite la metrica remaining HDFS capacity. Man mano che il disco locale si riempie, lo spazio non sarà più disponibile per HDFS e la capacità rimanente diminuirà.

Per impostazione predefinita, quando il disco locale di un worker principale utilizza supera il 90% della capacità, il nodo verrà contrassegnato come UNHEALTHY nella UI del nodo YARN. In caso di problemi di capacità del disco, puoi eliminare i dati inutilizzati da HDFS o fare lo scale up del pool di worker principali.

Tieni presente che i dati di shuffling intermedio in genere non vengono eliminati fino alla fine di un job. Quando utilizzi lo shuffling del worker principale con Spark, questa operazione può richiedere fino a 30 minuti dopo il completamento di un job.

Configurazione avanzata

Partizionamento e parallelismo

Quando invii un job MapReduce o Spark, configura un livello di partizionamento appropriato. La scelta del numero di partizioni di input e di output per una fase di shuffling comporta un compromesso tra le diverse caratteristiche delle prestazioni. È preferibile sperimentare con i valori che funzionano per le forme di lavoro.

Partizioni di input

Il partizionamento di input di MapReduce e Spark è determinato dal set di dati di input. Durante la lettura dei file da Cloud Storage, ogni attività elabora circa una "dimensione del blocco" di dati.

  • Per i job Spark SQL, la dimensione massima della partizione è controllata da spark.sql.files.maxPartitionBytes. Valuta la possibilità di aumentarlo fino a 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Per i job MapReduce e i RDD Spark, le dimensioni della partizione vengono in genere controllate con fs.gs.block.size, che per impostazione predefinita è di 128 MB. Valuta la possibilità di aumentarlo fino a 1 GB. Puoi anche impostare proprietà specifiche di InputFormat, come mapreduce.input.fileinputformat.split.minsize e mapreduce.input.fileinputformat.split.maxsize

    • Per i job MapReduce: --properties fs.gs.block.size=1073741824
    • Per i RDD Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Partizioni di output

Il numero di attività nelle fasi successive è controllato da diverse proprietà. Nei job di grandi dimensioni che elaborano più di 1 TB, valuta la possibilità di avere almeno 1 GB per partizione.

  • Per i job MapReduce, il numero di partizioni di output è controllato da mapreduce.job.reduces.

  • Per Spark SQL, il numero di partizioni di output è controllato da spark.sql.shuffle.partitions.

  • Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare spark.default.parallelism.

Ottimizzazione casuale per la riproduzione casuale dei lavoratori principali

La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Tieni presente che questa è una proprietà YARN a livello di cluster perché il server di shuffle Spark viene eseguito come parte del Gestore nodi. Il valore predefinito è il doppio (2x) del numero di core sulla macchina (ad esempio 16 thread su n1-highmem-8). Se il valore di "Tempo di blocco per lettura shuffle" è superiore a 1 secondo e i worker principali non hanno raggiunto i limiti di rete, CPU o disco, valuta la possibilità di aumentare il numero di thread del server di shuffle.

Sui tipi di macchina più grandi, valuta la possibilità di aumentare spark.shuffle.io.numConnectionsPerPeer, che per impostazione predefinita è 1. ad esempio impostando 5 connessioni per coppia di host.

Aumento dei nuovi tentativi

Il numero massimo di tentativi consentiti per master, attività e fasi dell'app può essere configurato impostando le seguenti proprietà:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Poiché i master e le attività delle app vengono terminati più spesso nei cluster che utilizzano molte VM prerilasciabili o scalabilità automatica senza dismissione controllato, l'aumento dei valori delle proprietà precedenti in questi cluster potrebbe essere utile (tieni presente che l'utilizzo di EFM con Spark e il rimozione controllata non è supportato).

Configurazione di HDFS per lo shuffle HCFS

Per migliorare le prestazioni degli shuffling di grandi dimensioni, puoi ridurre la contesa dei blocchi in NameNode impostando dfs.namenode.fslock.fair=false. Tieni presente che questo rischia di esaurire le richieste individuali, ma potrebbe migliorare la velocità effettiva a livello di cluster. Per migliorare ulteriormente le prestazioni di NameNode, puoi collegare gli SSD locali al nodo master impostando --num-master-local-ssds. Puoi anche aggiungere SSD locali ai worker principali per migliorare le prestazioni di DataNode impostando --num-worker-local-ssds.

Altri file system compatibili con Hadoop per lo shuffle HCFS

Per impostazione predefinita, i dati shuffling di EFM HCFS vengono scritti in HDFS, ma puoi utilizzare qualsiasi file system compatibile con Hadoop (HCFS). Ad esempio, puoi decidere di scrivere shuffle in Cloud Storage o nell'HDFS di un cluster diverso. Per specificare un file system, puoi puntare fs.defaultFS al file system di destinazione quando invii un job al tuo cluster.

Rimozione controllata da YARN sui cluster EFM

Il ritiro gestito automaticamente YARN può essere utilizzato per rimuovere rapidamente i nodi con un impatto minimo sull'esecuzione delle applicazioni. Per i cluster a scalabilità automatica, il timeout per il ritiro automatico può essere impostato in un AutoscalingPolicy collegato al cluster EFM.

Miglioramenti EFM di MapReduce alla rimozione controllata

  1. Poiché i dati intermedi sono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i container in esecuzione su questi nodi sono stati completati. In confronto, i nodi non vengono rimossi nei cluster Dataproc standard fino al termine dell'applicazione.

  2. La rimozione dei nodi non richiede il completamento dei master dell'app in esecuzione su un nodo. Quando il container master dell'app viene terminato, viene ripianificato su un altro nodo non dismesso. L'avanzamento del job non andrà perso: il nuovo master dell'app recupera rapidamente lo stato dell'app master precedente leggendo la cronologia dei job.

Utilizzo della rimozione controllata su un cluster EFM con MapReduce

  1. Crea un cluster EFM con un numero uguale di worker primari e secondari.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Esegui un job di MapReduce che calcola il valore di pi greco utilizzando il jar degli esempi di MapReduce sul cluster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Mentre il job è in esecuzione, fare lo scale down del cluster utilizzando la rimozione controllata.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    I nodi verranno rimossi dal cluster rapidamente prima del termine del job, riducendo al minimo la perdita dell'avanzamento del job. Le pause temporanee nell'avanzamento del job possono verificarsi a causa di:

    • Failover del master dell'app. Se l'avanzamento del job scende a 0% e poi passa immediatamente al valore di pre-drop, il master dell'app potrebbe essere stato terminato e un nuovo master dell'app ha recuperato il proprio stato. Ciò non dovrebbe influire in modo significativo sull'avanzamento del job poiché il failover avviene rapidamente.
    • Prerilascio delle VM. Poiché HDFS conserva solo gli output completi e non parziali delle attività di mappatura, si possono verificare pause temporanee nell'avanzamento del job quando una VM viene prerilasciata durante l'elaborazione di un'attività di mappa.