Suggerimenti per l'ottimizzazione del job Spark

Le seguenti sezioni forniscono suggerimenti per ottimizzare le applicazioni Dataproc Spark.

Usa cluster temporanei

Quando utilizzi il modello di cluster "temporaneo" di Dataproc, crei un cluster dedicato per ogni job e, al termine del job, lo elimini. Con il modello temporaneo, puoi gestire separatamente l'archiviazione e l'elaborazione, salvando i dati di input e di output del job in Cloud Storage o BigQuery e utilizzando il cluster solo per le risorse di calcolo e di archiviazione temporanea dei dati.

Insidie persistenti dei cluster

L'utilizzo di cluster temporanei con un solo job evita le seguenti insidie e potenziali problemi associati all'utilizzo di cluster "persistenti" condivisi e a lunga esecuzione:

  • Single point of failure: uno stato di errore del cluster condiviso può causare l'errore di tutti i job, bloccando un'intera pipeline di dati. Le indagini e il ripristino a seguito di un errore possono richiedere ore. Poiché i cluster temporanei mantengono solo stati nel cluster temporanei, quando si verifica un errore possono essere eliminati e ricreati rapidamente.
  • Difficoltà di manutenzione e migrazione degli stati dei cluster in HDFS, MySQL o file system locali
  • Contese di risorse tra i job che influiscono negativamente sugli SLO
  • Daemon di servizio che non rispondono a causa della pressione della memoria
  • Accumulo di log e file temporanei che possono superare la capacità del disco
  • Errore di upscaling dovuto a esaurimento della zona del cluster
  • Mancanza di supporto per le versioni obsolete delle immagini del cluster.

Vantaggi dei cluster temporanei

Per quanto riguarda l'aspetto positivo, i cluster temporanei ti consentono di:

  • Configura autorizzazioni IAM diverse per job diversi con account di servizio VM Dataproc differenti.
  • Ottimizza le configurazioni hardware e software di un cluster per ciascun job, modificando le configurazioni del cluster in base alle esigenze.
  • Esegui l'upgrade delle versioni delle immagini in nuovi cluster per ricevere le patch di sicurezza, le correzioni di bug e le ottimizzazioni più recenti.
  • Risolvi più rapidamente i problemi su un cluster isolato a un solo job.
  • Risparmia sui costi pagando solo per il tempo di esecuzione temporaneo del cluster, non per il tempo di inattività tra i job su un cluster condiviso.

Utilizza Spark SQL

L'API DataFrame Spark SQL rappresenta un'ottimizzazione significativa dell'API RDD. Se interagisci con il codice che utilizza gli RDD, potresti leggere i dati come DataFrame prima di passare un RDD nel codice. Nel codice Java o Scala, potresti utilizzare l'API per set di dati Spark SQL come soprainsieme di RDD e DataFrame.

Utilizzo di Apache Spark 3

Dataproc 2.0 installa Spark 3, che include le seguenti funzionalità e miglioramenti delle prestazioni:

  • Supporto GPU
  • Può leggere i file binari
  • Miglioramenti delle prestazioni
  • Eliminazione della partizione dinamica
  • Esecuzione adattiva delle query, che ottimizza i job Spark in tempo reale.

Utilizza l'allocazione dinamica

Apache Spark include una funzionalità di allocazione dinamica che scala il numero di esecutori Spark sui worker all'interno di un cluster. Questa funzionalità consente a un job di utilizzare l'intero cluster Dataproc anche in caso di scale up del cluster. Questa funzionalità è abilitata per impostazione predefinita su Dataproc (spark.dynamicAllocation.enabled è impostato su true). Consulta Allocazione dinamica di Spark per ulteriori informazioni.

Utilizzo della scalabilità automatica di Dataproc

La scalabilità automatica di Dataproc aggiunge e rimuove dinamicamente i worker di Dataproc da un cluster per garantire che i job di Spark abbiano le risorse necessarie per essere completate rapidamente.

Una best practice consiste nel configurare il criterio di scalabilità automatica per scalare solo i worker secondari.

Utilizzo della modalità di flessibilità avanzata di Dataproc

I cluster con VM prerilasciabili o un criterio di scalabilità automatica potrebbero ricevere eccezioni FetchFailed quando i worker vengono prerilasciati o rimossi prima che abbiano terminato l'elaborazione dei dati ai riduzioni. Questa eccezione può causare nuovi tentativi delle attività e tempi di completamento dei job più lunghi.

