Cette page explique comment optimiser les performances des opérations JOIN
dans Cloud Data Fusion.
Les opérations JOIN
peuvent représenter la partie la plus coûteuse d'un pipeline. Comme tout le reste d'un pipeline, les opérations sont exécutées en parallèle. La première étape d'une JOIN
consiste à mélanger les données afin que chaque enregistrement ayant la même clé JOIN
soit envoyé au même exécuteur. Une fois toutes les données brassées, elles sont jointes et la sortie continue dans le pipeline.
Exemple de traitement parallèle dans les opérations JOIN
Par exemple, supposons que vous effectuiez une opération JOIN
sur des ensembles de données appelés Purchases
et Items
. Chaque enregistrement d'achat contient le nom et le nombre d'articles achetés. Chaque enregistrement d'article contient le nom et le prix de l'article. Une JOIN
est effectuée sur le nom de l'article pour calculer le prix total de chaque achat. Lorsque les données sont jointes, elles sont mélangées dans le cluster de sorte que les enregistrements ayant le même ID se retrouvent sur le même exécuteur.
Lorsque les clés JOIN
sont réparties de manière assez uniforme, les opérations JOIN
fonctionnent correctement, car elles peuvent être exécutées en parallèle.
Comme pour tout brassage, une asymétrie des données aura un impact négatif sur les performances. Dans l'exemple précédent, les œufs sont achetés beaucoup plus fréquemment que le poulet ou le lait, ce qui signifie que l'exécuteur qui joint les achats d'œufs effectue plus de travail que les autres exécuteurs. Si vous constatez qu'un JOIN
est biaisé, deux options s'offrent à vous pour améliorer les performances.
Diviser automatiquement les partitions déséquilibrées
Avec l'exécution adaptative des requêtes, les biais très importants sont gérés automatiquement.
Dès qu'un JOIN
produit des partitions beaucoup plus grandes que d'autres, elles sont divisées en partitions plus petites. Pour vérifier que l'exécution adaptative des requêtes est activée, consultez la section Autotuning (Autotuning).
Utiliser un JOIN
en mémoire
Une JOIN
en mémoire peut être effectuée si un côté de la JOIN
est suffisamment petit pour tenir en mémoire. Dans ce cas, le petit ensemble de données est chargé en mémoire, puis diffusé auprès de chaque exécuteur. L'ensemble de données volumineux n'est pas mélangé du tout, ce qui élimine les partitions inégales générées lors du mélange sur la clé JOIN
.
Dans l'exemple précédent, l'ensemble de données "items" est d'abord chargé dans la mémoire du pilote Spark. Il est ensuite diffusé auprès de chaque exécuteur. Les exécuteurs peuvent désormais joindre les données sans mélanger l'ensemble de données des achats.
Cette approche vous oblige à attribuer suffisamment de mémoire au pilote Spark et aux exécuteurs pour leur permettre de stocker l'ensemble de données de diffusion en mémoire. Par défaut, Spark réserve un peu moins de 30% de sa mémoire pour stocker ce type de données. Lorsque vous utilisez des JOIN
en mémoire, multipliez la taille de l'ensemble de données par quatre et définissez-la comme mémoire de l'exécuteur et du pilote. Par exemple, si l'ensemble de données des articles était de 1 Go, nous devrions définir la mémoire de l'exécuteur et du pilote à au moins 4 Go. Les ensembles de données de plus de 8 Go ne peuvent pas être chargés en mémoire.
La distribution des clés
Lorsque les deux côtés de JOIN
sont trop volumineux pour tenir en mémoire, une autre technique peut être utilisée pour diviser chaque clé JOIN
en plusieurs clés afin d'augmenter le niveau de parallélisme. Cette technique peut être appliquée aux opérations INNER JOIN
et LEFT OUTER JOIN
. Il ne peut pas être utilisé pour les opérations FULL OUTER JOIN
.
Dans cette approche, le côté biaisé est salé avec une nouvelle colonne d'entiers contenant un nombre aléatoire compris entre 1 et N. Le côté non déformé est développé, et chaque ligne existante génère N
nouvelles lignes. Une nouvelle colonne est ajoutée au côté développé, remplie de chaque chiffre compris entre 1 et N. Une JOIN
normale est ensuite effectuée, à l'exception de la nouvelle colonne ajoutée dans la clé JOIN
. De cette manière, toutes les données qui étaient auparavant envoyées vers une seule partition sont désormais réparties sur jusqu'à N
partitions différentes.
Dans l'exemple précédent, le facteur de distribution N
est défini sur 3
. Les ensembles de données d'origine sont affichés sur la gauche. Les versions salées et éclatées de l'ensemble de données sont affichées au milieu. Les données mélangées sont affichées à droite, avec trois exécuteurs différents qui rejoignent les achats d'œufs au lieu d'un.
Un parallélisme plus important est obtenu en augmentant les distributions. Cependant, cela se fait au détriment de l'explosion d'un côté de l'JOIN
, ce qui entraîne un brassage plus important des données dans le cluster. Par conséquent, l'avantage diminue à mesure que la distribution augmente. Dans la plupart des cas, définissez-le sur 20 ou moins.
Étape suivante
- En savoir plus sur le traitement en parallèle dans Cloud Data Fusion