Conseils pour régler des tâches Spark

Les sections suivantes fournissent des conseils pour vous aider à affiner vos applications Dataproc Spark.

Utiliser des clusters éphémères

Lorsque vous utilisez le modèle de cluster "éphémère" Dataproc, vous créez un cluster dédié à chaque tâche, puis vous supprimez le cluster une fois la tâche terminée. Le modèle éphémère vous permet de traiter le stockage et le calcul séparément, en enregistrant les données d'entrée et de sortie des tâches dans Cloud Storage ou BigQuery, en utilisant le cluster pour le calcul et le stockage temporaire des données uniquement.

Les pièges liés aux clusters persistants

L'utilisation de clusters éphémères à tâche unique évite les pièges et problèmes potentiels suivants associés à l'utilisation de clusters "persistants" partagés et de longue durée:

  • Points de défaillance uniques: l'état d'erreur d'un cluster partagé peut entraîner l'échec de toutes les tâches, bloquant ainsi l'intégralité d'un pipeline de données. L'investigation et la récupération d'une erreur peuvent prendre des heures. Étant donné que les clusters éphémères ne conservent que les états temporaires au sein du cluster, lorsqu'une erreur se produit, ils peuvent être rapidement supprimés et recréés.
  • Difficulté à gérer et migrer les états des clusters dans les systèmes de fichiers HDFS, MySQL ou locaux
  • Conflits de ressources entre les tâches qui ont un impact négatif sur les SLO
  • Daemons de service qui ne répondent pas en raison de la pression de la mémoire
  • Accumulation de journaux et de fichiers temporaires susceptibles de dépasser la capacité du disque
  • Échec de l'augmentation de la capacité en raison d'une rupture de stock dans la zone du cluster
  • Incompatibilité avec les versions d'image de cluster obsolètes.

Avantages des clusters éphémères

Côté positif, les clusters éphémères vous permettent d'effectuer les opérations suivantes:

  • Configurez différentes autorisations IAM pour différents jobs avec différents comptes de service de VM Dataproc.
  • Optimiser les configurations matérielles et logicielles d'un cluster pour chaque tâche, en modifiant les configurations si nécessaire.
  • Mettez à niveau les versions d'image dans les nouveaux clusters pour obtenir les derniers correctifs de sécurité, corrections de bugs et optimisations.
  • Résolvez les problèmes plus rapidement sur un cluster isolé à tâche unique.
  • Réduisez les coûts en ne payant que la durée d'exécution du cluster éphémère, et non le temps d'inactivité entre les tâches sur un cluster partagé.

Utiliser Spark SQL

L'API DataFrame Spark SQL est une optimisation importante de l'API RDD. Si vous interagissez avec du code qui utilise des RDD, envisagez de lire les données sous forme de DataFrame avant de transmettre un RDD dans le code. En code Java ou Scala, envisagez d'utiliser l'API Dataset Spark SQL en tant que sur-ensemble de RDD et de DataFrames.

Utiliser Apache Spark 3

Dataproc 2.0 installe Spark 3, qui inclut les fonctionnalités et les améliorations de performances suivantes:

  • Compatibilité avec les GPU
  • Lire des fichiers binaires
  • Amélioration des performances
  • Éliminer des partitions dynamiques
  • L'exécution des requêtes adaptable, qui optimise les tâches Spark en temps réel.

Utiliser l'allocation dynamique

Apache Spark inclut une fonctionnalité d'allocation dynamique qui ajuste le nombre d'exécuteurs Spark sur les nœuds de calcul d'un cluster. Cette fonctionnalité permet à une tâche d'utiliser le cluster Dataproc complet, même lorsqu'il évolue à la hausse. Cette fonctionnalité est activée par défaut sur Dataproc (spark.dynamicAllocation.enabled est défini sur true). Pour plus d'informations, consultez la section Allocation dynamique Spark.

Utilisez l'autoscaling Dataproc.

L'autoscaling Dataproc ajoute et supprime dynamiquement des nœuds de calcul Dataproc dans un cluster pour vous assurer que les tâches Spark disposent des ressources nécessaires pour se terminer rapidement.

Il est recommandé de configurer la règle d'autoscaling pour ne faire évoluer que les nœuds de calcul secondaires.

Utiliser le mode de flexibilité améliorée de Dataproc

Les clusters avec des VM préemptives ou une règle d'autoscaling peuvent recevoir des exceptions FetchFailed lorsque les nœuds de calcul sont préemptés ou supprimés avant de terminer la diffusion des données de brassage auprès des réducteurs. Cette exception peut entraîner des tentatives d'exécution de tâches et des délais d'exécution plus longs.

