Suggerimenti per l'ottimizzazione del job Spark

Le sezioni che seguono forniscono suggerimenti per aiutarti a ottimizzare le applicazioni Spark di Dataproc.

Utilizzare cluster temporanei

Quando utilizzi il modello di cluster "temporaneo" di Dataproc, crei un cluster dedicato per ogni job ed elimini il cluster al termine del job. Con il modello temporaneo, puoi trattare separatamente archiviazione e computing, salvare i dati di input e output del job in Cloud Storage o BigQuery; utilizzando il cluster solo per il calcolo e l'archiviazione temporanea dei dati.

Insidie dei cluster persistenti

L'utilizzo di cluster one-job temporanei evita le insidie e le potenziali Problemi associati all'uso di elementi "persistenti" condivisi e a lunga durata cluster:

  • Punti di errore singoli: uno stato di errore del cluster condiviso può causare l'errore di tutti i job, bloccando un'intera pipeline di dati. L'analisi e il recupero da un errore possono richiedere ore. Poiché i cluster temporanei mantengono solo gli stati nel cluster temporanei, quando si verifica un errore, possono essere eliminati e ricreati rapidamente.
  • Difficoltà a mantenere e migrare gli stati dei cluster in HDFS, MySQL o file system locali
  • Contese delle risorse tra i job che influiscono negativamente sugli SLO
  • Daemon di servizio non rispondenti causati da una pressione sulla memoria
  • Accumulo di log e file temporanei che possono superare la capacità del disco
  • Errore di upscaling a causa di esaurimento della zona del cluster
  • Mancanza di supporto per le versioni delle immagini cluster obsolete.

Vantaggi del cluster temporaneo

Sul lato positivo, i cluster effimeri ti consentono di:

  • Configura autorizzazioni IAM diverse per job differenti con differenti Account di servizio VM Dataproc.
  • Ottimizzare le configurazioni hardware e software di un cluster modificando le configurazioni del cluster in base alle esigenze.
  • Esegui l'upgrade delle versioni delle immagini nei nuovi cluster per ricevere le patch di sicurezza, le correzioni di bug e le ottimizzazioni più recenti.
  • Risolvi i problemi più rapidamente su un cluster isolato con un singolo job.
  • Risparmia sui costi pagando solo per il tempo di esecuzione del cluster temporaneo, non per il tempo di inattività tra i job su un cluster condiviso.

Utilizza Spark SQL

L'API DataFrame di Spark SQL è un'ottimizzazione significativa dell'API RDD. Se interagisci con codice che utilizza RDD, ti consigliamo di leggere i dati come DataFrame prima di passare un RDD nel codice. Nel codice Java o Scala, ti consigliamo di utilizzare l'API Dataset di Spark SQL come superset di RDD e DataFrame.

Utilizza Apache Spark 3

Dataproc 2.0 installa Spark 3, che include le seguenti funzionalità e prestazioni miglioramenti:

  • Supporto GPU
  • Può leggere i file binari
  • Miglioramenti delle prestazioni
  • Potatura partizione dinamica
  • Esecuzione adattiva delle query, che ottimizza i job Spark in tempo reale

Utilizzare l'allocazione dinamica

Apache Spark include una funzionalità di allocazione dinamica che scala il numero di esecutori Spark sui worker all'interno di un cluster. Questa funzionalità consente a un job di utilizzare l'intero cluster Dataproc anche quando il cluster viene eseguito in modalità di scalabilità. Questa funzionalità è abilitata per impostazione predefinita su Dataproc (spark.dynamicAllocation.enabled impostato su true). Consulta Allocazione dinamica di Spark per ulteriori informazioni.

Usa scalabilità automatica Dataproc

La scalabilità automatica di Dataproc aggiunge e rimuove dinamicamente i worker Dataproc da un cluster per garantire che i job Spark dispongano delle risorse necessarie per essere completati rapidamente.

È una best practice configurare il criterio di scalabilità automatica in modo da ridimensionare solo i worker secondari.

Utilizzare la modalità di flessibilità avanzata di Dataproc

I cluster con VM preemptibili o un criterio di scalabilità automatica potrebbero ricevere eccezioni FetchFailed quando i worker vengono prelevati o rimossi prima di terminare l'invio dei dati misti ai riduttori. Questa eccezione può causare ripetizioni delle attività e tempi di completamento dei job più lunghi.

