Scalabilità automatica dei cluster Dataproc

Che cos'è la scalabilità automatica?

È difficile stimare il numero "giusto" di worker (nodi) del cluster per un carico di lavoro e spesso le dimensioni di un singolo cluster per un'intera pipeline non sono ideali. Il scaling del cluster avviato dall'utente risolve parzialmente questo problema, ma richiede il monitoraggio dell'utilizzo del cluster e l'intervento manuale.

L'API AutoscalingPolicies di Dataproc offre un meccanismo per automatizzare la gestione delle risorse del cluster e consente la scalabilità automatica delle VM worker del cluster. Un Autoscaling Policy è una configurazione riutilizzabile che descrive in che modo devono essere scalati i worker del cluster che utilizzano il criterio di scalabilità automatica. Definisce i limiti, la frequenza e l'aggressività della scalabilità per fornire un controllo granulare sulle risorse del cluster nel corso del suo ciclo di vita.

Quando utilizzare la scalabilità automatica

Utilizza la scalabilità automatica:

su cluster che archiviano i dati in servizi esterni, come Cloud Storage o BigQuery

su cluster che elaborano molti job

per eseguire lo scaling dei cluster con un singolo job

con la modalità flessibilità avanzata per i job batch Spark

La scalabilità automatica non è consigliata con/per:

  • HDFS: la scalabilità automatica non è progettata per la scalabilità di HDFS on-cluster perché:

    1. L'utilizzo di HDFS non è un indicatore per la scalabilità automatica.
    2. I dati HDFS sono ospitati solo sui worker principali. Il numero di worker primari deve essere sufficiente per ospitare tutti i dati HDFS.
    3. Il ritiro dei DataNode HDFS può ritardare la rimozione dei worker. I datanode copiano i blocchi HDFS in altri datanode prima che un worker venga rimosso. A seconda delle dimensioni dei dati e del fattore di replica, questa procedura può richiedere diverse ore.
  • Etichette dei nodi YARN: la scalabilità automatica non supporta le etichette dei nodi YARN, né la proprietà dataproc:am.primary_only a causa di YARN-9088. YARN registra in modo errato le metriche del cluster quando vengono utilizzate le etichette dei nodi.

  • Spark Structured Streaming: la scalabilità automatica non supporta Spark Structured Streaming (consulta Scalabilità automatica e Spark Structured Streaming).

  • Cluster inattivi: la scalabilità automatica non è consigliata per ridurre le dimensioni di un cluster alle dimensioni minime quando il cluster è inattivo. Poiché la creazione di un nuovo cluster è veloce quanto il ridimensionamento di uno esistente, ti consigliamo di eliminare i cluster inattivi e di ricrearli. I seguenti strumenti supportano questo modello "effimero":

    Utilizza i workflow di Dataproc per pianificare un insieme di job su un cluster dedicato ed elimina il cluster al termine dei job. Per un'orchestrazione più avanzata, utilizza Cloud Composer, che si basa su Apache Airflow.

    Per i cluster che elaborano query ad hoc o carichi di lavoro pianificati esternamente, utilizza l'eliminazione pianificata del cluster per eliminare il cluster dopo un periodo di inattività o una durata specificati oppure in un momento specifico.

  • Carichi di lavoro di dimensioni diverse: quando vengono eseguiti job di piccole e grandi dimensioni su un cluster, il ridimensionamento con disattivazione graduale attenderà il completamento dei job di grandi dimensioni. Di conseguenza, un job che richiede molto tempo ritarderà la scalabilità automatica delle risorse per i job più piccoli in esecuzione sul cluster fino al termine del job che richiede molto tempo. Per evitare questo risultato, raggruppa job di dimensioni simili su un cluster e isola ogni job di lunga durata su un cluster separato.

Attivazione della scalabilità automatica

Per attivare la scalabilità automatica su un cluster:

  1. Crea un criterio di scalabilità automatica.

  2. In uno dei seguenti modi:

    1. Crea un cluster con la scalabilità automatica oppure
    2. Attiva la scalabilità automatica su un cluster esistente.

Crea un criterio di scalabilità automatica

Comando g-cloud

Puoi utilizzare il comando gcloud dataproc autoscaling-policies import per creare un criterio di scalabilità automatica. Legge un file YAML locale che definisce un criterio di scalabilità automatica. Il formato e i contenuti del file devono corrispondere agli oggetti e ai campi di configurazione definiti dall'API REST autoscalingPolicies.

L'esempio YAML seguente definisce un criterio che specifica tutti i campi obbligatori. Fornisce inoltre i valori minInstances e maxInstances per i worker principali, il valore maxInstances per i worker secondari (prerilasciabili) e specifica un cooldownPeriod di 4 minuti (il valore predefinito è di 2 minuti). workerConfig configura i worker principali. In questo esempio, minInstances e maxInstances sono impostati sullo stesso valore per evitare di scalare i worker principali.

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

