Scalabilità automatica dei cluster Dataproc

Che cos'è la scalabilità automatica?

Valutare il numero "giusto" di worker (nodi) del cluster per un carico di lavoro è difficile e spesso non è l'ideale. La scalabilità dei cluster avviata dall'utente risolve parzialmente questa sfida, ma richiede il monitoraggio dell'utilizzo del cluster e l'intervento manuale.

L'API AutoscalingPolicies di Dataproc fornisce un meccanismo per automatizzare la gestione delle risorse dei cluster e abilita la scalabilità automatica delle VM worker cluster. Autoscaling Policy è una configurazione riutilizzabile che descrive come devono scalare i worker del cluster che utilizzano il criterio di scalabilità automatica. Definisce limiti di scalabilità, frequenza e aggressività per fornire un controllo granulare sulle risorse del cluster per tutta la sua durata.

Quando utilizzare la scalabilità automatica

Utilizza la scalabilità automatica:

su cluster che archiviano i dati in servizi esterni come Cloud Storage o BigQuery

su cluster che elaborano molti job

per fare lo scale up dei cluster con un solo job

con la modalità di flessibilità avanzata per i job batch Spark

La scalabilità automatica non è consigliata con/per:

  • HDFS::la scalabilità automatica non è concepita per scalare HDFS su cluster per i seguenti motivi:

    1. L'utilizzo di HDFS non è un indicatore per la scalabilità automatica.
    2. I dati HDFS sono ospitati solo sui worker principali. Il numero di worker principali deve essere sufficiente per ospitare tutti i dati HDFS.
    3. La disattivazione di DataNodes HDFS può ritardare la rimozione dei worker. Datanodes copia i blocchi HDFS in altri DataNodes prima che un worker venga rimosso. A seconda delle dimensioni dei dati e del fattore di replica, questo processo può richiedere ore.
  • Etichette dei nodi YARN: la scalabilità automatica non supporta le etichette dei nodi YARN né la proprietà dataproc:am.primary_only a causa di YARN-9088. YARN segnala in modo errato le metriche del cluster quando vengono utilizzate le etichette dei nodi.

  • Stream strutturato Spark: la scalabilità automatica non supporta lo streaming strutturato Spark (vedi Scalabilità automatica e streaming strutturato Spark).

  • Cluster inattivi: la scalabilità automatica non è consigliata per scalare un cluster alla dimensione minima quando il cluster è inattivo. Poiché la creazione di un nuovo cluster richiede la stessa rapidità del ridimensionamento di uno, ti consigliamo di eliminare i cluster inattivi e ricrearli. I seguenti strumenti supportano questo modello "temporaneo":

    Utilizza Workflows Dataproc per pianificare una serie di job su un cluster dedicato, quindi elimina il cluster al termine dei job. Per un'orchestrazione più avanzata, utilizza Cloud Composer, che si basa su Apache Airflow.

    Per i cluster che elaborano query ad hoc o carichi di lavoro pianificati esternamente, utilizza l'eliminazione pianificata dei cluster per eliminare il cluster dopo un periodo o una durata di inattività specificati oppure in un momento specifico.

  • Carichi di lavoro di dimensioni diverse: quando job di piccole e grandi dimensioni vengono eseguiti su un cluster, lo scale down rimozione controllata attende il completamento dei job di grandi dimensioni. Il risultato è che un job a lunga esecuzione ritarda la scalabilità automatica delle risorse per i job più piccoli in esecuzione sul cluster fino al termine del job a lunga esecuzione. Per evitare questo risultato, raggruppa in un cluster i job più piccoli di dimensioni simili e isola ogni job di lunga durata in un cluster separato.

Abilitazione della scalabilità automatica

Per abilitare la scalabilità automatica su un cluster:

  1. Crea un criterio di scalabilità automatica.

  2. Una di queste soglie:

    1. Crea un cluster con scalabilità automatica oppure
    2. Abilita la scalabilità automatica su un cluster esistente.

Crea un criterio di scalabilità automatica

Comando g-cloud

Puoi utilizzare il comando gcloud dataproc autoscaling-policies import per creare un criterio di scalabilità automatica. Legge un file YAML locale che definisce un criterio di scalabilità automatica. Il formato e i contenuti del file devono corrispondere agli oggetti di configurazione e ai campi definiti dall'API REST autoscalingPolicies.

L'esempio YAML seguente definisce un criterio che specifica tutti i campi obbligatori. Fornisce inoltre i valori minInstances e maxInstances per i worker principali, il valore maxInstances per i worker secondari (prerilasciabili) e specifica un valore cooldownPeriod di 4 minuti (il valore predefinito è 2 minuti). L'elemento workerConfig configura i worker principali. In questo esempio, minInstances e maxInstances sono impostati sullo stesso valore per evitare di scalare i worker principali.

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

