Modalità di flessibilità avanzata di Dataproc

La modalità di flessibilità avanzata (EFM) di Dataproc gestisce i dati di shuffling per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. EFM scrive i dati di shuffling Spark sui worker principali. I worker eseguono il pull da questi nodi remoti durante la fase di riduzione.

Poiché EFM non archivia i dati di shuffling intermedi sui worker secondari, è ideale per l'utilizzo in cluster che utilizzano VM prerilasciabili o che scalano automaticamente solo il gruppo di worker secondari.

Limitazioni:

  • I job YARN di Apache Hadoop che non supportano il trasferimento di AppMaster possono non riuscire in modalità di flessibilità avanzata (vedi Quando attendere il completamento di AppMasters).
  • La modalità di flessibilità avanzata non è consigliata:
    • su un cluster che ha solo worker principali.
  • La modalità di flessibilità avanzata non è supportata:
    • quando è abilitata la scalabilità automatica del worker principale. Nella maggior parte dei casi, i worker principali continueranno ad archiviare i dati di shuffling di cui non viene eseguita la migrazione automatica. Il ridimensionamento del gruppo di lavoratori principale annulla i vantaggi EFM.
    • quando i job Spark vengono eseguiti su un cluster con la rimozione controllata abilitato. Il ritiro gestito automaticamente ed EFM possono funzionare a più scopi, poiché il meccanismo di dismissione gestito automaticamente YARN mantiene i nodi di DECOMMISSIONING fino al completamento di tutte le applicazioni coinvolte.

Utilizzo della modalità di flessibilità avanzata

La modalità di flessibilità avanzata viene configurata per motore di esecuzione e deve essere configurata durante la creazione del cluster. L'implementazione di Spark EFM è configurata con la proprietà cluster dataproc:efm.spark.shuffle=primary-worker.

Esempio:crea un cluster con shuffling del worker principale per Spark:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Esempio di Apache Spark

  1. Esegui un job WordCount sul testo pubblico Shakespeare utilizzando il jar degli esempi di Spark nel cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Configurazione delle SSD locali per lo shuffle del worker principale

Le implementazioni shuffer primari e HDFS scrivono dati di shuffling intermedi su dischi collegati a VM e beneficiano della velocità effettiva e delle IOPS aggiuntive offerte dalle SSD locali. Per facilitare l'allocazione delle risorse, scegli come target circa una partizione SSD locale per 4 vCPU durante la configurazione delle macchine worker principali.

Per collegare le unità SSD locali, passa il flag --num-worker-local-ssds al comando gcloud dataproc clusters create.

Rapporto worker secondario

Poiché i worker secondari scrivono i dati di shuffling nei worker principali, il cluster deve contenere un numero sufficiente di worker principali con risorse di CPU, memoria e disco sufficienti per supportare il carico di shuffling del job. Per i cluster con scalabilità automatica, al fine di impedire al gruppo principale di scalare e causare comportamenti indesiderati, imposta minInstances sul valore maxInstances nel criterio di scalabilità automatica per il gruppo di worker principale.

Se il tuo rapporto tra worker secondari e principali è elevato (ad esempio 10:1), monitora l'utilizzo della CPU, della rete e del disco dei worker principali per determinare se sono sovraccaricati. Per farlo:

  1. Vai alla pagina Istanze VM nella console Google Cloud.

  2. Fai clic sulla casella di controllo sul lato sinistro del worker principale.

  3. Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU, le IOPS del disco, i byte di rete e altre metriche del worker principale.

Se i lavoratori principali sono sovraccaricati, valuta la possibilità di scalare manualmente i lavoratori principali.

Ridimensionamento del gruppo di worker principale

È possibile fare lo scale up del gruppo worker principale, ma lo scale up del gruppo worker principale può influire negativamente sull'avanzamento del job. Le operazioni che eseguono il downgrade del gruppo di worker principale devono utilizzare il rimozione controllata, che viene attivato impostando il flag --graceful-decommission-timeout.

Cluster con scalabilità automatica:la scalabilità del gruppo di worker principale è disabilitata sui cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale in un cluster con scalabilità automatica:

  1. Disabilita la scalabilità automatica.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Scalare il gruppo principale.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Riabilita la scalabilità automatica:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Monitoraggio dell'utilizzo del disco del worker principale

I worker principali devono avere spazio su disco sufficiente per i dati di shuffling del cluster. Puoi monitorare questo comportamento indirettamente tramite la metrica remaining HDFS capacity. Man mano che il disco locale si riempie, lo spazio non è più disponibile per HDFS e la capacità rimanente diminuisce.

