Les pipelines sont exécutés sur des clusters de machines. Ils obtiennent un débit élevé en divisant le travail à effectuer, puis en l'exécutant en parallèle sur les différents exécuteurs répartis sur le cluster. En règle générale, plus le nombre de divisions (ou partitions) est élevé, plus le pipeline peut s'exécuter rapidement. Le niveau de parallélisme de votre pipeline est déterminé par les sources et les étapes de brassage de celui-ci.
Sources
Au début de chaque exécution de pipeline, chaque source de votre pipeline calcule les données à lire et la manière dont elles peuvent être divisées en segments. Par exemple, considérons un pipeline de base qui lit à partir de Cloud Storage, effectue des transformations Wrangler, puis écrit à nouveau dans Cloud Storage.
Lorsque le pipeline démarre, la source Cloud Storage examine les fichiers d'entrée et les divise en fonction de leur taille. Par exemple, un fichier d'un gigaoctet peut être divisé en 100 splits, chacun de 10 Mo. Chaque exécuteur lit les données de cette division, exécute les transformations Wrangler, puis écrit la sortie dans un fichier part.
Si votre pipeline s'exécute lentement, l'une des premières choses à vérifier est si vos sources créent suffisamment de fractionnements pour exploiter pleinement le parallélisme. Par exemple, certains types de compression rendent les fichiers en texte brut non séparables. Si vous lisez des fichiers compressés au format gzip, vous remarquerez peut-être que votre pipeline s'exécute beaucoup plus lentement que si vous lisiez des fichiers non compressés ou compressés au format BZIP (qui est séparable). De même, si vous utilisez la source de base de données et que vous l'avez configurée pour n'utiliser qu'une seule division, elle s'exécute beaucoup plus lentement que si vous la configurez pour utiliser plusieurs divisions.
Mélanges
Certains types de plug-ins entraînent un mélange des données dans le cluster. Cela se produit lorsque les enregistrements traités par un exécuteur doivent être envoyés à un autre exécuteur pour effectuer le calcul. Les mélanges sont des opérations coûteuses, car elles impliquent de nombreuses E/S. Les plug-ins qui entraînent le mélange des données s'affichent tous dans la section Analytics de Pipeline Studio. Il s'agit de plug-ins tels que "Grouper par", "Déduplicer", "Distinct" et "Joiner". Par exemple, supposons qu'une étape Grouper par soit ajoutée au pipeline dans l'exemple précédent.
Supposons également que les données lues représentent les achats effectués dans une épicerie.
Chaque enregistrement contient un champ item
et un champ num_purchased
. À l'étape Grouper par, nous configurons le pipeline pour regrouper les enregistrements sur le champ item
et calculer la somme du champ num_purchased
.
Lorsque le pipeline s'exécute, les fichiers d'entrée sont divisés comme décrit précédemment. Ensuite, chaque enregistrement est mélangé dans le cluster de sorte que chaque enregistrement avec le même élément appartienne au même exécuteur.
Comme illustré dans l'exemple précédent, les enregistrements des achats d'Apple étaient initialement répartis entre plusieurs exécuteurs. Pour effectuer l'agrégation, tous ces enregistrements devaient être envoyés au même exécuteur via le cluster.
La plupart des plug-ins nécessitant un mélange vous permettent de spécifier le nombre de partitions à utiliser lors du mélange des données. Ce paramètre contrôle le nombre d'exécuteurs utilisés pour traiter les données mélangées.
Dans l'exemple précédent, si le nombre de partitions est défini sur 2
, chaque exécuteur calcule des agrégats pour deux éléments au lieu d'un.
Notez qu'il est possible de réduire le parallélisme de votre pipeline après cette étape. Prenons l'exemple de la vue logique du pipeline:
Si la source divise les données sur 500 partitions, mais que le regroupement les mélange à l'aide de 200 partitions, le niveau maximal de parallélisme après le regroupement passe de 500 à 200. Au lieu de 500 fichiers de parties différents écrits dans Cloud Storage, vous n'en avez que 200.
Choisir des partitions
Si le nombre de partitions est trop faible, vous n'utiliserez pas toute la capacité de votre cluster pour paralléliser autant de tâches que possible. Définir les partitions trop haut augmente les frais généraux inutiles. En règle générale, il est préférable d'utiliser trop de partitions que pas assez. Les frais supplémentaires sont à prendre en compte si l'exécution de votre pipeline prend quelques minutes et que vous essayez de gagner quelques minutes. Si l'exécution de votre pipeline prend des heures, vous n'avez généralement pas à vous soucier des frais généraux.
Un moyen utile, mais trop simpliste, de déterminer le nombre de partitions à utiliser consiste à le définir sur max(cluster CPUs, input records / 500,000)
. En d'autres termes, prenez le nombre d'enregistrements d'entrée et divisez-le par 500 000. Si ce nombre est supérieur au nombre de processeurs du cluster, utilisez-le pour le nombre de partitions.
Sinon, utilisez le nombre de processeurs du cluster. Par exemple, si votre cluster compte 100 CPU et que l'étape de mélange devrait comporter 100 millions d'enregistrements d'entrée, utilisez 200 partitions.
Une réponse plus complète est que les mélanges fonctionnent mieux lorsque les données de mélange intermédiaires de chaque partition peuvent tenir entièrement dans la mémoire d'un exécuteur, de sorte que rien ne doit être transféré sur le disque. Spark réserve un peu moins de 30% de la mémoire d'un exécuteur pour stocker les données de mélange. Le nombre exact est (mémoire totale - 300 Mo) * 30%. Si nous supposons que chaque exécuteur est configuré pour utiliser 2 Go de mémoire, cela signifie que chaque partition ne doit pas contenir plus de (2 Go - 300 Mo) * 30% = environ 500 Mo d'enregistrements. Si nous supposons que chaque enregistrement est compressé à 1 ko, cela signifie que (500 Mo / partition) / (1 ko/enregistrement) = 500 000 enregistrements par partition. Si vos exécuteurs utilisent plus de mémoire ou si vos enregistrements sont plus petits, vous pouvez ajuster ce nombre en conséquence.
Décalage de données
Notez que dans l'exemple précédent, les achats de différents articles étaient répartis de manière uniforme. Autrement dit, il y a eu trois achats chacun pour les pommes, les bananes, les carottes et les œufs. Le mélange sur une clé répartie uniformément est le type de mélange le plus performant, mais de nombreux ensembles de données ne présentent pas cette propriété. Poursuivons avec l'exemple d'achat d'épicerie de l'exemple précédent. Vous vous attendez à vendre beaucoup plus d'œufs que de cartes de mariage. Lorsque certaines clés de mélange sont beaucoup plus courantes que d'autres, vous devez faire face à des données biaisées. Les données asymétriques peuvent être beaucoup moins performantes que les données non asymétriques, car une quantité disproportionnée de travail est effectuée par un petit nombre d'exécuteurs. Un petit sous-ensemble de partitions est alors beaucoup plus volumineux que tous les autres.
Dans cet exemple, il y a cinq fois plus d'achats d'œufs que d'achats de cartes, ce qui signifie que le calcul de l'agrégation des œufs prend environ cinq fois plus de temps. Cela n'a pas beaucoup d'importance lorsque vous ne travaillez que sur 10 enregistrements au lieu de deux, mais cela fait une grande différence lorsque vous travaillez sur cinq milliards d'enregistrements au lieu d'un milliard. En cas de biais de données, le nombre de partitions utilisées dans un brassage n'a pas d'impact majeur sur les performances du pipeline.
Vous pouvez identifier un décalage des données en examinant le graphique des enregistrements de sortie au fil du temps. Si l'étape génère des enregistrements à un rythme beaucoup plus élevé au début de l'exécution du pipeline, puis ralentit soudainement, cela peut signifier que vos données sont faussées.
Vous pouvez également identifier un décalage des données en examinant l'utilisation de la mémoire du cluster au fil du temps. Si votre cluster est saturé pendant un certain temps, mais que l'utilisation de la mémoire est soudainement faible pendant une période donnée, cela indique également que vous rencontrez un décalage de données.
Les données biaisées ont le plus d'impact sur les performances lors d'une jointure. Plusieurs techniques peuvent être utilisées pour améliorer les performances des jointures biaisées. Pour en savoir plus, consultez la section Traitement parallèle pour les opérations JOIN
.
Réglage adaptatif pour l'exécution
Pour ajuster l'exécution de manière adaptative, spécifiez la plage de partitions à utiliser, et non le numéro de partition exact. Le numéro de partition exact, même s'il est défini dans la configuration du pipeline, est ignoré lorsque l'exécution adaptative est activée.
Si vous utilisez un cluster Dataproc éphémère, Cloud Data Fusion définit automatiquement la configuration appropriée. Toutefois, pour les clusters Dataproc ou Hadoop statiques, vous pouvez définir les deux paramètres de configuration suivants:
spark.default.parallelism
: définissez-le sur le nombre total de vCores disponibles dans le cluster. Cela garantit que votre cluster n'est pas sous-chargé et définit la limite inférieure du nombre de partitions.spark.sql.adaptive.coalescePartitions.initialPartitionNum
: définissez-le sur 32 fois le nombre de vCores disponibles dans le cluster. Il définit la limite supérieure du nombre de partitions.Spark.sql.adaptive.enabled
: pour activer les optimisations, définissez cette valeur surtrue
. Dataproc le définit automatiquement, mais si vous utilisez des clusters Hadoop génériques, vous devez vous assurer qu'il est activé .
Ces paramètres peuvent être définis dans la configuration du moteur d'un pipeline spécifique ou dans les propriétés du cluster d'un cluster Dataproc statique.
Étape suivante
- Découvrez le traitement en parallèle pour les opérations
JOIN
.