Modalità di flessibilità avanzata di Dataproc

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

La modalità di flessibilità avanzata (EFM) di Dataproc consente di gestire i dati elaborati in ordine casuale per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. La modalità EFM trasferisce i dati elaborati in ordine casuale in una di due modalità selezionabili dall'utente:

  1. Ordine casuale del worker principale. I mappatori scrivono i dati ai worker principali. I worker lavorano da questi nodi remoti durante la fase di riduzione. Questa modalità è disponibile solo ed è consigliata per i job Spark.

  2. HCFS (Hadoop Compatibile File System) in ordine casuale. I mappatori scrivono i dati in un'implementazione HCFS (HDFS per impostazione predefinita). Come nella modalità worker principale, solo i worker principali partecipano a implementazioni HDFS e HCFS (se il comando shuffle HCFS utilizza Cloud Storage Connector, i dati vengono archiviati all'esterno del cluster). Questa modalità può essere utile per i job con piccole quantità di dati, ma a causa di limitazioni di scalabilità, non è consigliata per i job più grandi.

Dato che entrambe le modalità EFM non memorizzano i dati di shuffle intermedi sui worker secondari, la modalità EFM è ideale per i cluster che utilizzano VM prerilasciabili o solo la scalabilità automatica del gruppo di worker secondari.