Ecco un altro esempio YAML che specifica tutti i campi dei criteri di scalabilità automatica facoltativi e obbligatori.

workerConfig:
  minInstances: 10
  maxInstances: 10
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

Esegui il seguente comando gcloud da un terminale locale o in Cloud Shell per creare il criterio di scalabilità automatica. Specifica un nome per la norma. Questo nome diventerà il criterio id, che potrai utilizzare nei comandi gcloud successivi per fare riferimento al criterio. Utilizza il flag --source per specificare il percorso file locale e il nome file del file YAML del criterio di scalabilità automatica da importare.

gcloud dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml \
    --region=region

API REST

Crea un criterio di scalabilità automatica definendo un AutoscalingPolicy come parte di una richiesta autoscalingPolicies.create.

Console

Per creare un criterio di scalabilità automatica, seleziona CREA CRITERIO dalla pagina Criteri di scalabilità automatica di Dataproc utilizzando la console Google Cloud. Nella pagina Crea criterio, puoi selezionare un riquadro dei suggerimenti sui criteri per completare i campi dei criteri di scalabilità automatica per un tipo di job o un obiettivo di scalabilità specifici.

Crea un cluster con scalabilità automatica

Dopo aver creato un criterio di scalabilità automatica, crea un cluster che utilizzerà il criterio di scalabilità automatica. Il cluster deve trovarsi nella stessa regione del criterio di scalabilità automatica.

Comando g-cloud

Esegui il seguente comando gcloud da un terminale locale o in Cloud Shell per creare un cluster con scalabilità automatica. Fornisci un nome per il cluster e utilizza il flag --autoscaling-policy per specificare policy id (il nome del criterio specificato quando hai creato il criterio) o il criterio resource URI (resource name) (consulta i campi AutoscalingPolicy id e name).

gcloud dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

API REST

Crea un cluster a scalabilità automatica includendo un elemento AutoscalingConfig come parte di una richiesta clusters.create.

Console

Puoi selezionare un criterio di scalabilità automatica esistente da applicare a un nuovo cluster dalla sezione Criterio di scalabilità automatica del riquadro Configura cluster nella pagina Crea un cluster di Dataproc della console Google Cloud.

Abilita la scalabilità automatica su un cluster esistente

Dopo aver creato un criterio di scalabilità automatica, puoi abilitarlo su un cluster esistente nella stessa regione.

Comando g-cloud

Esegui il seguente comando gcloud da un terminale locale o in Cloud Shell per abilitare un criterio di scalabilità automatica su un cluster esistente. Fornisci il nome del cluster e utilizza il flag --autoscaling-policy per specificare policy id (il nome del criterio specificato quando hai creato il criterio) o il criterio resource URI (resource name) (consulta i campi AutoscalingPolicy id e name).

gcloud dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

API REST

Per abilitare un criterio di scalabilità automatica su un cluster esistente, imposta il valore AutoscalingConfig.policyUri del criterio nella risorsa updateMask di una richiesta clusters.patch.

Console

Attualmente, l'abilitazione di un criterio di scalabilità automatica su un cluster esistente non è supportata nella console Google Cloud.

Utilizzo dei criteri multi-cluster

  • Un criterio di scalabilità automatica definisce il comportamento di scalabilità che può essere applicato a più cluster. Un criterio di scalabilità automatica viene applicato al meglio in più cluster quando i cluster condivideranno carichi di lavoro simili o eseguiranno job con pattern di utilizzo delle risorse simili.

  • Puoi aggiornare un criterio utilizzato da più cluster. Gli aggiornamenti influiscono immediatamente sul comportamento della scalabilità automatica per tutti i cluster che utilizzano il criterio (vedi autoscalingPolicies.update). Se non vuoi che un aggiornamento del criterio venga applicato a un cluster che utilizza il criterio, disabilita la scalabilità automatica nel cluster prima di aggiornare il criterio.

Comando g-cloud

Esegui il seguente comando gcloud da un terminale locale o in Cloud Shell per disabilitare la scalabilità automatica su un cluster.

gcloud dataproc clusters update cluster-name --disable-autoscaling \
    --region=region

API REST

Per disabilitare la scalabilità automatica su un cluster, imposta AutoscalingConfig.policyUri sulla stringa vuota e imposta update_mask=config.autoscaling_config.policy_uri in una richiesta clusters.patch.

Console

Al momento, la disabilitazione della scalabilità automatica su un cluster non è supportata nella console Google Cloud.

Come funziona la scalabilità automatica

