Suggerimenti per l'ottimizzazione del job Spark

Le sezioni che seguono forniscono suggerimenti per aiutarti a ottimizzare le applicazioni Spark di Dataproc.

Utilizzare cluster temporanei

Quando utilizzi il modello di cluster "temporaneo" di Dataproc, crei un cluster dedicato per ogni job ed elimini il cluster al termine del job. Con il modello temporaneo, puoi trattare l'archiviazione e l'elaborazione separatamente, salvare i dati di input e output dei job in Cloud Storage o BigQuery, utilizzando il cluster solo per l'elaborazione e l'archiviazione temporanea dei dati.

Svantaggi dei cluster persistenti

L'utilizzo di cluster temporanei con un solo job evita i seguenti problemi e potenziali problemi associati all'utilizzo di cluster "permanenti" condivisi e di lunga durata:

  • Punti di errore singoli: uno stato di errore del cluster condiviso può causare l'errore di tutti i job, bloccando un'intera pipeline di dati. L'analisi e il recupero da un errore possono richiedere ore. Poiché i cluster effimeri mantengono solo stati all'interno del cluster temporanei, quando si verifica un errore, possono essere eliminati e ricreati rapidamente.
  • Difficoltà di gestione e migrazione degli stati del cluster in HDFS, MySQL o file system locali
  • Controversie sulle risorse tra job che influiscono negativamente sugli SLO
  • Daemon di servizio non rispondenti causati da una pressione sulla memoria
  • Accumulo di log e file temporanei che può superare la capacità del disco
  • Errore di scale up a causa di problemi di disponibilità nella zona del cluster
  • Mancanza di supporto per le versioni obsolete delle immagini del cluster.

Vantaggi dei cluster temporanei

Sul lato positivo, i cluster effimeri ti consentono di:

  • Configura autorizzazioni IAM diverse per job diversi con diversi account di servizio VM Dataproc.
  • Ottimizza le configurazioni hardware e software di un cluster per ogni job, modificando le configurazioni del cluster in base alle esigenze.
  • Esegui l'upgrade delle versioni delle immagini nei nuovi cluster per ricevere le patch di sicurezza, le correzioni di bug e le ottimizzazioni più recenti.
  • Risolvi i problemi più rapidamente in un cluster isolato con un solo job.
  • Risparmia sui costi pagando solo per il tempo di esecuzione del cluster temporaneo, non per il tempo di inattività tra i job su un cluster condiviso.

Utilizza Spark SQL

L'API DataFrame di Spark SQL è un'ottimizzazione significativa dell'API RDD. Se interagisci con codice che utilizza RDD, ti consigliamo di leggere i dati come DataFrame prima di passare un RDD nel codice. Nel codice Java o Scala, ti consigliamo di utilizzare l'API Dataset di Spark SQL come superset di RDD e DataFrame.

Utilizzare Apache Spark 3

Dataproc 2.0 installa Spark 3, che include le seguenti funzionalità e miglioramenti delle prestazioni:

  • Supporto GPU
  • Possibilità di leggere file binari
  • Miglioramenti delle prestazioni
  • Potatura delle partizioni dinamiche
  • Esecuzione di query adattiva, che ottimizza i job Spark in tempo reale

Utilizzare l'allocazione dinamica

Apache Spark include una funzionalità di allocazione dinamica che scala il numero di esecutori Spark sui worker all'interno di un cluster. Questa funzionalità consente a un job di utilizzare l'intero cluster Dataproc anche quando il cluster viene eseguito in modalità di scalabilità. Questa funzionalità è attivata per impostazione predefinita su Dataproc (spark.dynamicAllocation.enabled è impostato su true). Per ulteriori informazioni, consulta Allocazione dinamica di Spark.

Utilizzare la scalabilità automatica Dataproc

La scalabilità automatica di Dataproc aggiunge e rimuove dinamicamente i worker Dataproc da un cluster per garantire che i job Spark dispongano delle risorse necessarie per essere completati rapidamente.

È una best practice configurare il criterio di scalabilità automatica in modo da ridimensionare solo i worker secondari.

Utilizzare la modalità di flessibilità avanzata di Dataproc

I cluster con VM preemptibili o un criterio di scalabilità automatica potrebbero ricevere eccezioni FetchFailed quando i worker vengono prelevati o rimossi prima di terminare l'invio dei dati misti ai riduttori. Questa eccezione può causare ripetizioni delle attività e tempi di completamento dei job più lunghi.