Suggerimento: utilizza la modalità di flessibilità avanzata di Dataproc, che non archivia i dati di shuffling intermedio sui worker secondari, in modo che i worker secondari possano essere prerilasciati o sottoposti a scale down in sicurezza.

Configura partizionamento e shuffling

Spark archivia i dati in partizioni temporanee sul cluster. Se l'applicazione raggruppa o unisce DataFrames, i dati vengono riordinati in nuove partizioni in base al raggruppamento e alla configurazione di basso livello.

Il partizionamento dei dati influisce in modo significativo sulle prestazioni dell'applicazione: un numero troppo basso di partizioni limita il parallelismo dei job e l'utilizzo delle risorse del cluster; un numero eccessivo di partizioni rallenta il job a causa dell'elaborazione e dello shuffling aggiuntivi delle partizioni.

Configurazione delle partizioni

Le seguenti proprietà regolano il numero e le dimensioni delle partizioni:

  • spark.sql.files.maxPartitionBytes: la dimensione massima delle partizioni durante la lettura dei dati da Cloud Storage. Il valore predefinito è 128 MB, sufficiente per la maggior parte delle applicazioni che elaborano meno di 100 TB.

  • spark.sql.shuffle.partitions: il numero di partizioni dopo aver eseguito lo shuffling. Il valore predefinito è 200, ed è appropriato per i cluster con meno di 100 vCPU totali. Suggerimento: impostalo su un valore pari a 3 volte il numero di vCPU nel tuo cluster.

  • spark.default.parallelism: il numero di partizioni restituite dopo l'esecuzione delle trasformazioni RDD che richiedono shuffling, come join, reduceByKey e parallelize. L'impostazione predefinita è il numero totale di vCPU nel cluster. Quando utilizzi i RDD nei job Spark, puoi impostare questo numero su 3 volte le vCPU

Limita il numero di file

Si verifica una perdita di prestazioni quando Spark legge un numero elevato di file di piccole dimensioni. Archivia i dati in file di dimensioni maggiori, ad esempio dimensioni file comprese tra 256 e 512 MB. Analogamente, limita il numero di file di output (per forzare lo shuffling, consulta Evitare shuffling superflui).

Configurare l'esecuzione delle query adattive (Spark 3)

L'esecuzione adattiva delle query (abilitata per impostazione predefinita nell'immagine Dataproc versione 2.0) offre miglioramenti delle prestazioni dei job Spark, tra cui:

Anche se le impostazioni di configurazione predefinite sono corrette per la maggior parte dei casi d'uso, l'impostazione di spark.sql.adaptive.advisoryPartitionSizeInBytes su spark.sqlfiles.maxPartitionBytes (valore predefinito: 128 MB) può essere utile.

Evita shuffle inutili

Spark consente agli utenti di attivare manualmente uno shuffling per ribilanciare i dati con la funzione repartition. Gli shuffling sono costosi, perciò occorre usare con cautela i dati di reshuffling. L'impostazione appropriata delle configurazioni delle partizioni dovrebbe essere sufficiente per consentire a Spark di partizionare automaticamente i dati.

Eccezione: quando scrivi dati partizionati per colonne in Cloud Storage, il partizionamento su una colonna specifica evita di scrivere molti file di piccole dimensioni per ottenere tempi di scrittura più rapidi.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Archivia i dati in Parquet o Avro

Per impostazione predefinita, Spark SQL legge e scrive dati nei file Parquet compressi. Parquet utilizza un efficiente formato file a colonne che consente a Spark di leggere solo i dati necessari per eseguire un'applicazione. Questo è un vantaggio importante quando si lavora con set di dati di grandi dimensioni. Anche altri formati a colonne, come Apache ORC, hanno un buon rendimento.

Per i dati non a colonne, Apache Avro fornisce un formato di file a righe binarie efficiente. Anche se generalmente è più lento rispetto a Parquet, Avro ha prestazioni migliori rispetto ai formati basati su testo,come CSV o JSON.

Ottimizza dimensioni disco

La velocità effettiva dei dischi permanenti scala con le dimensioni del disco, che possono influire sulle prestazioni dei job Spark poiché i job scrivono metadati e eseguono lo shuffling dei dati sul disco. Quando utilizzi dischi permanenti standard, la dimensione del disco deve essere di almeno 1 terabyte per worker (consulta Prestazioni per dimensione del disco permanente).