Ecco un altro esempio YAML che specifica tutti i campi facoltativi e obbligatori delle norme di scalabilità automatica.

workerConfig:
  minInstances: 10
  maxInstances: 10
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

Esegui il seguente comando gcloud da un terminale locale o in Cloud Shell per creare il criterio di scalabilità automatica. Specifica un nome per il criterio. Questo nome diventerà il criterio id, che potrai utilizzare nei comandi gcloud successivi per fare riferimento al criterio. Utilizza il flag --source per specificare il percorso e il nome del file locale del file YAML del criterio di scalabilità automatica da importare.

gcloud dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml \
    --region=region

API REST

Crea un criterio di scalabilità automatica definendo un AutoscalingPolicy all'interno di una richiesta autoscalingPolicies.create.

Console

Per creare un criterio di scalabilità automatica, selezionare CREA CRITERIO dalla pagina Dataproc Criteri di scalabilità automatica utilizzando la console Google Cloud. Nella pagina Crea criterio puoi selezionare un riquadro di consigli sui criteri per compilare i campi del criterio di scalabilità automatica per un tipo di job o un obiettivo di scalabilità specifico.

Crea un cluster con la scalabilità automatica

Dopo aver creato un criterio di scalabilità automatica, crea un cluster che lo utilizzi. Il cluster deve trovarsi nella stessa regione del criterio di scalabilità automatica.

Comando g-cloud

Esegui il seguente comando gcloud da un terminale locale o in Cloud Shell per creare un cluster con il ridimensionamento automatico. Assegna un nome al cluster e utilizza il flag --autoscaling-policy per specificare policy id (il nome del criterio specificato quando lo hai creato) o il criterio resource URI (resource name) (consulta i campi AutoscalingPolicy id e name).

gcloud dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

API REST

Crea un cluster con la scalabilità automatica includendo un AutoscalingConfig all'interno di una richiesta clusters.create.

Console

Puoi selezionare un criterio di scalabilità automatica esistente da applicare a un nuovo cluster dalla sezione Criterio di scalabilità automatica del riquadro Configura cluster nella pagina Dataproc Crea un cluster della console Google Cloud.

Attivare la scalabilità automatica in un cluster esistente

Dopo aver creato un criterio di scalabilità automatica, puoi attivarlo su un cluster esistente nella stessa regione.

Comando g-cloud

Esegui il seguente comando gcloud da un terminale locale o in Cloud Shell per abilitare un criterio di scalabilità automatica in un cluster esistente. Fornisci il nome del cluster e utilizza il flag --autoscaling-policy per specificare policy id (il nome del criterio specificato quando hai creato il criterio) o il criterio resource URI (resource name) (consulta i campi AutoscalingPolicy id e name).

gcloud dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

API REST

Per attivare un criterio di scalabilità automatica su un cluster esistente, imposta AutoscalingConfig.policyUri del criterio in updateMask di una richiesta clusters.patch.

Console

Al momento, l'abilitazione di un criterio di scalabilità automatica su un cluster esistente non è supportata nella console Google Cloud.

Utilizzo dei criteri multi-cluster

  • Un criterio di scalabilità automatica definisce il comportamento di ridimensionamento che può essere applicato a più cluster. Un criterio di scalabilità automatica è più adatto a più cluster quando i cluster condividono carichi di lavoro simili o eseguono job con pattern di utilizzo delle risorse simili.

  • Puoi aggiornare un criterio utilizzato da più cluster. Gli aggiornamenti influiscono immediatamente sul comportamento della scalabilità automatica per tutti i cluster che utilizzano il criterio (vedi autoscalingPolicies.update). Se non vuoi che un aggiornamento del criterio venga applicato a un cluster che lo utilizza, disattiva la scalabilità automatica sul cluster prima di aggiornare il criterio.

Comando g-cloud

Esegui il seguente comando gcloud da un terminale locale o in Cloud Shell per disattivare l'autoscaling in un cluster.

gcloud dataproc clusters update cluster-name --disable-autoscaling \
    --region=region

API REST

Per disattivare la scalabilità automatica su un cluster, imposta AutoscalingConfig.policyUri sulla stringa vuota e imposta update_mask=config.autoscaling_config.policy_uri in una richiesta clusters.patch.

Console

Al momento, la disattivazione della scalabilità automatica su un cluster non è supportata nella console Google Cloud.

Come funziona la scalabilità automatica