Per impostazione predefinita, quando il disco locale di un worker principale supera il 90% della capacità, il nodo viene contrassegnato come UNHEALTHY nella UI del nodo YARN. In caso di problemi di capacità del disco, puoi eliminare i dati inutilizzati da HDFS o fare lo scale up del pool di worker principale.

Tieni presente che i dati di shuffling intermedio in genere non vengono eliminati fino alla fine di un job. Quando utilizzi lo shuffling del worker principale con Spark, questa operazione può richiedere fino a 30 minuti dopo il completamento di un job.

Configurazione avanzata

Partizionamento e parallelismo

Quando invii un job MapReduce o Spark, configura un livello di partizionamento appropriato. La scelta del numero di partizioni di input e di output per una fase di shuffling comporta un compromesso tra le diverse caratteristiche delle prestazioni. È preferibile sperimentare con i valori che funzionano meglio per le forme del tuo lavoro.

Partizioni di input

Il partizionamento di input di MapReduce e Spark è determinato dal set di dati di input. Durante la lettura dei file da Cloud Storage, ogni attività elabora circa una "dimensione del blocco" di dati.

  • Per i job Spark SQL, la dimensione massima della partizione è controllata da spark.sql.files.maxPartitionBytes. Valuta la possibilità di aumentarlo a 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Per i job MapReduce e i RDD Spark, le dimensioni delle partizioni vengono in genere controllate con fs.gs.block.size, che per impostazione predefinita è pari a 128 MB. Ti consigliamo di aumentarlo a 1 GB. Puoi anche impostare proprietà specifiche di InputFormat come mapreduce.input.fileinputformat.split.minsize e mapreduce.input.fileinputformat.split.maxsize

    • Per i job MapReduce: --properties fs.gs.block.size=1073741824
    • Per gli RDD Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Partizioni di output

Il numero di attività nelle fasi successive è controllato da diverse proprietà. In caso di job di grandi dimensioni che elaborano più di 1 TB, valuta la possibilità di avere almeno 1 GB per partizione.

  • Per i job MapReduce, il numero di partizioni di output è controllato da mapreduce.job.reduces.

  • Per Spark SQL, il numero di partizioni di output è controllato da spark.sql.shuffle.partitions.

  • Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare spark.default.parallelism.

Ottimizzazione casuale per lo shuffling del worker principale

La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Tieni presente che questa è una proprietà YARN a livello di cluster perché il server di shuffle Spark viene eseguito come parte del Gestore nodi. Il valore predefinito è il doppio (2x) di un numero di core sulla macchina (ad esempio 16 thread su n1-highmem-8). Se il valore "Tempo di blocco per la lettura Shuffle" è maggiore di 1 secondo e i worker principali non hanno raggiunto i limiti di rete, CPU o disco, valuta la possibilità di aumentare il numero di thread del server di shuffling.

Sui tipi di macchine più grandi, valuta la possibilità di aumentare il valore di spark.shuffle.io.numConnectionsPerPeer, il valore predefinito è 1. (ad esempio, imposta 5 connessioni per coppia di host).

Aumento dei nuovi tentativi

Il numero massimo di tentativi consentiti per master, attività e fasi dell'app può essere configurato impostando le seguenti proprietà:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Poiché i master e le attività delle app vengono terminati più spesso nei cluster che utilizzano molte VM prerilasciabili o con scalabilità automatica senza rimozione controllata, l'aumento dei valori delle proprietà precedenti in questi cluster può essere utile (tieni presente che l'utilizzo di EFM con Spark e il ritiro controllato non è supportato).

Rimozione controllata YARN sui cluster EFM

Il decommissioning Graceful YARN può essere utilizzato per rimuovere i nodi rapidamente con un impatto minimo sull'esecuzione delle applicazioni. Per i cluster con scalabilità automatica, il timeout per il ritiro automatico può essere impostato in un AutoscalingPolicy collegato al cluster EFM.

Miglioramenti EFM alla rimozione controllata

  1. Poiché i dati intermedi sono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i container in esecuzione su questi nodi sono stati completati. In confronto, i nodi non vengono rimossi sui cluster Dataproc standard fino al completamento dell'applicazione.

  2. La rimozione del nodo non attende il completamento dei master delle app in esecuzione su un nodo. Quando il container master dell'app viene terminato, viene ripianificato su un altro nodo che non è in fase di dismissione. L'avanzamento del job non andrà perso: il nuovo master dell'app recupera rapidamente lo stato dal master dell'app precedente leggendo la cronologia dei job.