Consiglio: utilizza la modalità di flessibilità avanzata di Dataproc, che non memorizza i dati di ordinamento intermedio sui worker secondari, in modo che questi possano essere prerilasciati o ridotti in sicurezza.

Configura il partizionamento e lo shuffling

Spark archivia i dati in partizioni temporanee in un cluster Kubernetes. Se l'applicazione raggruppa o unisce DataFrame, mescola i dati in nuove partizioni in base al raggruppamento e alla configurazione a basso livello.

Il partizionamento dei dati influisce in modo significativo sulle prestazioni dell'applicazione: troppe o troppo poche partizioni limitano il parallelismo dei job e l'utilizzo delle risorse del cluster; troppe partizioni rallentano il job a causa dell'elaborazione e dello shuffling aggiuntivi delle partizioni.

Configurazione delle partizioni

Le seguenti proprietà regolano il numero e le dimensioni delle partizioni:

  • spark.sql.files.maxPartitionBytes: la dimensione massima delle partizioni quando e leggere i dati da Cloud Storage. Il valore predefinito è 128 MB, che è sufficientemente grande per la maggior parte delle applicazioni che elaborano meno di 100 TB.

  • spark.sql.shuffle.partitions: il numero di partizioni dopo l'esecuzione di un ordinamento casuale. Il valore predefinito è 200, che è appropriato per i cluster con meno di 100 vCPU in totale. Consiglio: imposta questo valore su un importo pari al triplo del di vCPU nel tuo cluster.

  • spark.default.parallelism: il numero di partizioni restituite dopo l'esecuzione di trasformazioni RDD che richiedono rimescolamenti, come join, reduceByKey e parallelize. Il valore predefinito è il numero totale di vCPU nel cluster. Quando utilizzi gli RDD nei job Spark, puoi impostare questo numero al triplo delle vCPU

Limita il numero di file

Si verifica una perdita di prestazioni quando Spark legge un numero elevato di file di piccole dimensioni. Archivia i dati in file di dimensioni maggiori, ad esempio dimensioni dei file comprese tra 256 MB e 512 MB. Analogamente, limita il numero di file di output (per forzare una riproduzione casuale, consulta Evitare le riproduzioni casuali non necessarie).

Configura l'esecuzione adattiva delle query (Spark 3)

L'esecuzione di query adattiva (attivata per impostazione predefinita nella versione 2.0 dell'immagine Dataproc) offre miglioramenti delle prestazioni dei job Spark, tra cui:

Anche se nella maggior parte dei casi d'uso le impostazioni predefinite sono audio, impostazione di spark.sql.adaptive.advisoryPartitionSizeInBytes su spark.sqlfiles.maxPartitionBytes (128 MB predefinito) può essere utile.

Evita shuffling non necessari

Spark consente agli utenti di attivare manualmente un'operazione di mescolamento per riequilibrare i dati con la funzione repartition. Le riorganizzazioni sono costose, quindi devono essere utilizzate con cautela. L'impostazione delle configurazioni della partizione in modo appropriato dovrebbe essere sufficiente per consentire a Spark di partizionare automaticamente i dati.

Eccezione: durante la scrittura di dati partizionati in colonne Cloud Storage, il partizionamento su una colonna specifica evita scrivere molti file di piccole dimensioni per ottenere tempi di scrittura più rapidi.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Archiviare i dati in Parquet o Avro

Spark SQL legge e scrive per impostazione predefinita i dati in formato Snappy compresso Parquet. Parquet è un formato file colonnare efficiente che consente a Spark di leggere solo i dati necessari per eseguire un'applicazione. È un importante vantaggio quando si lavora con set di dati di grandi dimensioni. Altri formati a colonne, ad esempio Apache ORC, ha prestazioni soddisfacenti.

Per i dati non colonnari, Apache Avro fornisce un formato file di righe binarie efficiente. Sebbene in genere sia più lento di Parquet, le prestazioni di Avro sono migliori rispetto ai formati basati su testo, come CSV o JSON.

Ottimizza le dimensioni del disco

La velocità effettiva dei dischi permanenti viene scalata in base alla dimensione del disco, il che può influire le prestazioni del job Spark, poiché i job scrivono metadati ed eseguono lo shuffling dei dati disco. Quando utilizzi i dischi permanenti standard, le dimensioni del disco devono essere almeno di 1 terabyte per worker (consulta Prestazioni in base alle dimensioni del disco permanente).

