Traitement en parallèle

Les pipelines sont exécutés sur des clusters de machines. Elles atteignent un débit élevé diviser le travail qui doit être effectué, puis exécuter le travail dans en parallèle sur les différents exécuteurs répartis dans le cluster. En général, plus le nombre de divisions (également appelées partitions) est élevé, plus la le pipeline de ML peut être exécuté. Le niveau de parallélisme de votre pipeline est déterminé par les sources et les étapes de brassage de celui-ci.

Sources

Au début de chaque exécution de pipeline, chaque source du pipeline calcule les données doivent être lues et comment ces données peuvent être divisées. Pour Prenons l'exemple d'un pipeline de base qui lit des données depuis Cloud Storage, effectue des transformations Wrangler, puis écrit Cloud Storage.

Pipeline de base montrant la source Cloud Storage, la transformation Wrangler et le récepteur Cloud Storage

Lorsque le pipeline démarre, la source Cloud Storage examine l'entrée et les divise en plusieurs parties selon leur taille. Par exemple, un d'un gigaoctet peut être divisé en 100 partitions de 10 Mo chacune la taille de l'image. Chaque exécuteur lit les données de cette division, exécute Wrangler transformations, puis écrit la sortie dans un fichier part.

Partitionnement des données dans Cloud Storage en transformations Wrangler parallèles dans des fichiers partiels

Si votre pipeline fonctionne lentement, vous devez d'abord vérifier si vos sources créent suffisamment de divisions pour tirer pleinement parti du parallélisme. Par exemple, certains types de compression rendent les fichiers en texte brut impossibles à diviser. Si vous lisent des fichiers compressés avec gzip, vous remarquerez peut-être que votre pipeline s'exécute beaucoup plus lentement que si vous lisiez des fichiers non compressés ou des fichiers compressé avec BZIP (qui peut être divisé). De même, si vous utilisez la classe de base de données et que vous l'avez configuré pour n'utiliser qu'une seule division, il exécute plus lent que si vous le configurez pour utiliser plus de divisions.

Lectures en mode aléatoire

Certains types de plug-ins entraînent le brassage des données dans le cluster. Ce se produit lorsque les enregistrements en cours de traitement par un exécuteur doivent être envoyés à un autre exécuteur pour effectuer le calcul. Les mélanges sont des opérations coûteuses, car elles impliquent de nombreuses E/S. Les plug-ins qui permettent de mélanger les données apparaissent tous dans la section Analyse de Pipeline Studio. Il s'agit notamment de plug-ins tels que "Grouper par", "Déduplicer", "Distinct" et "Joiner". Par exemple, supposons qu'un opérateur Group by (Grouper par) est ajoutée au pipeline dans l'exemple précédent.

Supposons également que les données lues représentent des achats effectués dans une épicerie. Chaque enregistrement contient un champ item et un champ num_purchased. Dans le groupe Par étape, nous configurons le pipeline pour regrouper les enregistrements dans le champ item et calculer la somme du champ num_purchased.

Lorsque le pipeline s'exécute, les fichiers d'entrée sont divisés comme décrit précédemment. Après chaque enregistrement est brassé dans le cluster de sorte que chaque enregistrement ayant même élément appartient au même exécuteur.

Comme illustré dans l'exemple précédent, les enregistrements d'achats de pommes ont été enregistrés s’étendait à l’origine entre plusieurs exécuteurs. Pour effectuer l'agrégation, de ces enregistrements devaient être envoyés sur le cluster vers le même exécuteur.

La plupart des plug-ins nécessitant un brassage vous permettent de spécifier le nombre de partitions à utiliser lors du brassage des données. Cela contrôle le nombre d’exécuteurs utilisés pour traiter les données brassées.

Dans l'exemple précédent, si le nombre de partitions est défini sur 2, chaque exécuteur calcule les agrégations pour deux éléments au lieu d'un.

Notez qu'il est possible de réduire le parallélisme de votre pipeline après cette étape. Prenons l'exemple de la vue logique du pipeline:

Si la source divise les données sur 500 partitions, mais que le regroupement les mélange à l'aide de 200 partitions, le niveau maximal de parallélisme après le regroupement passe de 500 à 200. Au lieu de 500 fichiers de parties différents écrits dans Cloud Storage, vous n'en avez que 200.

Choisir des partitions

Si le nombre de partitions est trop faible, vous n'utiliserez pas toute la capacité votre cluster pour charger en parallèle le plus de travail possible. Définir également les partitions élevée augmente la quantité de frais généraux inutiles. En général, il est préférable de utilisent trop de partitions que trop peu. Les frais supplémentaires sont quelque chose à inquiéter si l'exécution de votre pipeline prend quelques minutes et que vous essayez de réduire quelques minutes. Si l'exécution de votre pipeline prend des heures, vous n'avez généralement pas à vous soucier des frais généraux.