Per monitorare la velocità effettiva del disco worker nella console Google Cloud:

  1. Fai clic sul nome del cluster nella pagina Cluster.
  2. Fai clic sulla scheda ISTANZE VM.
  3. Fai clic sul nome di un lavoratore.
  4. Fai clic sulla scheda MONITORAGGIO, quindi scorri verso il basso fino a Velocità effettiva del disco per visualizzare la velocità effettiva del worker.

Considerazioni sul disco

I cluster Dataproc temporanei, che non traggono vantaggio dall'archiviazione permanente, possono utilizzare SSD locali. Le unità SSD locali sono fisicamente collegate al cluster e offrono una velocità effettiva superiore rispetto ai dischi permanenti (consulta la tabella delle prestazioni). Gli SSD locali hanno una dimensione fissa di 375 gigabyte, ma puoi aggiungerne più di uno per aumentare le prestazioni.

Gli SSD locali non mantengono i dati dopo l'arresto del cluster. Se hai bisogno di archiviazione permanente, puoi utilizzare i dischi permanenti SSD, che offrono una velocità effettiva più elevata per le loro dimensioni rispetto ai dischi permanenti standard. I dischi permanenti SSD sono una buona scelta anche se la dimensione della partizione sarà inferiore a 8 kB (tuttavia, evita piccole parità).

Collega le GPU al cluster

Spark 3 supporta le GPU. Utilizza le GPU con l'azione di inizializzazione RAPIDS per accelerare i job Spark utilizzando l'acceleratore SQL RAPIDS. L'azione di inizializzazione del driver GPU per configurare un cluster con GPU.

Correzioni e errori comuni relativi ai job

Memoria esaurita

Esempi:

  • "Esecutore smarrito"
  • "java.lang.OutOfMemoryError: superamento del limite di overhead GC"
  • "Container interrotto da YARN per aver superato i limiti di memoria"

Possibili correzioni:

Errori di recupero casuale

Esempi:

  • "FetchFailedException" (errore Spark)
  • "Impossibile connettersi a..." (Errore di Spark)
  • "Recupero non riuscito" (errore MapReduce)

Di solito il problema è causato dalla rimozione prematura di worker che hanno ancora dati elaborati (shuffle) da elaborare.

Possibili cause e correzioni:

  • Le VM worker prerilasciabili sono state recuperate o le VM worker non prerilasciabili sono state rimosse dal gestore della scalabilità automatica. Soluzione: utilizza la modalità di flessibilità avanzata per rendere i worker secondari prerilasciabili o scalabili in modo sicuro.
  • Si è verificato un arresto anomalo dell'esecutore o del mappatore a causa di un errore di memoria insufficiente. Soluzione: aumenta la memoria di esecutore o mappatore.
  • Il servizio Spark shuffle potrebbe essere sovraccarico. Soluzione: riduci il numero di partizioni dei job.

I nodi YARN non sono sani

Esempi (dai log YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Spesso è correlato a uno spazio su disco insufficiente per i dati di shuffling. Esegui la diagnostica visualizzando i file di log:

  • Apri la pagina Cluster del progetto nella console Google Cloud, quindi fai clic sul nome del cluster.
  • Fai clic su VISUALIZZA LOG.
  • Filtra i log per hadoop-yarn-nodemanager.
  • Cerca "UNHEALTHY".

Possibili correzioni:

  • La cache dell'utente è archiviata nella directory specificata dalla proprietà yarn.nodemanager.local-dirs in yarn-site.xml file. Questo file si trova nella cartella /etc/hadoop/conf/yarn-site.xml. Puoi verificare lo spazio libero nel percorso /hadoop/yarn/nm-local-dir e liberare spazio eliminando la cartella della cache dell'utente /hadoop/yarn/nm-local-dir/usercache.
  • Se il log riporta lo stato "UNHEALTHY", ricrea il cluster con uno spazio su disco più grande; in questo modo aumenti il limite di velocità effettiva.

Il job non riesce a causa di memoria del driver insufficiente

Durante l'esecuzione dei job in modalità cluster, il job non riesce se la dimensione della memoria del nodo master è notevolmente superiore a quella del nodo worker.

Esempio dai log del conducente:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Possibili correzioni:

  • Imposta spark:spark.driver.memory su un valore inferiore a yarn:yarn.scheduler.maximum-allocation-mb.
  • Utilizza lo stesso tipo di macchina per i nodi master e worker.

Per maggiori informazioni