Limitazioni:

  • I job di Apache YARN che non supportano il trasferimento della proprietà AppMaster possono non riuscire in modalità di flessibilità avanzata (consulta l'articolo Quando attendere il completamento di AppMaster).
  • La modalità di flessibilità avanzata non è consigliata:
    • su un cluster che ha solo worker principali.
  • La modalità di flessibilità avanzata non è supportata:
    • quando è abilitata la scalabilità automatica del worker principale. Nella maggior parte dei casi, i worker principali continueranno a archiviare i dati elaborati in ordine casuale di cui non viene eseguita la migrazione automatica. Scale down del gruppo di worker principale comporta la rimozione dei vantaggi EFM.
    • Quando i job Spark vengono eseguiti su un cluster con ritiro gestito automaticamente (vedi SPARK-20628).

Utilizzo della modalità di flessibilità avanzata

La modalità di flessibilità avanzata è configurata in base al motore di esecuzione e deve essere configurata durante la creazione del cluster.

  • L'implementazione di Spark EFM è configurata con la proprietà del cluster dataproc:efm.spark.shuffle. Valori validi della proprietà:

    • primary-worker per il worker principale in ordine casuale (consigliato)
    • hcfs per la riproduzione casuale basata su HCFS. Questa modalità è ritirata ed è disponibile solo sui cluster che eseguono immagini con versione 1.5. Sconsigliato per i nuovi flussi di lavoro.
  • L'implementazione di Hadoop MapReduce è configurata con la dataproc:efm.mapreduce.shuffle proprietà del cluster. Valori validi della proprietà:

    • hcfs

Esempio: crea un cluster con shuffle worker principale per Spark e shuffle HCFS per MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Esempio di Apache Spark

  1. Esegui un job WordCount in un testo di Shakespeare pubblico utilizzando il file jar di esempio Spark sul cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Esempio di Apache Hadoop MapReduce

  1. Esegui un piccolo job di Teragen per generare dati di input in Cloud Storage per un job successivamente di tipo terasort utilizzando il file jar di esempio Mapreduce sul cluster EFM.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Eseguire un job di terze parti sui dati.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Configurazione delle SSD locali per il worker casuale principale

Le implementazioni shuffle di worker principali e HDFS scrivono dati di shuffle intermedi sui dischi collegati alle VM e traggono vantaggio dalla velocità effettiva e dalle IOPS aggiuntive offerte dalle SSD locali. Per facilitare l'allocazione delle risorse, scegli come target un obiettivo di circa 1 partizione SSD locale per 4 vCPU durante la configurazione delle macchine worker principali.

Per collegare le unità SSD locali, passa il flag --num-worker-local-ssds al comando gcloud dataproc clusters create.

Rapporto worker secondari

Poiché i worker secondari scrivono i dati elaborati in ordine casuale ai worker principali, il cluster deve contenere un numero sufficiente di worker principali con risorse sufficienti per CPU, memoria e disco per supportare il carico casuale del job. Per la scalabilità automatica dei cluster, per impedire la scalabilità automatica del gruppo principale e causare i comportamenti indesiderati, imposta minInstances sul valore maxInstances nel criterio di scalabilità automatica per il gruppo di worker principale.

Se hai un rapporto elevato tra worker principali e secondari (ad esempio 10:1), monitora l'utilizzo della CPU, l'utilizzo della rete e del disco dei worker principali per determinare se sono sovraccarico. Per farlo:

  1. Vai alla pagina Istanze VM in Google Cloud Console.

  2. Fai clic sulla casella di controllo sul lato sinistro del lavoratore principale.

  3. Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU del worker principale, i IOPS del disco, i byte di rete e altre metriche.

Se i worker principali sono sovraccarichi, valuta la possibilità di scalare manualmente i worker principali.

Ridimensionamento del gruppo di worker principale

Lo scale up del gruppo di worker principale può essere eseguito in modo sicuro, ma lo scale down del gruppo di worker principale può influire negativamente sull'avanzamento del job. Le operazioni che eseguono lo scale down del gruppo di worker principale devono utilizzare il ritiro gestito automaticamente, che è abilitato impostando il flag --graceful-decommission-timeout.

Cluster con scalabilità automatica: la scalabilità del gruppo di worker principale è disabilitata sui cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale su un cluster con scalabilità automatica:

  1. Disabilita scalabilità automatica.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Scalare il gruppo principale.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Riattiva la scalabilità automatica:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Monitoraggio dell'utilizzo del disco del worker principale

I worker principali devono disporre di spazio su disco sufficiente per i dati elaborati in ordine dei cluster. Puoi monitorare questa metrica indirettamente tramite la metrica remaining HDFS capacity. Man mano che il disco locale si riempie, lo spazio diventa non disponibile per HDFS e la capacità rimanente diminuisce.

Per impostazione predefinita, quando un disco locale di un worker principale utilizza il 90% della capacità, il nodo viene contrassegnato come NON SALUTE nell'interfaccia utente del nodo YARN. Se riscontri problemi di capacità del disco, puoi eliminare i dati inutilizzati da HDFS o fare lo scale up del pool di worker principale.

Tieni presente che i dati di shuffle intermedi in genere non vengono puliti fino alla fine di un job. Quando utilizzi la modalità casuale del worker principale con Spark, l'operazione può richiedere fino a 30 minuti dopo il completamento di un job.

Configurazione avanzata

Partizionamento e parallelismo

Quando invii un job MapReduce o Spark, configura un livello di partizionamento appropriato. Per decidere il numero di partizioni di input e di output in una fase casuale, è necessario trovare un compromesso tra le diverse caratteristiche di rendimento. È meglio sperimentare valori che siano adatti alle forme di lavoro.

Partizioni di input

MapReduce e partizionamento input Spark sono determinati dal set di dati di input. Durante la lettura dei file da Cloud Storage, ogni attività elabora circa un valore di dimensione di blocco.

  • Per i job Spark SQL, la dimensione massima della partizione è controllata da spark.sql.files.maxPartitionBytes. Valuta di aumentarlo a 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Per i job MapReduce e gli RDD Spark, la dimensione della partizione è generalmente controllata con fs.gs.block.size, che per impostazione predefinita è 128 MB. Ti consigliamo di aumentarlo a 1 GB. Puoi anche impostare InputFormatproprietà specifiche come mapreduce.input.fileinputformat.split.minsize e mapreduce.input.fileinputformat.split.maxsize

    • Per i job MapReduce: --properties fs.gs.block.size=1073741824
    • Per i RDD di Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Partizioni di output

Il numero di attività nelle fasi successive è controllato da diverse proprietà. Per i job più grandi che trattano più di 1 TB, potresti avere almeno 1 GB per partizione.

  • Per i job MapReduce, il numero di partizioni di output è controllato da mapreduce.job.reduces.

  • Per Spark SQL, il numero di partizioni di output è controllato da spark.sql.shuffle.partitions.

  • Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare spark.default.parallelism.

Regolazione casuale per l'ordine casuale del worker principale

La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Questa è una proprietà YARN a livello di cluster perché il server Spark shuffle viene eseguito come parte di Node Manager. Il valore predefinito è il doppio del numero di core della macchina (ad esempio, 16 thread su un numero n1-highmem-8). Se il valore "Tempo di lettura casuale bloccato" è maggiore di 1 secondo e i worker principali non hanno raggiunto i limiti di rete, CPU o disco, valuta la possibilità di aumentare il numero di thread del server in ordine casuale.

Sui tipi di macchine più grandi, valuta la possibilità di aumentare spark.shuffle.io.numConnectionsPerPeer, che per impostazione predefinita è pari a 1. Ad esempio, impostalo su 5 connessioni per coppia di host.

Aumento dei tentativi

Puoi configurare il numero massimo di tentativi consentiti per master, attività e fasi dell'app impostando le seguenti proprietà:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Poiché le attività e i master di app vengono terminati più di frequente nei cluster che utilizzano molte VM prerilasciabili o la scalabilità automatica senza ritiro gestito automaticamente, può essere utile aumentare i valori delle proprietà indicate in tali cluster (tieni presente che non è supportato l'utilizzo di EFM con Spark e di ritiro gestito automaticamente).

