Traitement parallèle pour les opérations JOIN

Cette page explique comment optimiser les performances des opérations JOIN dans Cloud Data Fusion.

Les opérations JOIN peuvent représenter la partie la plus coûteuse d'un pipeline. Comme tout le reste d'un pipeline, les opérations sont exécutées en parallèle. La première étape d'une JOIN consiste à brasser les données pour que chaque enregistrement possédant la même clé JOIN soit envoyé au même exécuteur. Une fois toutes les données brassées, elles sont jointes, la sortie continue dans le pipeline.

Exemple de traitement parallèle dans les opérations JOIN

Par exemple, supposons que vous effectuiez une opération JOIN sur des ensembles de données appelés Purchases et Items. Chaque enregistrement d'achat contient le nom et le nombre d'articles achetés. Chaque enregistrement d'article contient le nom et le prix de l'article. Une JOIN est effectuée sur le nom de l'article pour calculer le prix total de chaque achat. Une fois les données jointes, elles sont brassées dans le cluster de sorte que les enregistrements ayant le même ID se retrouvent sur le même exécuteur.

Lorsque les clés JOIN sont réparties de manière assez uniforme, les opérations JOIN fonctionnent correctement, car elles peuvent être exécutées en parallèle.

Comme pour tout brassage, une asymétrie des données aura un impact négatif sur les performances. Au cours de la Par exemple, on achète beaucoup plus d'œufs que de poulet ou de lait, que l'exécuteur qui rejoigne les achats d'œufs effectue plus de travail que l'autre les exécuteurs. Si vous constatez qu'un JOIN est biaisé, deux options s'offrent à vous pour améliorer les performances.

Diviser automatiquement les partitions asymétriques

Avec l'exécution adaptative des requêtes, les biais très importants sont gérés automatiquement. Dès qu'un JOIN produit des partitions beaucoup plus grandes que d'autres, elles sont divisées en partitions plus petites. Pour vérifier que l'exécution des requêtes adaptative est activée, consultez la section Réglage automatique.

Utiliser un JOIN en mémoire

Une JOIN en mémoire peut être effectuée si un côté de la JOIN est suffisamment petit pour tenir en mémoire. Dans ce cas, le petit ensemble de données est chargé en mémoire, puis diffusé auprès de chaque exécuteur. Le jeu de données volumineux n'est pas mélangé au en supprimant les partitions irrégulières générées lors du brassage sur le Clé JOIN.

Dans l'exemple précédent, l'ensemble de données "items" est d'abord chargé dans la mémoire du pilote Spark. Elle est ensuite diffusée à chaque exécuteur. Les exécuteurs peuvent désormais joindre les données sans mélanger l'ensemble de données des achats.

Cette approche nécessite d'allouer suffisamment de mémoire au pilote Spark et aux exécuteurs pour leur permettre de stocker l'ensemble de données de diffusion en mémoire. Par défaut, Spark réserve un peu moins de 30% de sa mémoire au stockage de ce type données. Lorsque vous utilisez des JOIN en mémoire, multipliez la taille de l'ensemble de données par quatre, puis définissez-la en tant que mémoire d'exécuteur et de pilote. Par exemple, si l'ensemble de données des articles était de 1 Go, nous devrions définir la mémoire de l'exécuteur et du pilote à au moins 4 Go. Les ensembles de données de plus de 8 Go ne peuvent pas être chargés en mémoire.

La distribution des clés

Lorsque les deux côtés de JOIN sont trop grands pour tenir en mémoire, un autre permet de diviser chaque clé JOIN en plusieurs clés afin d'augmenter le niveau de parallélisme. Cette technique peut être appliquée à INNER JOIN et LEFT OUTER JOIN. Impossible de l'utiliser pour FULL OUTER JOIN opérations.

Dans cette approche, le côté biaisé est salé avec une nouvelle colonne d'entiers contenant un nombre aléatoire compris entre 1 et N. Le côté non déformé est développé, et chaque ligne existante génère N nouvelles lignes. Une nouvelle colonne est ajoutée à la face éclatée, dont chaque nombre est compris entre 1 et N. Une JOIN normale est ensuite effectuée, à l'exception de la nouvelle colonne ajoutée dans la clé JOIN. De cette manière, toutes les données qui étaient auparavant envoyées vers une seule partition sont désormais réparties sur jusqu'à N partitions différentes.

Dans l'exemple précédent, le facteur de distribution N est défini sur 3. La les jeux de données d'origine sont affichés à gauche. Les versions salés et éclatées du sont affichées au milieu. Les données brassées sont affichées à droite, avec trois exécuteurs différents joignant les achats d'œufs, au lieu d'un.

Un parallélisme plus important est obtenu en augmentant les distributions. Cependant, cela se fait au détriment de l'explosion d'un côté de JOIN, ce qui entraîne un brassage plus important des données dans le cluster. Par conséquent, l'avantage diminue à mesure que la distribution augmente. Dans la plupart des cas, définissez-le sur 20 ou moins.

Étape suivante