Suggerimenti per l'ottimizzazione del job Spark

Le sezioni seguenti forniscono suggerimenti per aiutarti a ottimizzare Dataproc Applicazioni Spark.

Utilizza i cluster temporanei

Quando utilizzi il modello "temporaneo" di Dataproc un modello di cluster Kubernetes, un cluster dedicato per ogni job e, al termine del job, lo elimini. Con il modello temporaneo, puoi trattare separatamente archiviazione e computing, salvare i dati di input e output del job in Cloud Storage o BigQuery; utilizzando il cluster solo per il calcolo e l'archiviazione temporanea dei dati.

Insidie dei cluster persistenti

L'utilizzo di cluster one-job temporanei evita le insidie e le potenziali Problemi associati all'uso di elementi "persistenti" condivisi e a lunga durata cluster:

  • Single point of failure: uno stato di errore di un cluster condiviso può causare l'esito negativo di tutti i job. bloccare un'intera pipeline di dati. Analisi e ripristino in seguito a un errore possono richiedere ore. Poiché i cluster temporanei mantengono solo gli stati nel cluster temporanei, quando si verifica un errore, possono essere eliminati e ricreati rapidamente.
  • Difficoltà a mantenere e migrare gli stati dei cluster in HDFS, MySQL o file system locali
  • Contese delle risorse tra i job che influiscono negativamente sugli SLO
  • I daemon di servizio non rispondono causati dalla pressione della memoria
  • Accumulo di log e file temporanei che possono superare la capacità del disco
  • Errore di upscaling a causa di esaurimento della zona del cluster
  • Mancato supporto per le versioni delle immagini cluster obsolete.

Vantaggi del cluster temporaneo

Il lato positivo è che i cluster temporanei ti consentono di:

  • Configura autorizzazioni IAM diverse per job differenti con differenti Account di servizio VM Dataproc.
  • Ottimizzare le configurazioni hardware e software di un cluster modificando le configurazioni del cluster in base alle esigenze.
  • Esegui l'upgrade delle versioni delle immagini in nuovi cluster per ottenere la sicurezza più recente patch, correzioni di bug e ottimizzazioni.
  • Risolvi i problemi più rapidamente su un cluster isolato con un singolo job.
  • Risparmia sui costi pagando solo il tempo di esecuzione del cluster temporaneo, non per il tempo di inattività tra un job su un cluster condiviso e l'altro.

Utilizza Spark SQL

L'API SQL di Spark L'API DataFrame è un'ottimizzazione significativa dell'API RDD. Se interagisci con il codice che utilizza gli RDD, leggere i dati come DataFrame prima di passare un RDD nel codice. Nel codice Java o Scala, valuta la possibilità di utilizzare l'API del set di dati Spark SQL come soprainsieme di RDD e DataFrame.

Utilizza Apache Spark 3

Dataproc 2.0 installa Spark 3, che include le seguenti funzionalità e prestazioni miglioramenti:

  • Supporto GPU
  • Può leggere i file binari
  • Miglioramenti delle prestazioni
  • Potatura partizione dinamica
  • Esecuzione adattiva delle query, che ottimizza i job Spark in tempo reale

Utilizza l'allocazione dinamica

Apache Spark include una funzionalità di allocazione dinamica che scala il numero di esecutori Spark sui worker all'interno di un cluster. Questa funzione consente un job per utilizzare l'intero cluster Dataproc anche quando lo scale up. Questa funzionalità è abilitata per impostazione predefinita su Dataproc (spark.dynamicAllocation.enabled impostato su true). Consulta Allocazione dinamica di Spark per ulteriori informazioni.

Utilizzo della scalabilità automatica Dataproc

Dataproc Scalabilità automatica aggiunge e rimuove dinamicamente i worker Dataproc da un cluster per garantire I job Spark dispongono delle risorse necessarie per essere completati rapidamente.

È una best practice per configurare il criterio di scalabilità automatica in modo che si adatti solo ai lavoratori secondari.

Utilizzo della modalità di flessibilità avanzata di Dataproc

I cluster con VM prerilasciabili o un criterio di scalabilità automatica potrebbero ricevere FetchFailed eccezioni quando i worker vengono prerilasciati o rimossi prima del termine della pubblicazione mescola i dati ai riduttori. Questa eccezione può causare nuovi tentativi di attività e job più lunghi tempi di completamento.

