Traitement parallèle pour les opérations JOIN

Cette page explique le réglage des performances pour les opérations JOIN dans Cloud Data Fusion.

Les opérations JOIN peuvent être la partie la plus coûteuse d'un pipeline. Comme toutes les autres opérations d'un pipeline, les opérations sont exécutées en parallèle. La première étape d'une opération JOIN consiste à brasser les données afin que chaque enregistrement ayant la même clé JOIN soit envoyé au même exécuteur. Une fois toutes les données brassées, elles sont jointes et la sortie continue dans le pipeline.

Exemple de traitement parallèle dans les opérations JOIN

Par exemple, supposons que vous effectuiez une opération JOIN sur des ensembles de données appelés Purchases et Items. Chaque enregistrement d'achat contient un nom d'article et un numéro acheté. Chaque enregistrement d'article contient le nom et le prix de cet article. Une JOIN est effectuée sur le nom de l'article pour calculer le prix total de chaque achat. Lorsque les données sont jointes, elles sont brassées dans le cluster de sorte que les enregistrements ayant le même ID se retrouvent sur le même exécuteur.

Lorsque les clés JOIN sont réparties de manière assez équitable, les opérations JOIN sont efficaces, car elles peuvent être exécutées en parallèle.

Comme tout brassage, le décalage des données a un impact négatif sur les performances. Dans l'exemple précédent, les œufs sont achetés beaucoup plus fréquemment que le poulet ou le lait, ce qui signifie que l'exécuteur qui rejoigne les achats d'œufs effectue plus de travail que les autres exécuteurs. Si vous remarquez qu'une JOIN est asymétrique, deux options s'offrent à vous pour améliorer les performances.

Diviser automatiquement les partitions asymétriques

L'exécution adaptative des requêtes permet de gérer automatiquement les décalages très importants. Dès qu'un JOIN produit certaines partitions beaucoup plus grandes que d'autres, elles sont divisées en partitions plus petites. Pour vérifier que l'exécution des requêtes adaptative est activée, consultez la section Réglage automatique.

Utiliser un JOIN en mémoire

Une JOIN en mémoire peut être effectuée si un côté de l'JOIN est suffisamment petit pour tenir en mémoire. Dans ce cas, le petit ensemble de données est chargé en mémoire, puis diffusé à chaque exécuteur. L'ensemble de données volumineux n'est pas du tout mélangé, ce qui supprime les partitions irrégulières générées lors du brassage sur la clé JOIN.

Dans l'exemple précédent, l'ensemble de données "items" est d'abord chargé dans la mémoire du pilote Spark. Elle est ensuite diffusée à chaque exécuteur. Les exécuteurs peuvent désormais joindre les données sans brassage de l'ensemble de données acheté.

Cette approche nécessite de fournir suffisamment de mémoire au pilote Spark et aux exécuteurs pour leur permettre de stocker l'ensemble de données de diffusion en mémoire. Par défaut, Spark réserve un peu moins de 30% de sa mémoire pour le stockage de ce type de données. Lorsque vous utilisez des JOIN en mémoire, multipliez la taille de l'ensemble de données par quatre, puis définissez cette valeur en tant que mémoire d'exécuteur et de pilote. Par exemple, si la taille de l'ensemble de données "items" est de 1 Go, nous devons définir la mémoire de l'exécuteur et du pilote sur au moins 4 Go. Les ensembles de données de plus de 8 Go ne peuvent pas être chargés en mémoire.

La distribution des clés

Lorsque les deux côtés de la JOIN sont trop volumineux pour tenir en mémoire, une technique différente peut être utilisée pour décomposer chaque clé JOIN en plusieurs clés afin d'augmenter le niveau de parallélisme. Cette technique peut être appliquée aux opérations INNER JOIN et LEFT OUTER JOIN. Il ne peut pas être utilisé pour les opérations FULL OUTER JOIN.

Dans cette approche, le côté asymétrique est salé avec une nouvelle colonne d'entiers avec un nombre aléatoire compris entre 1 et N. Le côté non asymétrique est éclaté, chaque ligne existante générant N nouvelles lignes. Une nouvelle colonne est ajoutée à la partie éclatée, contenant chaque chiffre compris entre 1 et N. Une opération JOIN normale est ensuite effectuée, sauf que la nouvelle colonne est ajoutée à la clé JOIN. De cette manière, toutes les données qui étaient associées à une même partition sont désormais réparties dans un maximum de N partitions différentes.

Dans l'exemple précédent, le facteur de distribution N est défini sur 3. Les ensembles de données d'origine sont affichés à gauche. Les versions salés et éclatées du jeu de données sont affichées au milieu. Les données brassées sont affichées à droite, avec trois exécuteurs différents joignant des achats d'œufs, au lieu d'un.

Pour améliorer ce parallélisme, il faut augmenter les distributions. Cependant, cela se fait au prix d'une explosion d'un côté de JOIN, ce qui se traduit par une quantité plus importante de données brassées dans le cluster. Pour cette raison, les avantages diminuent à mesure que la distribution augmente. Dans la plupart des cas, définissez-la sur 20 ou moins.

Étapes suivantes