Recommandation: Utilisez le mode de flexibilité améliorée de Dataproc, qui ne stocke pas les données de brassage intermédiaires sur les nœuds de calcul secondaires, afin que ces derniers puissent être préemptés ou réduits en toute sécurité.

Configurer le partitionnement et le brassage

Spark stocke les données dans des partitions temporaires sur le cluster. Si votre application regroupe ou joint des DataFrames, elle brasse les données dans de nouvelles partitions en fonction du regroupement et de la configuration de bas niveau.

Le partitionnement des données a un impact important sur les performances de l'application: trop peu de partitions limitent le parallélisme des tâches et l'utilisation des ressources du cluster. Un trop grand nombre de partitions ralentit la tâche en raison d'un traitement et d'un brassage supplémentaires des partitions.

Configurer des partitions

Les propriétés suivantes régissent le nombre et la taille de vos partitions:

  • spark.sql.files.maxPartitionBytes: taille maximale des partitions lorsque vous lisez des données dans Cloud Storage. La valeur par défaut est de 128 Mo, ce qui est suffisamment volumineux pour la plupart des applications qui traitent moins de 100 To.

  • spark.sql.shuffle.partitions: nombre de partitions après avoir effectué un brassage. La valeur par défaut est de 200, ce qui convient pour les clusters comportant moins de 100 processeurs virtuels au total. Recommandation: Définissez ce paramètre sur trois fois le nombre de processeurs virtuels de votre cluster.

  • spark.default.parallelism: nombre de partitions renvoyées après avoir effectué des transformations RDD nécessitant des brassages, tels que join, reduceByKey et parallelize. La valeur par défaut correspond au nombre total de processeurs virtuels dans votre cluster. Lorsque vous utilisez des RDD dans des tâches Spark, vous pouvez définir ce nombre sur trois fois vos processeurs virtuels.

Limiter le nombre de fichiers

Une perte de performances survient lorsque Spark lit un grand nombre de petits fichiers. Stockez les données dans des fichiers de plus grande taille, par exemple compris entre 256 Mo et 512 Mo. De même, limitez le nombre de fichiers de sortie (pour forcer un brassage, consultez la section Éviter les brassages inutiles).

Configurer l'exécution de requêtes adaptable (Spark 3)

L'exécution de requêtes adaptable (activée par défaut dans la version 2.0 de l'image Dataproc) offre des améliorations de performances des tâches Spark, y compris:

Bien que les paramètres de configuration par défaut soient acceptables dans la plupart des cas, il peut être utile de définir spark.sql.adaptive.advisoryPartitionSizeInBytes sur spark.sqlfiles.maxPartitionBytes (128 Mo par défaut).

Éviter les brassages inutiles

Spark permet aux utilisateurs de déclencher manuellement un brassage afin de rééquilibrer leurs données avec la fonction repartition. Les brassages étant coûteux, les brassages de données doivent être utilisés avec précaution. La définition des configurations de partition devrait être suffisante pour permettre à Spark de partitionner automatiquement vos données.

Exception:Lors de l'écriture de données partitionnées par colonne dans Cloud Storage, le repartitionnement sur une colonne spécifique évite d'écrire de nombreux petits fichiers pour obtenir des temps d'écriture plus rapides.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Stocker des données dans Parquet ou Avro

Spark SQL utilise par défaut la lecture et l'écriture de données dans des fichiers Parquet compressés avec Snappy. Parquet est un format de fichiers en colonnes efficace qui permet à Spark de ne lire que les données nécessaires à l'exécution d'une application. C'est un avantage important lorsque vous travaillez avec de grands ensembles de données. D'autres formats en colonnes, tels qu'Apache ORC, fonctionnent également bien.

Apache Avro fournit un format de fichier de ligne binaire efficace pour les données non en colonnes. Bien que généralement plus lent que Parquet, les performances d'Avro sont meilleures que celles de formats texte, tels que CSV ou JSON.

Optimiser la taille du disque

Le débit des disques persistants évolue avec la taille du disque, ce qui peut affecter les performances des tâches Spark, car elles écrivent des métadonnées et brassent les données sur le disque. Lorsque vous utilisez des disques persistants standards, la taille du disque doit être d'au moins 1 téraoctet par nœud de calcul (consultez la section Performances par taille de disque persistant).