Consiglio: utilizza la modalità di flessibilità avanzata di Dataproc, che non memorizza i dati di ordinamento intermedio sui worker secondari, in modo che questi possano essere prerilasciati o ridotti in sicurezza.

Configurare il partizionamento e la permutazione

Spark memorizza i dati in partizioni temporanee sul cluster. Se l'applicazione raggruppa o unisce DataFrame, mescola i dati in nuove partizioni in base al raggruppamento e alla configurazione a basso livello.

Il partizionamento dei dati influisce in modo significativo sulle prestazioni dell'applicazione: troppe o troppo poche partizioni limitano il parallelismo dei job e l'utilizzo delle risorse del cluster; troppe partizioni rallentano il job a causa dell'elaborazione e dello shuffling aggiuntivi delle partizioni.

Configurazione delle partizioni

Le seguenti proprietà regolano il numero e le dimensioni delle partizioni:

  • spark.sql.files.maxPartitionBytes: la dimensione massima delle partizioni quando leggi i dati da Cloud Storage. Il valore predefinito è 128 MB, che è sufficientemente grande per la maggior parte delle applicazioni che elaborano meno di 100 TB.

  • spark.sql.shuffle.partitions: il numero di partizioni dopo l'esecuzione di un ordinamento casuale. Il valore predefinito è 200, che è appropriato per i cluster con meno di 100 vCPU in totale. Consiglio: imposta questo valore su 3 volte il numero di vCPU nel cluster.

  • spark.default.parallelism: il numero di partizioni restituite dopo l'esecuzione di trasformazioni RDD che richiedono rimescolamenti, come join, reduceByKey e parallelize. Il valore predefinito è il numero totale di vCPU nel cluster. Quando utilizzi gli RDD nei job Spark, puoi impostare questo numero su 3 volte le vCPU

Limita il numero di file

Si verifica una perdita di prestazioni quando Spark legge un numero elevato di file di piccole dimensioni. Memorizza i dati in file di dimensioni maggiori, ad esempio con dimensioni comprese tra 256 MB e 512 MB. Analogamente, limita il numero di file di output (per forzare una riproduzione casuale, consulta Evitare le riproduzioni casuali non necessarie).

Configurare l'esecuzione di query adattiva (Spark 3)

L'esecuzione di query adattiva (attivata per impostazione predefinita nella versione 2.0 dell'immagine Dataproc) offre miglioramenti delle prestazioni dei job Spark, tra cui:

Sebbene le impostazioni di configurazione predefinite siano valide per la maggior parte dei casi d'uso, può essere utile impostare spark.sql.adaptive.advisoryPartitionSizeInBytes su spark.sqlfiles.maxPartitionBytes (128 MB predefiniti).

Evitare rimescolamenti non necessari

Spark consente agli utenti di attivare manualmente un'operazione di mescolamento per riequilibrare i dati con la funzione repartition. Le riorganizzazioni sono costose, quindi devono essere utilizzate con cautela. L'impostazione delle configurazioni della partizione in modo appropriato dovrebbe essere sufficiente per consentire a Spark di partizionare automaticamente i dati.

Eccezione:quando scrivi dati con partizione delle colonne in Cloud Storage, la ripartizione in base a una colonna specifica evita di scrivere molti file di piccole dimensioni per ottenere tempi di scrittura più rapidi.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Memorizza i dati in Parquet o Avro

Per impostazione predefinita, Spark SQL legge e scrive i dati in file Parquet compressi con Snappy. Parquet è un formato file colonnare efficiente che consente a Spark di leggere solo i dati necessari per eseguire un'applicazione. Questo è un vantaggio importante quando si lavora con set di dati di grandi dimensioni. Anche altri formati colonnari, come Apache ORC, hanno un buon rendimento.

Per i dati non colonnari, Apache Avro fornisce un formato file di righe binarie efficiente. Sebbene in genere sia più lento di Parquet, le prestazioni di Avro sono migliori rispetto ai formati basati su testo,come CSV o JSON.

Ottimizza le dimensioni del disco

La velocità effettiva dei dischi permanenti aumenta in base alle dimensioni del disco, il che può influire sulle prestazioni dei job Spark poiché i job scrivono metadati e rimescolano i dati sul disco. Quando utilizzi i dischi permanenti standard, le dimensioni del disco devono essere almeno di 1 terabyte per worker (consulta Prestazioni in base alle dimensioni del disco permanente).