Configurazione di HDFS per shuffle HCFS

Per migliorare le prestazioni dei brani di grandi dimensioni, puoi diminuire il numero di blocchi nel NameNode impostando dfs.namenode.fslock.fair=false. Tieni presente che rischia di esaurire le singole richieste, ma potrebbe migliorare la velocità effettiva a livello di cluster. Per migliorare ulteriormente le prestazioni di NameNode, puoi collegare le SSD locali al nodo master impostando --num-master-local-ssds. Puoi anche aggiungere SSD locali ai worker principali per migliorare le prestazioni di DataNode impostando --num-worker-local-ssds.

Altri file system compatibili con Hadoop per HCFS shuffle

Per impostazione predefinita, i dati di riproduzione casuale EFM HCFS sono scritti in HDFS, ma puoi utilizzare qualsiasi HCFS (Hadoop Compatibility File System). Ad esempio, puoi decidere di scrivere in ordine casuale su Cloud Storage o su un HDFS diverso del cluster. Per specificare un file system, puoi indirizzare fs.defaultFS al file system di destinazione quando invii un job al tuo cluster.

Ritiro gestito tramite YARN sui cluster EFM

Il servizio YARN Graceful Decommissioning può essere utilizzato per rimuovere rapidamente i nodi con un impatto minimo sull'esecuzione delle applicazioni. Per i cluster con scalabilità automatica, il timeout di ritiro gestito automaticamente può essere impostato in un AutoscalingPolicy collegato al cluster EFM.

Miglioramenti della funzionalità EFM in caso di ritiro gestito con esito positivo

  1. Poiché i dati intermedi sono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i container in esecuzione su tali nodi sono terminati. Invece, i nodi non vengono rimossi nei cluster Dataproc standard fino al completamento dell'applicazione.

  2. La rimozione del nodo non richiede il completamento dei master dell'app su un nodo. Quando il container dell'app master viene terminato, viene ripianificato su un altro nodo che non è in fase di ritiro. L'avanzamento del job non viene perso: il nuovo master dell'app recupera rapidamente lo stato del master dell'app precedente leggendo la cronologia dei job.

Utilizzo del ritiro gestito automaticamente su un cluster EFM con MapReduce

  1. Crea un cluster EFM con un numero uguale di worker principali e secondari.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Esegui un job mappareduce che calcola il valore di pi greco utilizzando il file jar di esempio mapreduce nel cluster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Durante l'esecuzione del job, fai lo scale down del cluster con il ritiro gestito automaticamente.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    I nodi verranno rimossi dal cluster rapidamente prima del termine del job, riducendo al minimo la perdita dell'avanzamento del job. Si possono verificare pause temporanee nell'avanzamento del job per i seguenti motivi:

    • Failover app master. Se l'avanzamento del job scende allo 0%, e poi passa immediatamente al valore precedente, il master dell'app potrebbe essere terminato e un nuovo master dell'app ha recuperato il suo stato. Ciò non dovrebbe influire in modo significativo sull'avanzamento del job poiché il failover avviene rapidamente.
    • Prerilascio della VM. Poiché HDFS conserva solo gli output completi delle attività della mappa, non parziali, possono verificarsi pause temporanee nell'avanzamento dei job quando una VM viene prerilasciata durante l'esecuzione di un'attività di mappatura.