La scalabilità automatica controlla le metriche YARN Hadoop del cluster man mano che ogni periodo di "tempo di attesa" termina per determinare se eseguire lo scaling del cluster e, in caso affermativo, la portata dell'aggiornamento.

  1. Il valore della metrica delle risorse YARN in attesa (memoria in attesa o core in attesa) determina se eseguire l'aumento o la riduzione di scala. Un valore maggiore di 0 indica che i job YARN sono in attesa di risorse e che potrebbe essere necessario aumentare le risorse. Un valore 0 indica che YARN ha risorse sufficienti per cui potrebbe non essere necessario un ridimensionamento o altre modifiche.

    Se la risorsa in attesa è maggiore di 0:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Pending + Available + Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]

    Se la risorsa in attesa è 0:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]

    Per impostazione predefinita, il gestore della scalabilità automatica monitora la risorsa di memoria YARN. Se attivi la scalabilità automatica basata su core, vengono monitorati sia la memoria YARN sia i core YARN:estimated_worker_count viene valutato separatamente per memoria e core e viene selezionato il numero di worker maggiore risultante.

    $estimated\_worker\_count =$

    \[ max(estimated\_worker\_count\_by\_memory,\ estimated\_worker\_count\_by\_cores) \]

    \[ estimated\ \Delta worker = estimated\_worker\_count - current\_worker\_count \]

  2. Data la variazione stimata necessaria al numero di worker, la scalabilità automatica utilizza un valore scaleUpFactor o scaleDownFactor per calcolare la variazione effettiva al numero di worker:

    if estimated Δworkers > 0:
      actual Δworkers = ROUND_UP(estimated Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(estimated Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(estimated Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(estimated Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(estimated Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(estimated Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(estimated Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    Un valore scaleUpFactor o scaleDownFactor pari a 1,0 indica che la scalabilità automatica verrà eseguita in modo che la risorsa in attesa/disponibile sia pari a 0 (utilizzo perfetto).

  3. Una volta calcolata la modifica del numero di worker, scaleUpMinWorkerFraction e scaleDownMinWorkerFraction fungono da soglie per determinare se la scalabilità automatica eseguirà la scalabilità del cluster. Una piccola frazione indica che la scalabilità automatica deve essere eseguita anche se Δworkers è piccolo. Una frazione più grande indica che la scalatura deve avvenire solo quando Δworkers è grande.

    IF (Δworkers >  scaleUpMinWorkerFraction * current_worker_count) then scale up
    
    OPPURE
    IF (abs(Δworkers) >  scaleDownMinWorkerFraction * current_worker_count),
    THEN scale down.
    

  4. Se il numero di worker da scalare è sufficientemente elevato da attivare la scalabilità, la scalabilità automatica utilizza i limiti minInstances maxInstances di workerConfig e secondaryWorkerConfig e weight (rapporto tra worker principali e secondari) per determinare come suddividere il numero di worker nei gruppi di istanza worker principali e secondari. Il risultato di questi calcoli è la modifica finale dell'autoscaling del cluster per il periodo di scaling.

  5. Le richieste di riduzione della scalabilità automatica verranno annullate nei cluster creati con versioni delle immagini successive a 2.0.57 e 2.1.5 se:

    1. è in corso uno scale down con un valore del timeout per la rimozione controllata diverso da zero e
    2. il numero di worker YARN ATTIVI ("worker attivi") più la variazione del numero totale di worker consigliati dal gestore della scalabilità automatica (Δworkers) sia uguale o superiore a DECOMMISSIONING worker YARN ("worker di disattivazione"), come mostrato nella seguente formula:

      IF (active workers + Δworkers ≥ active workers + decommissioning workers)
      THEN cancel the scaledown operation
      

    Per un esempio di annullamento dello scaledown, consulta Quando la scalabilità automatica annulla un'operazione di scaledown?.

Suggerimenti per la configurazione della scalabilità automatica

Evita di scalare i worker principali

I worker principali eseguono i datanode HDFS, mentre i worker secondari sono solo di calcolo. L'utilizzo di worker secondari ti consente di scalare in modo efficiente le risorse di calcolo senza dover eseguire il provisioning dello spazio di archiviazione, con conseguente aumento delle funzionalità di scalabilità. I Namenode HDFS possono avere più race condition che causano la corruzione di HDFS, per cui il ritiro si blocca indefinitamente. Per evitare questo problema, evita di scalare i worker principali. Ad esempio: workerConfig: minInstances: 10 maxInstances: 10 secondaryWorkerConfig: minInstances: 0 maxInstances: 100

Devi apportare alcune modifiche al comando di creazione del cluster:

  1. Imposta --num-workers=10 in modo che corrisponda alle dimensioni del gruppo di worker principale del criterio di scalabilità automatica.
  2. Imposta --secondary-worker-type=non-preemptible per configurare i worker secondari come non prerilasciabili. (a meno che non siano richieste VM preemptible).
  3. Copia la configurazione hardware dai worker principali ai worker secondari. Ad esempio, imposta --secondary-worker-boot-disk-size=1000GB in modo che corrisponda a --worker-boot-disk-size=1000GB.

Utilizzare la modalità di flessibilità avanzata per i job batch Spark

Utilizza la modalità di flessibilità avanzata (EFM) con la scalabilità automatica per:

Consente di ridurre più rapidamente il cluster mentre i job sono in esecuzione

Evitare interruzioni dei job in esecuzione a causa del ridimensionamento del cluster

ridurre al minimo le interruzioni dei job in esecuzione a causa della preemption dei worker secondari prerilasciabili

Con la modalità EFM attivata, il timeout di disattivazione graduale di un criterio di scalabilità automatica deve essere impostato su 0s. Il criterio di scalabilità automatica deve eseguire la scalabilità automatica solo dei worker secondari.

Scegliere un timeout per la rimozione controllata

La scalabilità automatica supporta il ritiro graduale di YARN quando vengono rimossi i nodi da un cluster. Il ritiro graduale consente alle applicazioni di completare l'ordinamento casuale dei dati tra le fasi per evitare di reimpostare l'avanzamento del job. Il ⁠timeout di ritiro graduale fornito in un criterio di scalabilità automatica è il limite superiore della durata per cui YARN attenderà le applicazioni in esecuzione (application in esecuzione all'avvio del ritiro) prima di rimuovere i nodi.

Quando un processo non viene completato entro il periodo di tempo di rimozione controllata specificato, il nodo worker viene arrestato forzatamente, causando potenzialmente la perdita di dati o l'interruzione del servizio. Per evitare questa possibilità, imposta il timeout per il ritiro gestito automaticamente su un valore maggiore del job più lungo che verrà elaborato dal cluster. Ad esempio, se prevedi che il job più lungo duri un'ora, imposta il timeout su almeno un'ora (1h).

Valuta la possibilità di eseguire la migrazione dei job che richiedono più di un'ora ai propri cluster temporanei per evitare di bloccare il rimozione controllata.

Configurazione di scaleUpFactor in corso…

scaleUpFactor controlla l'aggressività con cui il gestore della scalabilità automatica esegue lo scale up di un cluster. Specifica un numero compreso tra 0.0 e 1.0 per impostare il valore frazionario della risorsa in attesa di YARN che causa l'aggiunta di nodi.

Ad esempio, se ci sono 100 contenitori in attesa che richiedono ciascuno 512 MB, sono disponibili 50 GB di memoria YARN in attesa. Se scaleUpFactor è 0.5, il gestore della scalabilità automatica aggiungerà nodi sufficienti per aggiungere 25 GB di memoria YARN. Allo stesso modo, se è 0.1, il gestore della scalabilità aggiungerà nodi sufficienti per 5 GB. Tieni presente che questi valori corrispondono alla memoria YARN, non alla memoria totale fisicamente disponibile su una VM.

Un buon punto di partenza è 0.05 per i job MapReduce e Spark con l'allocazione dinamica abilitata. Per i job Spark con un numero fisso di esecuzioni e i job Tez, utilizza 1.0. Un valore scaleUpFactor pari a 1.0 indica che la scalabilità automatica verrà regolata in modo che la risorsa in attesa/disponibile sia pari a 0 (utilizzo perfetto).

Configurazione di scaleDownFactor in corso…

scaleDownFactor controlla l'aggressività con cui il gestore della scalabilità automatica riduce un cluster. Specifica un numero compreso tra 0.0 e 1.0 per impostare il valore frazionario della risorsa YARN disponibile che causa la rimozione del nodo.

Lascia questo valore su 1.0 per la maggior parte dei cluster con più job che devono essere scalati spesso. A causa della rimozione controllata, le operazioni di riduzione sono molto più lente rispetto a quelle di aumento. L'impostazione scaleDownFactor=1.0 imposta un tasso di riduzione aggressivo, che riduce al minimo il numero di operazioni di riduzione necessarie per ottenere le dimensioni appropriate del cluster.

Per i cluster che richiedono una maggiore stabilità, imposta un valore scaleDownFactor inferiore per un tasso di riduzione più lento.

Imposta questo valore su 0.0 per impedire il ridimensionamento del cluster, ad esempio quando utilizzi cluster temporanei o con un singolo job.

Configurazione di scaleUpMinWorkerFraction e scaleDownMinWorkerFraction

scaleUpMinWorkerFraction e scaleDownMinWorkerFraction vengono utilizzati con scaleUpFactor o scaleDownFactor e hanno valori predefiniti di 0.0. Rappresentano le soglie a cui il gestore della scalabilità automatica aumenterà o fare lo scale down le dimensioni del cluster: l'aumento o la diminuzione del valore frazionario minimo delle dimensioni del cluster necessario per emettere richieste di scale up o scale down.

Esempi: il gestore della scalabilità automatica non invierà una richiesta di aggiornamento per aggiungere 5 worker a un cluster di 100 nodi, a meno che scaleUpMinWorkerFraction non sia inferiore o uguale a 0.05 (5%). Se impostato su 0.1, il gestore della scalabilità automatica non emette la richiesta di scale up del cluster. Analogamente, se scaleDownMinWorkerFraction è 0.05, il gestore della scalabilità non invierà una richiesta di aggiornamento per rimuovere i nodi da un cluster di 100 nodi, a meno che non debbano essere rimossi almeno 5 nodi.

Il valore predefinito 0.0 indica che non è impostata alcuna soglia.

È vivamente consigliato impostare un valore più elevato per scaleDownMinWorkerFractionthresholds su cluster di grandi dimensioni (> 100 nodi) per evitare operazioni di ridimensionamento piccole e non necessarie.

Scegliere un periodo di attesa

cooldownPeriod imposta un periodo di tempo durante il quale lo scalare automatico non invierà richieste di modifica delle dimensioni del cluster. Puoi utilizzarlo per limitare la frequenza delle modifiche del gestore della scalabilità automatica alle dimensioni del cluster.

Il valore minimo e predefinito cooldownPeriod è di due minuti. Se in un criterio è impostato un valore cooldownPeriod più breve, le modifiche al carico di lavoro influiranno più rapidamente sulle dimensioni del cluster, ma i cluster potrebbero essere scalati inutilmente. La best practice consiste nell'impostare scaleUpMinWorkerFraction e scaleDownMinWorkerFraction di un criterio su un valore diverso da zero quando si utilizza un cooldownPeriod più breve. In questo modo, il cluster viene scalato verso l'alto o verso il basso solo quando la variazione dell'utilizzo delle risorse è sufficiente a giustificare un aggiornamento del cluster.

Se il tuo carico di lavoro è sensibile alle modifiche delle dimensioni del cluster, puoi aumentare il periodo di attesa. Ad esempio, se stai eseguendo un job di elaborazione collettiva, puoi impostare il periodo di attesa su almeno 10 minuti. Prova periodi di attesa diversi per trovare il valore più adatto al tuo carico di lavoro.

Limiti del numero di worker e pesi del gruppo

Ogni gruppo di worker ha minInstances e maxInstances che configurano un limite massimo per le dimensioni di ciascun gruppo.

Ogni gruppo ha anche un parametro chiamato weight che configura il bilanciamento target tra i due gruppi. Tieni presente che questo parametro è solo un suggerimento e che se un gruppo raggiunge le dimensioni minime o massime, i nodi verranno aggiunti o rimossi solo dall'altro gruppo. Pertanto, weight può quasi sempre essere lasciato al valore predefinito 1.

Abilita la scalabilità automatica basata su core

Per impostazione predefinita, YARN utilizza le metriche della memoria per l'allocazione delle risorse. Per le applicazioni che richiedono un'elevata intensità di risorse della CPU, una best practice consiste nel configurare YARN in modo da utilizzare il Calcolatore delle risorse dominanti. Per farlo, imposta la seguente proprietà quando crei un cluster:

capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

Metriche e log di scalabilità automatica

Le seguenti risorse e strumenti possono aiutarti a monitorare le operazioni di scalabilità automatica e il loro effetto sul cluster e sui relativi job.

Cloud Monitoring

Utilizza Cloud Monitoring per:

  • Visualizza le metriche utilizzate dalla scalabilità automatica
  • Visualizza il numero di gestori di nodi nel cluster
  • Scopri perché la scalabilità automatica ha o meno scalato il tuo cluster autoscaling-stackdriver1 autoscaling-stackdriver2 autoscaling-stackdriver3

Cloud Logging

Utilizza Cloud Logging per visualizzare i log di Cloud Dataproc Autoscaler.

1) Trova i log per il tuo cluster.

autoscaling-logs-for-cluster

2) Seleziona dataproc.googleapis.com/autoscaler.

autoscaling-log-file

3) Espandi i messaggi di log per visualizzare il campo status. I log sono in formato JSON, un formato leggibile dal computer.

autoscaling-three-logs autoscaling-update-operation

4) Espandi il messaggio di log per visualizzare i consigli per la scalabilità, le metriche utilizzate per le decisioni di scalabilità, le dimensioni del cluster originale e le dimensioni del nuovo cluster di destinazione.

autoscaling-recommendation-message

Informazioni generali: scalabilità automatica con Apache Hadoop e Apache Spark

Le sezioni seguenti illustrano in che modo la scalabilità automatica è (o meno) interoperabile con Hadoop YARN e Hadoop Mapreduce, nonché con Apache Spark, Spark Streaming e Spark Structured Streaming.

Metriche YARN Hadoop

La scalabilità automatica si basa sulle seguenti metriche YARN Hadoop:

  1. Allocated resource si riferisce alla risorsa YARN totale utilizzata dai container in esecuzione nell'intero cluster. Se sono in esecuzione sei contenitori che possono utilizzare fino a 1 unità di risorsa, sono disponibili sei risorse allocate.

  2. Available resource è la risorsa YARN nel cluster non utilizzata dai contenitori allocati. Se sono disponibili 10 unità di risorse per tutti i gestori di nodi e 6 di queste sono allocate, ci sono 4 risorse disponibili. Se nel cluster sono disponibili risorse (non utilizzate), la scalabilità automatica potrebbe rimuovere i worker dal cluster.

  3. Pending resource è la somma delle richieste di risorse YARN per i container in attesa. I container in attesa sono in attesa di spazio per l'esecuzione in YARN. La risorsa in attesa è diversa da zero solo se la risorsa disponibile è pari a zero o troppo piccola per essere allocata al contenitore successivo. Se sono presenti contenitori in attesa, la scalabilità automatica potrebbe aggiungere worker al cluster.

Puoi visualizzare queste metriche in Cloud Monitoring. Per impostazione predefinita, la memoria YARN sarà pari a 0,8 * la memoria totale del cluster, con la memoria rimanente riservata ad altri demoni e all'utilizzo del sistema operativo, come la cache di pagine. Puoi ignorare il valore predefinito con l'impostazione di configurazione YARN "yarn.nodemanager.resource.memory-mb" (vedi Apache Hadoop YARN, HDFS, Spark e proprietà correlate).

Scalabilità automatica e Hadoop MapReduce

MapReduce esegue ogni attività di mappatura e riduzione come contenitore YARN separato. Quando inizia un job, MapReduce invia richieste di contenitori per ogni attività di mappatura, con un conseguente picco elevato della memoria YARN in attesa. Al termine delle attività di mappatura, la memoria in attesa diminuisce.

Quando mapreduce.job.reduce.slowstart.completedmaps sono stati completati (95% per impostazione predefinita su Dataproc), MapReduce mette in coda le richieste dei contenitori per tutti i riduttore, con un altro picco di memoria in attesa.

A meno che le attività di mappatura e riduzione non richiedano diversi minuti o più, non impostare un valore elevato per la scalabilità automatica scaleUpFactor. L'aggiunta di worker al cluster richiede almeno 1,5 minuti, quindi assicurati che sia presente un lavoro in sospeso sufficiente per utilizzare il nuovo worker per diversi minuti. Un buon punto di partenza è impostare scaleUpFactor su 0,05 (5%) o 0,1 (10%) della memoria in attesa.

Scalabilità automatica e Spark

Spark aggiunge un ulteriore livello di pianificazione a YARN. Nello specifico, la allocazione dinamica di Spark Core invia richieste a YARN per i container per eseguire gli executor Spark, quindi pianifica le attività Spark sui thread di questi executor. I cluster Dataproc attivano l'allocazione dinamica per impostazione predefinita, pertanto gli esecutori vengono aggiunti e rimossi in base alle esigenze.

Spark richiede sempre i container a YARN, ma senza allocazione dinamica richiede i container solo all'inizio del job. Con l'allocazione dinamica,rimuoverà i container o ne richiederà di nuovi, se necessario.

Spark inizia con un numero ridotto di executor (2 nei cluster con scalabilità automatica) e continua a raddoppiare il numero di executor finché ci sono attività in coda. In questo modo, la memoria in attesa viene uniformata (meno picchi di memoria in attesa). Per i job Spark, ti consigliamo di impostare la scalabilità automatica scaleUpFactor su un numero elevato, ad esempio 1,0 (100%).

Disattivare l'allocazione dinamica di Spark

Se esegui job Spark separati che non usufruiscono dell'allocazione dinamica di Spark, puoi disattivarla impostando spark.dynamicAllocation.enabled=false e spark.executor.instances. Puoi comunque utilizzare la scalabilità automatica per aumentare e diminuire i cluster durante l'esecuzione dei job Spark separati.

Job Spark con dati memorizzati nella cache

Imposta spark.dynamicAllocation.cachedExecutorIdleTimeout o annulla la memorizzazione nella cache dei set di dati quando non sono più necessari. Per impostazione predefinita, Spark non rimuove gli esecutori che hanno memorizzato nella cache i dati, il che impedirebbe il ridimensionamento del cluster.

Scalabilità automatica e Spark Streaming

  1. Poiché Spark Streaming ha una propria versione dell'allocazione dinamica che utilizza indicatori specifici per lo streaming per aggiungere e rimuovere gli esecutori, imposta spark.streaming.dynamicAllocation.enabled=true e disattiva l'allocazione dinamica di Spark Core impostando spark.dynamicAllocation.enabled=false.

  2. Non utilizzare il ritiro gestito automaticamente (scalabilità automatica gracefulDecommissionTimeout) con i job Spark Streaming. Per rimuovere in sicurezza i worker con la scalabilità automatica, configura il checkpointing per la tolleranza ai guasti.

In alternativa, per utilizzare Spark Streaming senza la scalabilità automatica:

  1. Disattiva l'allocazione dinamica di Spark Core (spark.dynamicAllocation.enabled=false) e
  2. Imposta il numero di esecutori (spark.executor.instances) per il tuo job. Consulta Proprietà cluster.

Scalabilità automatica e Spark Structured Streaming

La scalabilità automatica non è compatibile con Spark Structured Streaming poiché al momento Spark Structured Streaming non supporta l'allocazione dinamica (vedi SPARK-24815: Structured Streaming should support dynamic allocation).

Controllo della scalabilità automatica tramite partizionamento e parallelismo

Sebbene il parallelismo sia in genere impostato o determinato dalle risorse del cluster (ad esempio, il numero di blocchi HDFS viene controllato dal numero di attività), con la scalabilità automatica si applica il contrario: la scalabilità automatica imposta il numero di worker in base al parallelismo del job. Di seguito sono riportate alcune linee guida per aiutarti a impostare il parallelismo dei job:

  • Sebbene Dataproc imposti il numero predefinito di attività di riduzione MapReduce in base alle dimensioni iniziali del cluster, puoi impostare mapreduce.job.reduces per aumentare il parallelismo della fase di riduzione.
  • Il parallelismo di Spark SQL e Dataframe è determinato da spark.sql.shuffle.partitions, che per impostazione predefinita è 200.
  • Per impostazione predefinita, le funzioni RDD di Spark utilizzano spark.default.parallelism, che viene impostato sul numero di core sui nodi worker all'avvio del job. Tuttavia, tutte le funzioni RDD che creano rimescolamenti accettano un parametro per il numero di partizioni, che sostituisce spark.default.parallelism.

Assicurati che i dati siano partizionati in modo uniforme. Se lo skew delle chiavi è significativo, una o più attività potrebbero richiedere molto più tempo rispetto ad altre, con un conseguente utilizzo ridotto.

Impostazioni predefinite delle proprietà Spark/Hadoop per la scalabilità automatica

I cluster con scalabilità automatica hanno valori predefiniti per le proprietà del cluster che aiutano a evitare l'errore del job quando i worker principali vengono rimossi o i worker secondari vengono prelevati. Puoi eseguire l'override di questi valori predefiniti quando crei un cluster con la scalabilità automatica (consulta Proprietà del cluster).

Per impostazione predefinita, il numero massimo di nuovi tentativi per attività, master dell'applicazione e fasi viene aumentato:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

Per impostazione predefinita, reimposta i contatori di ripetizione (utile per i job Spark Streaming di lunga durata):

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Per impostazione predefinita, il meccanismo di allocazione dinamica con avvio lento di Spark inizia da una dimensione ridotta:

spark:spark.executor.instances=2

Domande frequenti

La scalabilità automatica può essere attivata nei cluster ad alta disponibilità e nei cluster a nodo singolo?

La scalabilità automatica può essere attivata su cluster ad alta disponibilità, ma non su cluster a nodo singolo (i cluster a nodo singolo non supportano il ridimensionamento).

È possibile ridimensionare manualmente un cluster con scalabilità automatica?

Sì. Puoi decidere di ridimensionare manualmente un cluster come misura temporanea quando ottimizzi un criterio di scalabilità automatica. Tuttavia, queste modifiche avranno solo un effetto temporaneo e la scalabilità automatica ridurrà il cluster.

Anziché ridimensionare manualmente un cluster con scalabilità automatica, valuta la possibilità di:

Aggiornamento del criterio di scalabilità automatica. Qualsiasi modifica apportata al criterio di scalabilità automatica influirà su tutti i cluster che attualmente lo utilizzano (consulta Utilizzo dei criteri multi-cluster).

Scollegare il criterio e ridimensionare manualmente il cluster in base alle dimensioni preferite.

Ricevere assistenza per Dataproc.

Qual è la differenza tra Dataproc e la scalabilità automatica di Dataflow?

Consulta Scalabilità automatica orizzontale di Dataflow e Scalabilità automatica verticale di Dataflow Prime.

Il team di sviluppo di Dataproc può reimpostare lo stato di un cluster da ERROR a RUNNING?

In generale, no. Questa operazione richiede un intervento manuale per verificare se è sicuro reimpostare lo stato del cluster e spesso un cluster non può essere reimpostato senza altri passaggi manuali, come il riavvio del Namenode di HDFS.

Dataproc imposta lo stato di un cluster su ERROR quando non riesce a determinarne lo stato dopo un'operazione non riuscita. I cluster in ERROR non sono sottoposti a scalabilità automatica o non eseguono job. Le cause più comuni sono:

  1. Errori restituiti dall'API Compute Engine, spesso durante le interruzioni di Compute Engine.

  2. HDFS entra in uno stato di corruzione a causa di bug nel ritiro di HDFS.

  3. Errori dell'API Dataproc Control, ad esempio "Task lease expired" (Contratto di locazione della task scaduto)

Elimina e ricrea i cluster il cui stato è ERROR.

Quando la scalabilità automatica annulla un'operazione di scale down?

La seguente illustrazione mostra quando la scalabilità automatica annulla un'operazione di riduzione (consulta anche Come funziona la scalabilità automatica).

dataproc-autoscaling-cancellation-example

Note:

  • La scalabilità automatica del cluster è attivata solo in base alle metriche della memoria YARN (impostazione predefinita).
  • T1-T9 rappresentano gli intervalli di attesa quando il gestore della scalabilità automatica valuta il numero di worker (la temporizzazione degli eventi è stata semplificata).
  • Le barre in pila rappresentano i conteggi dei worker YARN del cluster attivi, in fase di ritiro e ritirati.
  • Il numero consigliato di worker dell'autoscaler (linea nera) si basa sulle metriche della memoria YARN, sul conteggio dei worker attivi YARN e sulle impostazioni dei criteri di scalabilità automatica (consulta Come funziona la scalabilità automatica).
  • L'area di sfondo rossa indica il periodo in cui è in esecuzione l'operazione di riduzione.
  • L'area di sfondo gialla indica il periodo in cui l'operazione di riduzione viene annullata.
  • L'area di sfondo verde indica il periodo dell'operazione di scalabilità.

Le seguenti operazioni vengono eseguite nei seguenti momenti:

  • T1: il gestore della scalabilità automatica avvia un'operazione di scalabilità rimozione controllata per fare lo scale down di circa la metà dei worker del cluster attuali.

  • T2: il gestore della scalabilità automatica continua a monitorare le metriche del cluster. Il consiglio di riduzione non cambia e l'operazione di riduzione continua. Alcuni worker sono stati dismessi e altri sono in fase di dismissione (Dataproc eliminerà i worker dismessi).

  • T3: lo scalatore automatico calcola che il numero di worker può essere fare lo scale down ulteriormente, possibly due to additional YARN memory becoming available. Tuttavia, poiché il numero di worker attivi più la variazione consigliata nel numero di worker non è uguale o superiore al numero di worker attivi più quelli in fase di ritiro, i criteri per l'annullamento del ridimensionamento non sono soddisfatti e il gestore della scalabilità automatica non annulla l'operazione di ridimensionamento.

  • T4: YARN registra un aumento della memoria in attesa. Tuttavia, il gestore della scalabilità automatica non modifica il numero di worker consigliato. Come in T3, i criteri di annullamento dello scale down rimangono insoddisfatti e lo scalatore automatico non annulla l'operazione di scale down.

  • T5: la memoria in attesa di YARN aumenta e aumenta la variazione del numero di worker consigliati dall'agente di scalabilità automatica. Tuttavia, poiché il numero di worker attivi più la variazione consigliata del numero di worker è inferiore al numero di worker attivi più quelli in fase di ritiro, i criteri di annullamento rimangono insoddisfatti e l'operazione di riduzione non viene annullata.

  • T6: la memoria YARN in attesa aumenta ulteriormente. Il numero di worker attivi più la variazione del numero di worker consigliati dall'autoscaler ora è maggiore del numero di worker attivi più quelli in fase di ritiro. I criteri di annullamento vengono soddisfatti e il gestore della scalabilità automatica annulla l'operazione di scale down.

  • T7: il gestore della scalabilità automatica è in attesa del completamento dell'annullamento dell'operazione di riduzione. Il gestore della scalabilità automatica non valuta e consiglia una modifica del numero di worker durante questo intervallo.

  • T8: l'annullamento dell'operazione di riduzione viene completato. I worker di ritiro vengono aggiunti al cluster e diventano attivi. Il gestore della scalabilità automatica rileva il completamento dell'annullamento dell'operazione di riduzione e attende il successivo periodo di valutazione (T9) per calcolare il numero consigliato di worker.

  • T9: non sono presenti operazioni attive al momento T9. In base ai criteri del programma di scalabilità automatica e alle metriche YARN, il programma di scalabilità automatica consiglia un'operazione di scale up.