Traitement parallèle pour les opérations JOIN

Cette page explique le réglage des performances pour les opérations JOIN dans Cloud Data Fusion.

Les opérations JOIN peuvent être la partie la plus coûteuse d'un pipeline. Comme tout les opérations sont exécutées en parallèle. La première étape d'une JOIN consiste à brasser les données pour que chaque enregistrement possédant la même clé JOIN soit envoyé au même exécuteur. Une fois toutes les données brassées, elles sont jointes, la sortie continue dans le pipeline.

Exemple de traitement parallèle dans les opérations JOIN

Par exemple, supposons que vous effectuiez une opération JOIN sur des ensembles de données appelés Purchases et Items. Chaque enregistrement d'achat contient un nom et un numéro d'article acheté. Chaque enregistrement d'article contient le nom et le prix de cet article. A La fonction JOIN est appliquée au nom de l'article pour calculer le prix total de chaque à l'achat. Une fois les données jointes, elles sont brassées dans le cluster de sorte que les enregistrements ayant le même ID se retrouvent sur le même exécuteur.

Lorsque les clés JOIN sont réparties de manière assez équitable, les opérations JOIN exécutent car ils peuvent être exécutés en parallèle.

Comme tout brassage, le décalage des données a un impact négatif sur les performances. Au cours de la Par exemple, on achète beaucoup plus d'œufs que de poulet ou de lait, que l'exécuteur qui rejoigne les achats d'œufs effectue plus de travail que l'autre les exécuteurs. Si vous remarquez qu'une JOIN est asymétrique, vous pouvez l'améliorer de deux façons. des performances.

Diviser automatiquement les partitions asymétriques

L'exécution adaptative des requêtes permet de gérer automatiquement les décalages très importants. Dès qu'un JOIN génère des partitions beaucoup plus volumineuses que d'autres, il est divisé en plusieurs plus petits. Pour vérifier que l'exécution des requêtes adaptative est activée, consultez la section Réglage automatique.

Utiliser un JOIN en mémoire

Une opération JOIN en mémoire peut être effectuée si un côté de l'élément JOIN est suffisamment petit pour tenir en mémoire. Dans cette situation, le petit jeu de données est chargé en mémoire, puis est diffusé à chaque exécuteur. Le jeu de données volumineux n'est pas mélangé au en supprimant les partitions irrégulières générées lors du brassage sur le Clé JOIN.

Dans l'exemple précédent, l'ensemble de données "items" est d'abord chargé dans la mémoire du pilote Spark. Elle est ensuite diffusée à chaque exécuteur. Les exécuteurs peuvent désormais rejoindre le sans effectuer de brassage dans l'ensemble de données des achats.

Cette approche nécessite de fournir suffisamment de mémoire au pilote Spark exécuteurs leur permettant de stocker l'ensemble de données de diffusion en mémoire. Par défaut, Spark réserve un peu moins de 30% de sa mémoire au stockage de ce type données. Lorsque vous utilisez des JOIN en mémoire, multipliez la taille de l'ensemble de données par quatre, puis définissez-la en tant que mémoire d'exécuteur et de pilote. Par exemple, si l'ensemble de données "items" avait une taille de 1 Go, nous devions régler la mémoire de l'exécuteur et du pilote sur d'au moins 4 Go. Les ensembles de données de plus de 8 Go ne peuvent pas être chargés en mémoire.

La distribution des clés

Lorsque les deux côtés de JOIN sont trop grands pour tenir en mémoire, un autre permet de diviser chaque clé JOIN en plusieurs clés afin d'augmenter le niveau de parallélisme. Cette technique peut être appliquée à INNER JOIN et LEFT OUTER JOIN. Impossible de l'utiliser pour FULL OUTER JOIN operations.

Dans cette approche, le côté asymétrique est salé avec une nouvelle colonne d'entiers avec un un nombre aléatoire compris entre 1 et N. Le côté non incliné est éclaté, chaque ligne existante N nouvelles lignes générées. Une nouvelle colonne est ajoutée à la face éclatée, dont chaque chiffre est compris entre 1 et N. Une opération JOIN normale est alors effectuée, à l'exception est ajoutée avec la clé JOIN. De cette façon, toutes les données qui ont utilisé pour accéder à une seule partition est maintenant réparti entre N partitions différentes.

Dans l'exemple précédent, le facteur de distribution N est défini sur 3. La les jeux de données d'origine sont affichés à gauche. Les versions salés et éclatées du sont affichées au milieu. Les données brassées sont affichées à droite, avec trois exécuteurs différents rejoignant les achats d'œufs, au lieu d'un.

Pour améliorer ce parallélisme, il faut augmenter les distributions. Cependant, cela vient au prix d'une explosion d'un côté de JOIN, ce qui entraîne une augmentation du brassage des données dans l'ensemble du cluster. Pour cette raison, l'avantage diminue à mesure que la distribution augmente. Dans la plupart des cas, définissez-la sur 20 ou moins.

Étape suivante