Un moyen utile, mais trop simpliste, de déterminer le nombre de partitions à utiliser consiste à le définir sur max(cluster CPUs, input records / 500,000). Dans d'autres mots, prenez le nombre d'enregistrements d'entrée et divisez-les par 500 000. Si ce nombre est supérieur au nombre de processeurs du cluster, utilisez cette valeur pour le nombre de partitions. Sinon, utilisez le nombre de processeurs du cluster. Par exemple, si votre cluster comporte 100 processeurs et l'étape de brassage devrait compter 100 millions d'entrées utilisez 200 partitions.

Une réponse plus complète est que les brassages fonctionnent mieux lorsque les niveaux intermédiaires les données de brassage pour chaque partition peuvent tenir entièrement dans la mémoire d'un exécuteur, afin que rien n'a besoin d'être renversé sur le disque. Spark réserve un peu moins de 30 % la mémoire de l'exécuteur pour contenir les données de brassage. Le nombre exact est (mémoire totale - 300 Mo) x 30%. Si nous supposons que chaque exécuteur est configuré pour utiliser 2 Go de mémoire, cela signifie que chaque partition ne doit pas contenir plus de (2 Go - 300 Mo) * 30 % = environ 500 Mo d'enregistrements. Si nous supposons que chaque enregistrement se compresse à 1 Ko, cela signifie (500 Mo / partition) / (1 Ko / record) = 500 000 enregistrements par partition. Si vos exécuteurs utilisent plus mémoire ou si vos enregistrements sont plus petits, vous pouvez ajuster ce nombre en conséquence.

Décalage de données

Notez que dans l'exemple précédent, les achats de différents articles ont été uniformément distribué. Autrement dit, il y a eu trois achats chacun pour les pommes, les bananes, les carottes et les œufs. Le brassage sur une clé distribuée uniformément est l'approche la plus performante de brassage, mais de nombreux ensembles de données ne possèdent pas cette propriété. Poursuivez un achat dans une épicerie dans l'exemple précédent, vous vous attendez à recevoir plus d'achats d'œufs que de cartes de mariage. Lorsque des éléments aléatoires beaucoup plus courantes que d'autres, vous devez faire face à des cas données. Des données asymétriques peuvent être nettement moins performantes que des données non asymétriques, car une une quantité disproportionnée de travail est effectuée par une petite poignée de les exécuteurs. Il en résulte qu'un petit sous-ensemble de partitions est beaucoup plus volumineux que toutes les autres.

Dans cet exemple, il y a cinq fois plus d'achats d'œufs que d'achats par carte, ce qui signifie que le calcul de l'agrégat d'œufs prend environ cinq fois plus de temps. Il importe peu lorsqu'il s'agit de 10 enregistrements au lieu de deux, fait une grande différence lorsqu'il s'agit de traiter cinq milliards d'enregistrements au lieu d'un milliards. En cas de décalage des données, le nombre de partitions utilisées dans un brassage n'a pas un impact important sur les performances du pipeline.

Vous pouvez identifier un décalage de données en examinant le graphique pour y rechercher des enregistrements de sortie au fil du temps. Si cette étape génère des enregistrements à un rythme beaucoup plus élevé au début l'exécution du pipeline, puis ralentit soudainement, ce qui peut indiquer que vos données sont biaisées.

Vous pouvez également identifier un décalage de données en examinant l'utilisation de la mémoire du cluster au fil du temps. Si votre cluster est saturé depuis un certain temps, mais présente soudainement une faible utilisation de la mémoire une période, c'est aussi le signe que vous faites face à un décalage des données.

Les données asymétriques ont un impact plus important sur les performances lorsqu'une jointure est en cours d'exécution. Quelques techniques peuvent être utilisées pour améliorer les performances pour les jointures asymétriques. Pour en savoir plus, consultez Traitement parallèle pour les opérations JOIN.

Réglage adaptatif pour l'exécution

Pour ajuster l'exécution de manière adaptative, spécifiez la plage de partitions à utiliser, et non la le numéro de partition exact. Le numéro de partition exact, même s'il est défini dans le pipeline est ignorée lorsque l'exécution adaptative est activée.

Si vous utilisez un cluster Dataproc éphémère, Cloud Data Fusion définit automatiquement la configuration appropriée, mais pour les données Clusters Dataproc ou Hadoop, les deux configurations suivantes paramètres peuvent être définis:

  • spark.default.parallelism: définissez-le sur le nombre total de vCore disponibles. dans le cluster. Cela garantit que votre cluster n'est pas sous-chargé et définit le limite inférieure pour le nombre de partitions.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum : définissez-le sur 32 fois le nombre de vCores disponibles dans le cluster. Il s'agit de la valeur supérieure est limité au nombre de partitions.
  • Spark.sql.adaptive.enabled: pour activer les optimisations, définissez cette valeur sur true Dataproc le définit automatiquement, mais si vous utilisez des clusters Hadoop génériques, vous devez vous assurer qu'il est activé .

Ces paramètres peuvent être définis dans la configuration du moteur d'un ou dans les propriétés du cluster d'une instance Dataproc cluster.

Étape suivante