Per monitorare la produttività del disco del worker nella console Google Cloud:

  1. Fai clic sul nome del cluster nella Cluster .
  2. Fai clic sulla scheda ISTANZE VM.
  3. Fai clic su un nome di utente.
  4. Fai clic sulla scheda MONITORING, quindi scorri verso il basso fino a Velocità effettiva del disco per visualizzare la velocità effettiva del worker.

Considerazioni sul disco

i cluster Dataproc temporanei, che non possono trarre vantaggio dall'archiviazione permanente. possono utilizzare SSD locali. Gli SSD locali sono collegati fisicamente al cluster e offrono una maggiore velocità effettiva rispetto ai dischi permanenti (consulta la tabella delle prestazioni). Gli SSD locali sono disponibili con una dimensione fissa 375 gigabyte, ma puoi aggiungere più SSD per aumentare le prestazioni.

Gli SSD locali non mantengono i dati dopo l'arresto di un cluster. Se hai bisogno di puoi usare i dischi permanenti SSD, Offrono una velocità effettiva maggiore. rispetto ai dischi permanenti standard. Anche i dischi permanenti SSD sono è una buona scelta se la dimensione della partizione è inferiore a 8 kB (tuttavia, evita piccole parità).

Collega le GPU al cluster

Spark 3 supporta le GPU. Utilizza GPU con Azione di inizializzazione RAPIDS per velocizzare i job Spark utilizzando Acceleratore SQL RAPIDS. La Azione di inizializzazione del driver GPU per configurare un cluster con GPU.

Correzioni e errori comuni dei job

Memoria esaurita

Esempi:

  • "Esecutore perduto"
  • "java.lang.OutOfMemoryError: limite di overhead GC superato"
  • "Contenitore interrotto da YARN per superamento dei limiti di memoria"

Possibili correzioni:

Shuffling degli errori di recupero

Esempi:

  • "FetchFailedEccezione" (Errore di Spark)
  • "Impossibile connettersi a..." (Errore Spark)
  • "Failed to fetch" (Errore MapReduce)

In genere è causato dalla rimozione prematura di worker che hanno ancora dati da pubblicare con l'ordinamento casuale.

Possibili cause e correzioni:

  • Le VM dei worker prerilasciabili sono state recuperate o le VM dei worker non prerilasciabili sono state rimosse dall'agente di scalabilità automatica. Soluzione: utilizza la modalità flessibilità avanzata per rendere i lavoratori secondari preemptibili o scalabili in modo sicuro.
  • Si è verificato un arresto anomalo dell'esecutore o del mappatore a causa di un errore di OutOfMemory. Soluzione: aumenta la memoria dell'executor o del mapper.
  • Il servizio di shuffle Spark potrebbe essere sovraccarico. Soluzione: per ridurre il numero di partizioni del job.

I nodi YARN non sono integri

Esempi (dai log di YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Spesso è correlato a spazio su disco insufficiente per lo shuffling dei dati. Diagnosi in base a visualizzazione dei file di log:

  • Apri la finestra di dialogo del progetto Cluster nella console Google Cloud, quindi fai clic sul nome del cluster.
  • Fai clic su VISUALIZZA LOG.
  • Filtra i log per hadoop-yarn-nodemanager.
  • Cerca "MALSANO".

Possibili correzioni:

  • La cache dell'utente viene archiviata nella directory specificata yarn.nodemanager.local-dirs proprietà nel yarn-site.xml file. Questo file si trova in /etc/hadoop/conf/yarn-site.xml. Puoi controllare lo spazio libero nel percorso /hadoop/yarn/nm-local-dir e libera spazio eliminazione della cartella /hadoop/yarn/nm-local-dir/usercache della cache dell'utente.
  • Se il log riporta lo stato "NON INTEGRO", ricrea il cluster con uno spazio su disco più grande, che aumenterà il limite di throughput.

Il job non riesce a causa di memoria del driver insufficiente

Quando esegui i job in modalità cluster, il job non va a buon fine se le dimensioni della memoria del nodo master sono notevolmente superiori a quelle del nodo worker.

Esempio dai log del driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Possibili correzioni:

  • Imposta spark:spark.driver.memory su un valore inferiore a yarn:yarn.scheduler.maximum-allocation-mb.
  • Utilizza lo stesso tipo di macchina per i nodi master e worker.

Per ulteriori informazioni