Pour surveiller le débit du disque des nœuds de calcul dans la console Google Cloud:

  1. Cliquez sur le nom du cluster sur la page Clusters.
  2. Cliquez sur l'onglet Instances de VM.
  3. Cliquez sur le nom d'un nœud de calcul.
  4. Cliquez sur l'onglet "MONITORING", puis faites défiler la page jusqu'à la section "Débit du disque" pour afficher le débit des nœuds de calcul.

Considérations relatives aux disques

Les clusters Dataproc éphémères, qui ne bénéficient pas du stockage persistant, peuvent utiliser des disques SSD locaux. Les disques SSD locaux sont rattachés physiquement au cluster et offrent un débit plus élevé que les disques persistants (voir le tableau des performances). Les disques SSD locaux sont disponibles dans une taille fixe de 375 gigaoctets, mais vous pouvez en ajouter plusieurs pour augmenter les performances.

Les disques SSD locaux ne conservent pas les données après l'arrêt d'un cluster. Si vous avez besoin d'un espace de stockage persistant, vous pouvez utiliser des disques persistants SSD, qui offrent un débit plus élevé pour leur taille que les disques persistants standards. Les disques persistants SSD constituent également un bon choix si la taille de la partition est inférieure à 8 Ko. Toutefois, évitez les petites partitions.

Associer des GPU à votre cluster

Spark 3 est compatible avec les GPU. Utilisez des GPU avec l'action d'initialisation RAPIDS pour accélérer les tâches Spark à l'aide de l'accélérateur SQL RAPIDS. L'action d'initialisation du pilote de GPU pour configurer un cluster avec des GPU.

Échecs et correctifs courants des tâches

Mémoire saturée

Exemples :

  • "Exécuteur perdu"
  • "java.lang.OutOfMemoryError: limite de surcharge GC dépassée"
  • "Conteneur supprimé par YARN pour avoir dépassé les limites de mémoire"

Corrections possibles :

Échecs de la récupération de fichiers brassés

Exemples :

  • "FetchFailedException" (erreur Spark)
  • "Échec de la connexion à..." (Erreur Spark)
  • "Échec de la récupération" (erreur MapReduce)

généralement causée par la suppression prématurée des nœuds de calcul contenant toujours des données de brassage à diffuser.

Causes et correctifs possibles:

  • Les VM de nœuds de calcul préemptifs ont été récupérées ou les VM de nœuds de calcul non préemptifs ont été supprimés par l'autoscaler. Solution: utilisez le mode de flexibilité améliorée pour rendre les nœuds de calcul secondaires préemptifs ou évolutifs en toute sécurité.
  • L'exécuteur ou le mappeur a planté en raison d'une erreur "OutOfMemory". Solution : augmentez la mémoire de l'exécuteur ou du mappeur.
  • Le service de brassage Spark peut être surchargé. Solution : réduisez le nombre de partitions de tâches.

Les nœuds YARN ne sont PAS OPÉRATIONNELS

Exemples (à partir des journaux YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Souvent associée à un espace disque insuffisant pour les données de brassage. Diagnostiquez le problème en affichant les fichiers journaux:

  • Ouvrez la page Clusters de votre projet dans la console Google Cloud, puis cliquez sur le nom du cluster.
  • Cliquez sur View logs (Afficher les journaux).
  • Filtrez les journaux par hadoop-yarn-nodemanager.
  • Recherchez "UNHEALTHY".

Solutions possibles:

  • Le cache utilisateur est stocké dans le répertoire spécifié par la propriété yarn.nodemanager.local-dirs du fichier yarn-site.xml file. Ce fichier se trouve à l'emplacement /etc/hadoop/conf/yarn-site.xml. Vous pouvez vérifier l'espace disponible dans le chemin d'accès /hadoop/yarn/nm-local-dir et libérer de l'espace en supprimant le dossier de cache utilisateur /hadoop/yarn/nm-local-dir/usercache.
  • Si le journal indique l'état "UNHEALTHY", recréez votre cluster avec un espace disque plus important, ce qui aura pour effet d'augmenter le plafond de débit.

Échec du job en raison d'une mémoire de pilote insuffisante

Lors de l'exécution de tâches en mode cluster, celles-ci échouent si la taille de la mémoire du nœud maître est nettement supérieure à celle de la mémoire du nœud de calcul.

Exemple à partir des journaux de pilotes:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Solutions possibles:

  • Définissez une valeur pour spark:spark.driver.memory inférieure à yarn:yarn.scheduler.maximum-allocation-mb.
  • Utilisez le même type de machine pour les nœuds maîtres et de calcul.

Pour en savoir plus