La scalabilità automatica controlla le metriche YARN di Hadoop del cluster allo scadere di ogni periodo di "raffreddamento" per determinare se scalare il cluster e, in questo caso, l'entità dell'aggiornamento.

  1. Il valore della metrica di risorse YARN in sospeso (memoria in attesa o core in attesa) determina se fare lo scale up o lo scale down. Un valore maggiore di 0 indica che i job YARN sono in attesa di risorse e che potrebbe essere necessario lo scale up. Un valore 0 indica che YARN ha risorse sufficienti in modo che non sia necessario eseguire uno scale down o altre modifiche.

    Se la risorsa in attesa è maggiore di 0:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil MEDIA\ durante\ cooldown\ periodo\Big(\frac{Pending + disponibili + allocato + riservato}{Risorsa\ per\ worker}\Big)\Biggr\rceil \]

    Se la risorsa in attesa è pari a 0:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil MEDIA\ durante\ cooldown\ periodo\Big(\frac{Allocato + Riservato}{Risorsa\ per\ worker}\Big)\Biggr\rceil \]

    Per impostazione predefinita, il gestore della scalabilità automatica monitora la risorsa di memoria YARN. Se abiliti la scalabilità automatica basata su core, vengono monitorati sia i core YARN sia i core YARN: estimated_worker_count viene valutato separatamente per memoria e core e viene selezionato il numero di worker più ampio risultante.

    $estimated\_worker\_count =$

    \[ max(estimated\_worker\_count\_by\_memory,\ estimated\_worker\_count\_by\_cores) \]

    \[ estimated\ \Delta worker = Estimated\_worker\_count - current\_worker\_count \]

  2. Data la modifica stimata necessaria al numero di worker, la scalabilità automatica utilizza un valore scaleUpFactor o scaleDownFactor per calcolare la modifica effettiva del numero di worker:

    if estimated Δworkers > 0:
      actual Δworkers = ROUND_UP(estimated Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(estimated Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(estimated Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(estimated Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(estimated Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(estimated Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(estimated Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    Un fattore scaleUpFactor o scaleDownFactor pari a 1,0 significa che la scalabilità automatica verrà scalata in modo che la risorsa in attesa/disponibile sia 0 (utilizzo perfetto).

  3. Una volta calcolata la modifica al numero di worker, scaleUpMinWorkerFraction e scaleDownMinWorkerFraction fungono da soglia per determinare se la scalabilità automatica scalerà il cluster. Una piccola frazione indica che la scalabilità automatica dovrebbe scalare anche se il valore Δworkers è ridotto. Una frazione maggiore indica che la scalabilità dovrebbe avvenire solo quando Δworkers è grande.

    IF (Δworkers >  scaleUpMinWorkerFraction * current_worker_count) then scale up
    
    OPPURE
    IF (abs(Δworkers) >  scaleDownMinWorkerFraction * current_worker_count),
    THEN scale down.
    

  4. Se il numero di worker da scalare è sufficientemente elevato da attivare la scalabilità, la scalabilità automatica utilizza i limiti maxInstances minInstances di workerConfig e secondaryWorkerConfig e weight (rapporto tra worker principali e secondari) per determinare come suddividere il numero di worker tra i gruppi di istanza worker principali e secondari. Il risultato di questi calcoli è la modifica finale della scalabilità automatica del cluster per il periodo di scalabilità.

  5. Le richieste di scalabilità automatica di scalabilità verso il basso verranno annullate sui cluster creati con versioni delle immagini successive a 2.0.57 e 2.1.5 se:

    1. è in corso uno scale down con un valore di timeout per la rimozione controllata non zero; e
    2. il numero di worker YARN attivi ("lavoratori attivi") più la modifica del numero totale di worker consigliati dal gestore della scalabilità automatica (Δworkers) è uguale o superiore a DECOMMISSIONING worker YARN ("lavoratori in dismissione"), come mostrato nella seguente formula:

      IF (active workers + Δworkers ≥ active workers + decommissioning workers)
      THEN cancel the scaledown operation
      

    Per un esempio di annullamento dello scale down, consulta Quando la scalabilità automatica annulla un'operazione di scale down?.

Suggerimenti per la configurazione della scalabilità automatica

Evita di scalare i worker principali

I worker principali eseguono i nodi di dati HDFS, mentre i worker secondari sono di solo calcolo. L'utilizzo di worker secondari consente di scalare in modo efficiente le risorse di calcolo senza la necessità di eseguire il provisioning dello spazio di archiviazione, determinando capacità di scalabilità più rapide. I namenodi HDFS possono avere più condizioni di gara che causano il danneggiamento di HDFS e la dismissione si blocca in modo indenne. Per evitare questo problema, evita di scalare i worker principali. Ad esempio: workerConfig: minInstances: 10 maxInstances: 10 secondaryWorkerConfig: minInstances: 0 maxInstances: 100

È necessario apportare alcune modifiche al comando di creazione del cluster:

  1. Imposta --num-workers=10 in modo che corrisponda alle dimensioni del gruppo worker principale del criterio di scalabilità automatica.
  2. Imposta --secondary-worker-type=non-preemptible per configurare i worker secondari in modo che non siano prerilasciabili. (a meno che non siano richieste VM prerilasciabili).
  3. Copia la configurazione hardware dai worker principali ai worker secondari. Ad esempio, imposta --secondary-worker-boot-disk-size=1000GB in modo che corrisponda a --worker-boot-disk-size=1000GB.

Utilizza la modalità di flessibilità avanzata per i job batch Spark

Utilizza la modalità di flessibilità avanzata (EFM) con la scalabilità automatica per:

consente di fare lo scale down più rapido del cluster mentre i job sono in esecuzione

prevengono l'interruzione dei job in esecuzione a causa dello scale down del cluster

ridurre al minimo le interruzioni dei job in esecuzione a causa del prerilascio dei Worker secondari prerilasciabili

Con EFM abilitata, il timeout per il ritiro gestito automaticamente di un criterio di scalabilità automatica deve essere impostato su 0s. Il criterio di scalabilità automatica deve scalare automaticamente solo i worker secondari.

Scegliere un timeout per la rimozione controllata

La scalabilità automatica supporta il decommissione gestito tramite YARN durante la rimozione dei nodi da un cluster. Il ritiro gestito automaticamente consente alle applicazioni di terminare lo shuffling dei dati tra le fasi per evitare di ritardare l'avanzamento del job. Il ⁠tempo di ritiro gestito automaticamente fornito in un criterio di scalabilità automatica è il limite superiore della durata di attesa da parte di YARN per le applicazioni in esecuzione (applicazioni in esecuzione quando è iniziato il ritiro) prima di rimuovere i nodi.

Quando un processo non viene completato entro il periodo di timeout per la rimozione controllata specificato, il nodo worker viene arrestato forzatamente, causando potenzialmente la perdita di dati o l'interruzione del servizio. Per evitare questa possibilità, imposta il timeout per il ritiro gestito automaticamente su un valore più lungo del job più lungo che verrà elaborato dal cluster. Ad esempio, se prevedi che il job più lungo venga eseguito per un'ora, imposta il timeout su almeno un'ora (1h).

Valuta la possibilità di eseguire la migrazione di job che richiedono più di un'ora ai propri cluster temporanei per evitare di bloccare la rimozione controllata.

Configurazione di scaleUpFactor

scaleUpFactor controlla l'aggressività con cui il gestore della scalabilità automatica esegue lo scale up di un cluster. Specifica un numero compreso tra 0.0 e 1.0 per impostare il valore frazionario della risorsa YARN in attesa che causa l'aggiunta di nodi.

Ad esempio, se ci sono 100 container in attesa che richiedono 512 MB ciascuno, ci sono 50 GB di memoria YARN in attesa. Se scaleUpFactor è 0.5, il gestore della scalabilità automatica aggiungerà un numero sufficiente di nodi per aggiungere 25 GB di memoria YARN. Allo stesso modo, se è 0.1, il gestore della scalabilità automatica aggiungerà un numero sufficiente di nodi per 5 GB. Tieni presente che questi valori corrispondono alla memoria YARN, non alla memoria totale fisicamente disponibile su una VM.

Un buon punto di partenza è 0.05 per i job MapReduce e i job Spark con l'allocazione dinamica abilitata. Per i job Spark con un numero di esecutori fisso e i job Tez, utilizza 1.0. Se il valore di scaleUpFactor è 1.0, la scalabilità automatica verrà scalata in modo che la risorsa in attesa/disponibile sia 0 (utilizzo perfetto).

Configurazione di scaleDownFactor

scaleDownFactor controlla l'aggressività con cui il gestore della scalabilità automatica esegue lo scale down di un cluster. Specifica un numero compreso tra 0.0 e 1.0 per impostare il valore frazionario della risorsa YARN disponibile che causa la rimozione del nodo.

Lascia questo valore su 1.0 per la maggior parte dei cluster multi-job che richiedono spesso lo scale up e lo scale down. A causa della rimozione controllata, le operazioni di scale down sono notevolmente più lente rispetto a quelle di scale up. L'impostazione di scaleDownFactor=1.0 consente di impostare una percentuale di scale down aggressiva, che riduce al minimo il numero di operazioni di scale down necessarie per raggiungere le dimensioni appropriate del cluster.

Per i cluster che richiedono maggiore stabilità, imposta un valore scaleDownFactor più basso per una percentuale di scale down più lenta.

Imposta questo valore su 0.0 per impedire lo scale down del cluster, ad esempio quando utilizzi cluster temporanei o con un solo job.

Impostazione di scaleUpMinWorkerFraction e scaleDownMinWorkerFraction

scaleUpMinWorkerFraction e scaleDownMinWorkerFraction vengono utilizzati con scaleUpFactor o scaleDownFactor e hanno valori predefiniti di 0.0. Rappresentano le soglie con cui il gestore della scalabilità automatica eseguirà lo scale up o fare lo scale down del cluster: l'aumento o la riduzione del valore frazionario minimo delle dimensioni del cluster necessarie per inviare richieste di scale up o scale down.

Esempi: il gestore della scalabilità automatica non invierà una richiesta di aggiornamento per aggiungere 5 worker a un cluster con 100 nodi, a meno che scaleUpMinWorkerFraction non sia inferiore o uguale a 0.05 (5%). Se è impostato su 0.1, il gestore della scalabilità automatica non invierà la richiesta di scale up del cluster. Allo stesso modo, se scaleDownMinWorkerFraction è 0.05, il gestore della scalabilità automatica non emetterà una richiesta di aggiornamento per rimuovere i nodi da un cluster con 100 nodi, a meno che non vengano rimossi almeno 5 nodi.

Il valore predefinito 0.0 indica che non è presente alcuna soglia.

L'impostazione di scaleDownMinWorkerFractionthresholds su cluster di grandi dimensioni (più di 100 nodi) per evitare operazioni di scalabilità piccole e non necessarie è vivamente consigliata.

Scegliere un periodo di attesa

cooldownPeriod imposta un periodo di tempo durante il quale il gestore della scalabilità automatica non invierà richieste per modificare le dimensioni del cluster. Puoi utilizzarlo per limitare la frequenza delle modifiche del gestore della scalabilità automatica alle dimensioni del cluster.

Il valore minimo e predefinito cooldownPeriod è di due minuti. Se in un criterio viene impostato un valore cooldownPeriod più breve, le modifiche ai carichi di lavoro influiranno più rapidamente sulle dimensioni del cluster, ma i cluster potrebbero fare lo scale up e lo scale down inutilmente. Si consiglia di impostare scaleUpMinWorkerFraction e scaleDownMinWorkerFraction di un criterio su un valore diverso da zero quando si utilizza un cooldownPeriod più corto. Ciò garantisce che il cluster faccia lo scale up o lo scale down solo quando la variazione nell'utilizzo delle risorse è sufficiente per garantire un aggiornamento del cluster.

Se il carico di lavoro è sensibile alle modifiche delle dimensioni del cluster, puoi aumentare il periodo di attesa. Ad esempio, se stai eseguendo un job di elaborazione batch, puoi impostare il periodo di attesa su 10 minuti o più. Prova diversi periodi di attesa per trovare il valore più adatto al tuo carico di lavoro.

Limiti di conteggio dei worker e pesi dei gruppi

Ogni gruppo di worker ha minInstances e maxInstances che configurano un limite fisso per le dimensioni di ogni gruppo.

Ogni gruppo include anche un parametro denominato weight che configura il bilanciamento target tra i due gruppi. Tieni presente che questo parametro è solo un suggerimento e, se un gruppo raggiunge la dimensione minima o massima, i nodi verranno aggiunti o rimossi solo dall'altro gruppo. Di conseguenza, puoi quasi sempre lasciare weight il valore predefinito di 1.

Abilita la scalabilità automatica basata su core

Per impostazione predefinita, YARN utilizza le metriche di memoria per l'allocazione delle risorse. Per le applicazioni che utilizzano molta CPU, una best practice consiste nel configurare YARN in modo da utilizzare il calcolatore delle risorse dominante. A questo scopo, imposta la seguente proprietà quando crei un cluster:

capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

Metriche e log di scalabilità automatica

Le risorse e gli strumenti seguenti possono aiutarti a monitorare le operazioni di scalabilità automatica e il loro effetto sul cluster e sui relativi job.

Cloud Monitoring

Utilizza Cloud Monitoring per:

  • visualizza le metriche utilizzate dalla scalabilità automatica
  • visualizza il numero di gestori dei nodi nel cluster
  • comprendere perché la scalabilità automatica ha o non ha scalato il cluster scalabilità automatica-stackdriver1 scalabilità automatica-stackdriver2 scalabilità automatica-stackdriver3

Cloud Logging

Utilizza Cloud Logging per visualizzare i log del gestore della scalabilità automatica di Cloud Dataproc.

1) Trova i log per il tuo cluster.

autoscaling-logs-for-cluster

2) Seleziona dataproc.googleapis.com/autoscaler.

scalabilità automatica-file-log

3) Espandi i messaggi di log per visualizzare il campo status. I log sono in formato JSON, un formato leggibile dalle macchine.

scalabilità automatica-tre-log autoscaling-update-operation

4) Espandi il messaggio di log per visualizzare i suggerimenti di scalabilità, le metriche utilizzate per le decisioni di scalabilità, la dimensione originale del cluster e le nuove dimensioni del cluster di destinazione.

messaggio-consiglio-scalabilità automatica

Background: scalabilità automatica con Apache Hadoop e Apache Spark

Le seguenti sezioni descrivono in che modo la scalabilità automatica interagisce (o meno) con Hadoop YARN e Hadoop Mapreduce e con Apache Spark, Spark Streaming e Spark Structured Streaming.

Metriche YARN Hadoop

La scalabilità automatica si basa sulle seguenti metriche YARN di Hadoop:

  1. Allocated resource si riferisce alla risorsa YARN totale occupata dall'esecuzione dei container nell'intero cluster. Se sono presenti 6 container in esecuzione che possono utilizzare fino a 1 unità di risorsa, le risorse allocate sono 6.

  2. Available resource è una risorsa YARN nel cluster non utilizzata dai container allocati. Se sono presenti 10 unità di risorse in tutti i gestori dei nodi e sono allocate 6 unità di risorse, le risorse disponibili sono 4. Se nel cluster sono disponibili risorse (non utilizzate), la scalabilità automatica potrebbe rimuovere worker dal cluster.

  3. Pending resource è la somma delle richieste di risorse YARN per i container in attesa. I container in attesa sono in attesa di spazio per l'esecuzione in YARN. La risorsa In attesa è diversa da zero solo se la risorsa disponibile è pari a zero o è troppo piccola per essere allocata al container successivo. Se sono presenti container in attesa, la scalabilità automatica potrebbe aggiungere worker al cluster.

Puoi visualizzare queste metriche in Cloud Monitoring. Per impostazione predefinita, la memoria YARN sarà pari a 0,8 * di memoria totale sul cluster, con la memoria rimanente riservata ad altri daemon e all'uso del sistema operativo, come la cache della pagina. Puoi eseguire l'override del valore predefinito con l'impostazione di configurazione YARN "yarn.nodemanager.resource.memory-mb" (vedi Apache Hadoop YARN, HDFS, Spark e proprietà correlate).

Scalabilità automatica e MapReduce di Hadoop

MapReduce esegue ogni mappa e riduce l'attività come contenitore YARN separato. Quando inizia un job, MapReduce invia richieste container per ogni attività di mappa, causando un picco significativo nella memoria YARN in attesa. Al termine delle attività sulle mappe, la memoria in sospeso diminuisce.

Al termine di mapreduce.job.reduce.slowstart.completedmaps (95% per impostazione predefinita su Dataproc), MapReduce mette in coda le richieste container per tutti i riduttori, causando un ulteriore picco di memoria in attesa.

A meno che la mappatura e la riduzione delle attività non richiedano diversi minuti o più tempo, non impostare un valore alto per la scalabilità automatica di scaleUpFactor. L'aggiunta di worker al cluster richiede almeno 1,5 minuti, quindi assicurati che il lavoro in attesa sia sufficiente per utilizzare il nuovo worker per diversi minuti. Un buon punto di partenza è impostare scaleUpFactor su 0,05 (5%) o 0,1 (10%) di memoria in attesa.

Scalabilità automatica e Spark

Spark aggiunge un ulteriore livello di pianificazione oltre a YARN. In particolare, l'allocazione dinamica di Spark Core effettua richieste a YARN per consentire ai container di eseguire esecutori Spark, quindi pianifica le attività Spark sui thread su questi esecutori. I cluster Dataproc abilitano l'allocazione dinamica per impostazione predefinita, perciò gli esecutori vengono aggiunti e rimossi in base alle esigenze.

Spark richiede sempre a YARN i container, ma senza l'allocazione dinamica, richiede i container solo all'inizio del job. Con l'allocazione dinamica, rimuove i container o ne chiede di nuovi, se necessario.

Spark parte da un numero ridotto di esecutori, due in cluster con scalabilità automatica, e continua a raddoppiare il numero di esecutori mentre ci sono attività in backlog. Questa operazione ottimizza la memoria in attesa (meno picchi di memoria in sospeso). Ti consigliamo di impostare la scalabilità automatica scaleUpFactor su un numero elevato, ad esempio 1,0 (100%), per i job Spark.

Disabilitazione dell'allocazione dinamica Spark

Se esegui job Spark separati che non traggono vantaggio dall'allocazione dinamica Spark, puoi disabilitare l'allocazione dinamica Spark impostando spark.dynamicAllocation.enabled=false e spark.executor.instances. Puoi comunque utilizzare la scalabilità automatica per fare lo scale up e lo scale down dei cluster durante l'esecuzione dei job Spark separati.

Job Spark con dati memorizzati nella cache

Imposta spark.dynamicAllocation.cachedExecutorIdleTimeout o annulla la memorizzazione nella cache dei set di dati quando non sono più necessari. Per impostazione predefinita, Spark non rimuove gli esecutori con dati memorizzati nella cache, il che impedirebbe lo scale down del cluster.

Scalabilità automatica e Spark Streaming

  1. Poiché Spark Streaming dispone di una propria versione di allocazione dinamica che utilizza indicatori specifici per i flussi di dati per aggiungere e rimuovere esecutori, imposta spark.streaming.dynamicAllocation.enabled=true e disabilita l'allocazione dinamica di Spark Core impostando spark.dynamicAllocation.enabled=false.

  2. Non utilizzare il ritiro gestito automaticamente (scalabilità automatica gracefulDecommissionTimeout) con i job Spark Streaming. Per rimuovere in modo sicuro i worker con scalabilità automatica, configura il checkpoint per la tolleranza di errore.

In alternativa, per utilizzare Spark Streaming senza scalabilità automatica:

  1. Disabilita l'allocazione dinamica di Spark Core (spark.dynamicAllocation.enabled=false) e
  2. Imposta il numero di esecutori (spark.executor.instances) per il tuo job. Vedi Proprietà dei cluster.

Scalabilità automatica e streaming strutturato Spark

La scalabilità automatica non è compatibile con lo streaming strutturato Spark, poiché al momento il flusso strutturato Spark non supporta l'allocazione dinamica (vedi SPARK-24815: lo streaming strutturato deve supportare l'allocazione dinamica).

Controllo della scalabilità automatica tramite partizionamento e parallelismo

Sebbene il parallelismo sia solitamente impostato o determinato dalle risorse del cluster (ad esempio, il numero di HDFS blocca i controlli in base al numero di attività), con la scalabilità automatica si applica il contrario: la scalabilità automatica imposta il numero di worker in base al parallelismo del job. Di seguito sono riportate le linee guida per aiutarti a impostare il parallelismo dei job:

  • Mentre Dataproc imposta il numero predefinito di MapReduce per ridurre le attività in base alla dimensione iniziale del cluster, puoi impostare mapreduce.job.reduces per aumentare il parallelismo della fase di riduzione.
  • Il parallelismo di Spark SQL e Dataframe è determinato da spark.sql.shuffle.partitions, il cui valore predefinito è 200.
  • Il valore predefinito delle funzioni RDD di Spark è spark.default.parallelism, impostato sul numero di core sui nodi worker all'avvio del job. Tuttavia, tutte le funzioni RDD che creano shuffling prevedono un parametro per il numero di partizioni, che sostituisce spark.default.parallelism.

Devi assicurarti che i dati siano partizionati in modo uniforme. Se si verifica un disallineamento significativo delle chiavi, una o più attività possono richiedere molto più tempo rispetto alle altre, con un conseguente utilizzo ridotto.

Impostazioni predefinite delle proprietà Spark/Hadoop predefinite di scalabilità automatica

I cluster a scalabilità automatica hanno valori predefiniti delle proprietà dei cluster che consentono di evitare errori dei job quando i worker principali vengono rimossi o i worker secondari vengono prerilasciati. Puoi eseguire l'override di questi valori predefiniti quando crei un cluster con scalabilità automatica (vedi Proprietà dei cluster).

L'impostazione predefinita consente di aumentare il numero massimo di nuovi tentativi per attività, master delle applicazioni e fasi:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

I valori predefiniti per reimpostare i contatori dei tentativi (utili per job Spark Streaming a lunga esecuzione):

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Per impostazione predefinita, il meccanismo di allocazione dinamica di Spark ad avvio lento deve essere di dimensioni ridotte:

spark:spark.executor.instances=2

Domande frequenti

La scalabilità automatica può essere abilitata su cluster ad alta disponibilità e cluster a nodo singolo?

La scalabilità automatica può essere abilitata sui cluster ad alta disponibilità, ma non sui cluster a nodo singolo (i cluster a nodo singolo non supportano il ridimensionamento).

Puoi ridimensionare manualmente un cluster con scalabilità automatica?

Sì. Puoi decidere di ridimensionare manualmente un cluster come misura temporanea durante l'ottimizzazione di un criterio di scalabilità automatica. Tuttavia, queste modifiche avranno un effetto solo temporaneo e, in seguito, la scalabilità automatica ridurrà il livello del cluster.

Anziché ridimensionare manualmente un cluster a scalabilità automatica, valuta la possibilità di:

Aggiornamento del criterio di scalabilità automatica in corso... Eventuali modifiche apportate al criterio di scalabilità automatica interesseranno tutti i cluster che attualmente utilizzano il criterio (vedi Utilizzo dei criteri multi-cluster).

Scollegamento del criterio e scalabilità manuale del cluster alla dimensione preferita.

Assistenza per Dataproc.

Qual è la differenza tra Dataproc e la scalabilità automatica di Dataflow?

Consulta Scalabilità automatica orizzontale di Dataflow e Scalabilità automatica verticale di Dataflow Prime.

Il team di sviluppo di Dataproc può reimpostare lo stato di un cluster da ERROR a RUNNING?

In generale no. Questa operazione richiede uno sforzo manuale per verificare se sia possibile ripristinare in sicurezza semplicemente lo stato del cluster e spesso un cluster non può essere reimpostato senza altri passaggi manuali, come il riavvio del Namenode di HDFS.

Dataproc imposta lo stato di un cluster su ERROR se non è in grado di determinare lo stato di un cluster dopo un'operazione non riuscita. I cluster in ERROR non vengono scalati automaticamente o non eseguono job. Le cause più comuni sono:

  1. Errori restituiti dall'API Compute Engine, spesso durante le interruzioni di Compute Engine.

  2. Stato di danneggiamento di HDFS a causa di bug nella sua dismissione.

  3. Errori dell'API Dataproc Control, ad esempio "Lease attività scaduto"

Elimina e ricrea i cluster il cui stato è ERROR.

Quando la scalabilità automatica annulla un'operazione di scale down?

L'immagine seguente è un'illustrazione che mostra quando la scalabilità automatica annullerà un'operazione di scale down (vedi anche Come funziona la scalabilità automatica).

dataproc-autoscaling-cancellation-example

Note:

  • La scalabilità automatica del cluster è abilitata solo in base alle metriche di memoria YARN (impostazione predefinita).
  • I valori T1-T9 rappresentano gli intervalli di attesa quando il gestore della scalabilità automatica valuta il numero di worker (le tempistiche degli eventi sono state semplificate).
  • Le barre in pila rappresentano il numero di worker YARN dei cluster attivi, in dismissione e dismessi.
  • Il numero di worker consigliato dal gestore della scalabilità automatica (linea nera) si basa sulle metriche di memoria YARN, sul conteggio dei worker attivi YARN e sulle impostazioni dei criteri di scalabilità automatica (vedi Come funziona la scalabilità automatica).
  • L'area di sfondo rossa indica il periodo in cui è in corso l'operazione di scale down.
  • L'area di sfondo giallo indica il periodo in cui l'operazione di scale down viene annullata.
  • L'area di sfondo verde indica il periodo dell'operazione di scaleup.

Le seguenti operazioni si verificano nei seguenti orari:

  • T1: il gestore della scalabilità automatica avvia un'operazione di scale down gestito automaticamente per fare lo scale down di circa la metà degli attuali worker del cluster.

  • T2: il gestore della scalabilità automatica continua a monitorare le metriche del cluster. Lo scale down consigliato non viene modificato e l'operazione di scale down continua. Alcuni lavoratori sono stati dismessi, mentre altri sono in fase di ritiro (Dataproc eliminerà i lavoratori dismessi).

  • T3: Il gestore della scalabilità automatica calcola che il numero di worker può essere fare lo scale down ulteriormente, probabilmente a causa della disponibilità di memoria YARN aggiuntiva. Tuttavia, poiché il numero di worker attivi più la modifica consigliata nel numero di worker non è uguale o maggiore del numero di worker attivi più in dismissione, i criteri per l'annullamento dello scale down non vengono soddisfatti e il gestore della scalabilità automatica non annulla l'operazione di scale down.

  • T4: YARN segnala un aumento della memoria in attesa. Tuttavia, il gestore della scalabilità automatica non modifica il suggerimento sul conteggio dei worker. Come nel T3, i criteri di annullamento dello scale down rimangono non soddisfatti e il gestore della scalabilità automatica non annulla l'operazione di scale down.

  • T5: la memoria YARN in attesa aumenta, mentre il cambiamento del numero di worker consigliato dal gestore della scalabilità automatica aumenta. Tuttavia, poiché il numero di worker attivi più la modifica consigliata nel numero di worker è inferiore al numero di worker attivi più il numero di worker in fase di ritiro, i criteri di annullamento rimangono non soddisfatti e l'operazione di scale down non viene annullata.

  • T6: La memoria YARN in attesa aumenta ulteriormente. Il numero di worker attivi più la variazione del numero di worker consigliato dal gestore della scalabilità automatica è ora superiore al numero di lavoratori attivi più il numero di worker in dismissione. I criteri di annullamento vengono soddisfatti e il gestore della scalabilità automatica annulla l'operazione di scale down.

  • T7: il gestore della scalabilità automatica è in attesa del completamento dell'operazione di scale down. Il gestore della scalabilità automatica non valuta né consiglia una modifica del numero di worker durante questo intervallo.

  • T8: l'annullamento dell'operazione di scale down viene completata. I worker dismissione vengono aggiunti al cluster e diventano attivi. Il gestore della scalabilità automatica rileva il completamento dell'annullamento dell'operazione di scale down e attende il periodo di valutazione successivo (T9) per calcolare il numero consigliato di worker.

  • T9: non ci sono operazioni attive all'ora T9. In base al criterio del gestore della scalabilità automatica e alle metriche YARN, il gestore della scalabilità automatica consiglia un'operazione di scale up.