Consiglio: utilizza Modalità di flessibilità avanzata di Dataproc, che non archivia dati di shuffling intermedi sui worker secondari, i worker secondari possono essere prerilasciati o ridimensionati in modo sicuro.

Configura il partizionamento e lo shuffling

Spark archivia i dati in partizioni temporanee in un cluster Kubernetes. Se la tua applicazione raggruppa o unisce DataFrames, esegue lo shuffling dei dati in nuove partizioni in base al raggruppamento e alla configurazione di basso livello.

Il partizionamento dei dati influisce notevolmente sulle prestazioni dell'applicazione: un numero troppo basso le partizioni limitano il parallelismo dei job e l'utilizzo delle risorse del cluster; un numero eccessivo di partizioni rallenta il job a causa ulteriori elaborazione delle partizioni e shuffling.

Configurazione delle partizioni

Le seguenti proprietà regolano il numero e le dimensioni delle partizioni:

  • spark.sql.files.maxPartitionBytes: la dimensione massima delle partizioni quando e leggere i dati da Cloud Storage. Il valore predefinito è 128 MB, ovvero sia sufficientemente grande per la maggior parte delle applicazioni che elaborano meno di 100 TB.

  • spark.sql.shuffle.partitions: il numero di partizioni dopo l'esecuzione di una shuffling. Il valore predefinito è 200 ed è appropriato per i cluster con un totale di meno di 100 vCPU. Consiglio: imposta questo valore su un importo pari al triplo del di vCPU nel tuo cluster.

  • spark.default.parallelism: il numero di partizioni restituite dopo di eseguire trasformazioni RDD che richiedono shuffling, ad esempio join, reduceByKey e parallelize. Il valore predefinito è il numero totale di vCPU nel tuo cluster. Quando utilizzi gli RDD nei job Spark, puoi impostare questo numero al triplo delle vCPU

Limita il numero di file

Si verifica una perdita di prestazioni quando Spark legge un numero elevato di file di piccole dimensioni. Archivia i dati in file di dimensioni maggiori, ad esempio dimensioni dei file comprese tra 256 MB e 512 MB. Analogamente, limita il numero di file di output (per forzare lo shuffling, vedi Evita di riprodurre casualmente i contenuti non necessari).

Configura l'esecuzione adattiva delle query (Spark 3)