Per monitorare la produttività del disco del worker nella console Google Cloud:

  1. Fai clic sul nome del cluster nella pagina Cluster.
  2. Fai clic sulla scheda ISTANZE VM.
  3. Fai clic su un nome di utente.
  4. Fai clic sulla scheda MONITORING, quindi scorri verso il basso fino a Throughput del disco per visualizzare il throughput del worker.

Considerazioni sul disco

I cluster Dataproc temporanei, che non beneficiano dello spazio di archiviazione permanente, possono utilizzare unità SSD locali. Gli SSD locali sono collegati fisicamente al cluster e offrono una maggiore velocità effettiva rispetto ai dischi permanenti (consulta la tabella delle prestazioni). Le unità SSD locali sono disponibili con una dimensione fissa di 375 gigabyte, ma puoi aggiungere più unità SSD per aumentare le prestazioni.

Le unità SSD locali non mantengono i dati dopo l'arresto di un cluster. Se hai bisogno di archiviazione permanente, puoi utilizzare i dischi permanenti SSD, che offrono una velocità effettiva superiore per le loro dimensioni rispetto ai dischi permanenti standard. I dischi permanenti SSD sono anche una buona scelta se le dimensioni della partizione saranno inferiori a 8 KB (tuttavia, evitare partizioni piccole).

Collega le GPU al cluster

Spark 3 supporta le GPU. Utilizza le GPU con l'azione di inizializzazione RAPIDS per velocizzare i job Spark utilizzando l'acceleratore SQL RAPIDS. L'azione di inizializzazione del driver GPU per configurare un cluster con GPU.

Errori e correzioni comuni dei job

Memoria insufficiente

Esempi:

  • "Esecutore perso"
  • "java.lang.OutOfMemoryError: GC overhead limit exceeded"
  • "Contenitore interrotto da YARN per superamento dei limiti di memoria"

Possibili correzioni:

Errori di recupero della riproduzione casuale

Esempi:

  • "FetchFailedException" (errore Spark)
  • "Impossibile connettersi a…" (Errore Spark)
  • "Failed to fetch" (Errore MapReduce)

In genere è causato dalla rimozione prematura di worker che hanno ancora dati da pubblicare con l'ordinamento casuale.

Possibili cause e correzioni:

  • Le VM dei worker preemptible sono state recuperate o le VM dei worker non preemptible sono state rimosse dall'agente di scalabilità automatica. Soluzione: utilizza la modalità flessibilità avanzata per rendere i lavoratori secondari preemibili o scalabili in modo sicuro.
  • L'executor o il mapper ha avuto un arresto anomalo a causa di un errore OutOfMemory. Soluzione: aumenta la memoria dell'executor o del mapper.
  • Il servizio di riproduzione casuale di Spark potrebbe essere sovraccaricato. Soluzione: riduci il numero di partizioni di job.

I nodi YARN non sono integri

Esempi (dai log YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Spesso è correlato a uno spazio su disco insufficiente per i dati di riproduzione casuale. Esegui la diagnosi visualizzando i file di log:

  • Apri la pagina Cluster del progetto nella console Google Cloud, quindi fai clic sul nome del cluster.
  • Fai clic su VISUALIZZA LOG.
  • Filtra i log per hadoop-yarn-nodemanager.
  • Cerca "MALSANO".

Possibili correzioni:

  • La cache utente viene archiviata nella directory specificata dalla proprietà yarn.nodemanager.local-dirs in yarn-site.xml file. Questo file si trova in /etc/hadoop/conf/yarn-site.xml. Puoi controllare lo spazio libero nel percorso /hadoop/yarn/nm-local-dir e liberare spazio eliminando la cartella della cache utente /hadoop/yarn/nm-local-dir/usercache.
  • Se il log riporta lo stato "NON INTEGRO", ricrea il cluster con uno spazio su disco maggiore, che aumenterà il limite di throughput.

Il job non riesce a causa di una memoria del driver insufficiente

Quando esegui i job in modalità cluster, il job non va a buon fine se le dimensioni della memoria del nodo master sono notevolmente superiori a quelle del nodo worker.

Esempio dai log del driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Possibili correzioni:

  • Imposta spark:spark.driver.memory su un valore inferiore a yarn:yarn.scheduler.maximum-allocation-mb.
  • Utilizza lo stesso tipo di macchina per i nodi master e worker.

Per ulteriori informazioni