Esecuzione adattiva delle query (abilitato per impostazione predefinita in Dataproc versione 2.0 dell'immagine) offre miglioramenti delle prestazioni dei job Spark, tra cui:

Anche se nella maggior parte dei casi d'uso le impostazioni predefinite sono audio, impostazione di spark.sql.adaptive.advisoryPartitionSizeInBytes su spark.sqlfiles.maxPartitionBytes (128 MB predefinito) può essere utile.

Evita shuffling non necessari

Spark consente agli utenti di attivare manualmente uno shuffling per ribilanciare i dati con la funzione repartition. Gli shuffling sono costosi, quindi il rimpasto dei dati dovrebbe da usare con cautela. Impostare le configurazioni della partizione in modo appropriato dovrebbe essere sufficiente per consentire a Spark di eseguire automaticamente il partizionamento i tuoi dati.

Eccezione: durante la scrittura di dati partizionati in colonne Cloud Storage, il partizionamento su una colonna specifica evita scrivere molti file di piccole dimensioni per ottenere tempi di scrittura più rapidi.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Archiviare i dati in Parquet o Avro

Spark SQL legge per impostazione predefinita i dati in lettura e scrittura in formato Snappy compresso Parquet. Parquet è in una colonna efficiente che consenta a Spark di leggere solo i dati necessari per eseguire un'applicazione. È un importante vantaggio quando si lavora con set di dati di grandi dimensioni. Altri formati a colonne, ad esempio Apache ORC, ha prestazioni soddisfacenti.

Per i dati non a colonne, Apache Avro offre una formato di file efficiente di riga binaria. Anche se in genere è più lento di Parquet, Il rendimento di Avro è migliore rispetto a formati basati su testo,come CSV o JSON.

Ottimizza dimensioni disco

La velocità effettiva dei dischi permanenti viene scalata in base alla dimensione del disco, il che può influire le prestazioni del job Spark, poiché i job scrivono metadati ed eseguono lo shuffling dei dati disco. Se utilizzi dischi permanenti standard, la dimensione del disco deve essere almeno pari a 1 terabyte per worker (vedi Prestazioni in base alle dimensioni del disco permanente).

Per monitorare la velocità effettiva del disco worker Console Google Cloud:

  1. Fai clic sul nome del cluster nella Cluster .
  2. Fai clic sulla scheda ISTANZE VM.
  3. Fai clic sul nome di un lavoratore.
  4. Fai clic sulla scheda MONITORING, quindi scorri verso il basso fino a Velocità effettiva del disco per visualizzare la velocità effettiva del worker.

Considerazioni sul disco

i cluster Dataproc temporanei, che non possono trarre vantaggio dall'archiviazione permanente. possono utilizzare SSD locali. Le unità SSD locali sono fisicamente collegate al cluster e forniscono una velocità effettiva più elevata. rispetto ai dischi permanenti (consulta la tabella delle prestazioni). Gli SSD locali sono disponibili con una dimensione fissa 375 gigabyte, ma puoi aggiungere più SSD per aumentare le prestazioni.

Gli SSD locali non mantengono i dati dopo l'arresto di un cluster. Se hai bisogno di mantenere puoi usare i dischi permanenti SSD, Offrono una velocità effettiva maggiore. rispetto ai dischi permanenti standard. Anche i dischi permanenti SSD sono è una buona scelta se la dimensione della partizione è inferiore a 8 kB (tuttavia, evita piccole parità).

Collega GPU al cluster

Spark 3 supporta le GPU. Utilizza GPU con Azione di inizializzazione RAPIDS per velocizzare i job Spark utilizzando Acceleratore SQL RAPIDS. La Azione di inizializzazione del driver GPU per configurare un cluster con GPU.

Correzioni e errori comuni dei job

Memoria esaurita

Esempi:

  • "Esecutore perduto"
  • "java.lang.OutOfMemoryError: Limite di overhead di GC superato"
  • "Container interrotto da YARN a causa del superamento dei limiti di memoria"

Possibili correzioni:

Shuffling degli errori di recupero

Esempi:

  • "FetchFailedException" (Errore di Spark)
  • "Impossibile connettersi a..." (Errore di Spark)
  • "Impossibile recuperare" (errore MapReduce)

In genere è causato da una rimozione prematura di lavoratori ancora in attività lo shuffling i dati da distribuire.

Possibili cause e soluzioni:

  • Le VM worker prerilasciabili sono state recuperate oppure le VM worker non prerilasciabili sono state rimosso dal gestore della scalabilità automatica. Soluzione: utilizza la modalità di flessibilità avanzata per rendere i worker secondari prerilasciabili o scalabili in modo sicuro.
  • Si è verificato un arresto anomalo dell'esecutore o del mappatore a causa di un errore di OutOfMemory. Soluzione: aumenta la memoria dell'esecutore o del mappatore.
  • Il servizio di shuffle Spark potrebbe essere sovraccarico. Soluzione: per ridurre il numero di partizioni del job.

I nodi YARN sono NON HEALTHY

Esempi (dai log YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Spesso è correlato a spazio su disco insufficiente per lo shuffling dei dati. Diagnosi in base a visualizzazione dei file di log:

  • Apri la finestra di dialogo del progetto Cluster nella console Google Cloud, quindi fai clic sul nome del cluster.
  • Fai clic su VISUALIZZA LOG.
  • Filtra i log per hadoop-yarn-nodemanager.
  • Cerca "UNHEALTHY".

Possibili correzioni:

  • La cache dell'utente viene archiviata nella directory specificata yarn.nodemanager.local-dirs proprietà nel yarn-site.xml file. Questo file si trova in /etc/hadoop/conf/yarn-site.xml. Puoi controllare lo spazio libero nel percorso /hadoop/yarn/nm-local-dir e libera spazio eliminazione della cartella /hadoop/yarn/nm-local-dir/usercache della cache dell'utente.
  • Se il log indica "UNHEALTHY" esistente, ricrea il cluster con maggiore spazio su disco, Aumenta il limite di velocità effettiva.

Il job non riesce a causa di memoria del driver insufficiente

Durante l'esecuzione di job in modalità cluster, il job non riesce se la dimensione della memoria del master rispetto alle dimensioni della memoria del nodo worker.

Esempio dai log del driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Possibili correzioni:

  • Imposta spark:spark.driver.memory su un valore inferiore a yarn:yarn.scheduler.maximum-allocation-mb.
  • Utilizza lo stesso tipo di macchina per i nodi master e worker.

Per ulteriori informazioni