Mode de flexibilité améliorée de Dataproc

Le mode de flexibilité améliorée (EFM) de Dataproc gère les données de brassage afin de minimiser les retards de progression de tâche causés par la suppression de nœuds d'un cluster en cours d'exécution. EFM écrit les données de brassage Spark sur les nœuds de calcul primaires. Les nœuds de calcul extraient ces nœuds distants pendant la phase de réduction.

Étant donné que l'outil EFM ne stocke pas de données brassées intermédiaires sur les nœuds de calcul secondaires, il est bien adapté à une utilisation dans des clusters qui utilisent des VM préemptives ou qui n'effectuent l'autoscaling que sur le groupe de nœuds de calcul secondaires.

Limites :

  • Les tâches Apache Hadoop YARN qui ne sont pas compatibles avec la réaffectation d'AppMaster peuvent échouer en mode de flexibilité améliorée (consultez la section Quand attendre la fin d'AppMasters).
  • Le mode de flexibilité améliorée n'est pas recommandé :
    • Sur un cluster ne comportant que des nœuds de calcul principaux
  • Le mode de flexibilité améliorée n'est pas compatible :
    • Lorsque l'autoscaling du nœud de calcul principal est activé. Dans la plupart des cas, les nœuds de calcul principaux continuent de stocker les données de brassage qui ne sont pas automatiquement migrées. La réduction de la capacité du groupe de nœuds de calcul principaux annule les avantages EFM.
    • Lorsque des tâches Spark s'exécutent sur un cluster sur lequel la mise hors service concertée est activée. La mise hors service concertée et l'EFM peuvent fonctionner à des fins croisées, car le mécanisme de mise hors service concertée de YARN maintient les nœuds DÉCOMMISSIONING jusqu'à ce que toutes les applications concernées soient terminées.

Utilisation du mode de flexibilité amélioré

Le mode "Flexibilité améliorée" est configuré par moteur d'exécution et doit être configuré lors de la création du cluster. L'implémentation de EFM Spark est configurée avec la propriété de cluster dataproc:efm.spark.shuffle=primary-worker.

Exemple:Créez un cluster avec brassage du nœud de calcul principal pour Spark:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Exemple pour Apache Spark

  1. Exécuter une tâche WordCount sur du texte Shakespeare public à l'aide des exemples Spark au format JAR sur le cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Configurer des disques SSD locaux pour le brassage de nœuds de calcul principaux

Les mises en œuvre de brassage des nœuds de calcul principaux et HDFS écrivent des données de brassage intermédiaires sur les disques associés aux VM, et bénéficient du débit et des IOPS supplémentaires offerts par les disques SSD locaux. Pour faciliter l'allocation des ressources, ciblez un objectif d'environ une partition SSD locale par 4 processeurs virtuels lors de la configuration des machines de nœud de calcul principal.

Pour associer des disques SSD locaux, transmettez l'option --num-worker-local-ssds à la commande gcloud dataproc clusters create.

Ratio de nœuds de calcul secondaire

Étant donné que les nœuds de calcul secondaires écrivent leurs données brassées sur les nœuds de calcul primaires, votre cluster doit contenir un nombre suffisant de nœuds de calcul primaires avec suffisamment de ressources de processeur, de mémoire et de disque pour gérer la charge aléatoire de votre tâche. Pour les clusters avec autoscaling, pour empêcher le groupe principal d'effectuer un scaling et de provoquer un comportement indésirable, définissez minInstances sur la valeur maxInstances dans la règle d'autoscaling du groupe de nœuds de calcul principal.

Si vous avez un ratio nœuds de calcul principaux/nœud de calculs secondaires élevé (par exemple, 10:1), surveillez l'utilisation du processeur, du réseau et de l'espace disque des nœuds de calcul principaux pour déterminer s'ils sont surchargés. Procédure à suivre :

  1. Accédez à la page Instances de VM de la console Google Cloud.

  2. Cochez la case située à gauche du nœud de calcul principal.

  3. Cliquez sur l'onglet SURVEILLANCE pour afficher l'utilisation du processeur, les IOPS du disque, les octets du réseau et d'autres métriques du nœud de calcul principal.

Si les nœuds de calcul principaux sont surchargés, envisagez d'effectuer un scaling manuel des nœuds de calcul primaires.

Redimensionner le groupe de nœuds de calcul principaux

Le groupe de nœuds de calcul principaux peut faire l'objet d'un scaling à la hausse en toute sécurité, mais sa réduction peut affecter négativement la progression de la tâche. Les opérations qui réduisent le groupe de nœuds de calcul principaux doivent utiliser la mise hors service concertée, qui est activée en définissant l'option --graceful-decommission-timeout.

Clusters avec autoscaling : le scaling du groupe de nœuds de calcul principaux est désactivé sur les clusters EFM avec des stratégies d'autoscaling. Pour redimensionner le groupe de nœuds de calcul principaux sur un cluster avec autoscaling, procédez comme suit :

  1. Désactivez l'autoscaling.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Effectuer un scaling du groupe principal

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Réactivez l'autoscaling:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Surveiller l'utilisation du disque de nœuds de calcul principaux

Les nœuds de calcul principaux doivent disposer d'un espace disque suffisant pour les données de brassage du cluster. Vous pouvez surveiller surveiller cela indirectement via la métrique remaining HDFS capacity. Lorsque le disque local se remplit, une partie de l'espace disque devient indisponible pour HDFS et la capacité restante diminue.

Par défaut, lorsque le disque local d'un nœud de calcul principal dépasse 90 % de la capacité, le nœud est marqué comme UNHEALTHY dans l'UI du nœud YARN. Si vous rencontrez des problèmes de capacité de disque, vous pouvez supprimer les données inutilisées de HDFS ou faire évoluer à la hausse le pool de nœuds de calcul principaux.

Notez que les données de brassage intermédiaires ne sont généralement pas nettoyées avant la fin d'une tâche. Lorsque vous utilisez le brassage de nœuds de calcul principaux avec Spark, cela peut prendre jusqu'à 30 minutes après la fin d'une tâche.

Configuration avancée

Partitionnement et parallélisme

Lorsque vous envoyez une tâche MapReduce ou Spark, configurez un niveau de partitionnement approprié. Choisir le nombre de partitions d'entrée et de sortie pour une étape de brassage affecte les caractéristiques de performances différentes. Il est préférable de tester les valeurs qui fonctionnent pour vos formes de tâches.

Partitions d'entrée

Le partitionnement d'entrée MapReduce et Spark est déterminé par l'ensemble de données d'entrée. Lors de la lecture de fichiers depuis Cloud Storage, chaque tâche traite environ une "taille de bloc" de données.

  • Pour les tâches Spark SQL, la taille de partition maximale se contrôle avec la propriété spark.sql.files.maxPartitionBytes. Nous vous conseillons de l'augmenter à 1 Go en spécifiant spark.sql.files.maxPartitionBytes=1073741824.

  • Pour les tâches MapReduce et les RDD Spark, la taille de partition est de 128 Mo par défaut et se contrôle avec la propriété fs.gs.block.size. Nous vous conseillons de l'augmenter à 1 Go. Vous pouvez également définir des propriétés d'InputFormat spécifiques, telles que mapreduce.input.fileinputformat.split.minsize et mapreduce.input.fileinputformat.split.maxsize.

    • Pour les tâches MapReduce : --properties fs.gs.block.size=1073741824
    • Pour les RDD Spark : --properties spark.hadoop.fs.gs.block.size=1073741824

Partitions de sortie

Plusieurs propriétés servent à contrôler le nombre de tâches aux étapes suivantes. Pour les tâches volumineuses traitant plus de 1 To, nous vous conseillons de prévoir au moins 1 Go par partition.

  • Pour les tâches MapReduce, le nombre de partitions de sortie est contrôlé par mapreduce.job.reduces.

  • Pour Spark SQL, le nombre de partitions de sortie est contrôlé par spark.sql.shuffle.partitions.

  • Pour les tâches Spark utilisant l'API RDD, vous pouvez spécifier le nombre de partitions de sortie ou définir spark.default.parallelism.

Réglages du brassage pour le brassage de nœuds de calcul principaux

La propriété la plus importante est --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Notez qu'il s'agit d'une propriété YARN au niveau du cluster, car le serveur de brassage Spark s'exécute sur le gestionnaire de nœuds. Le nombre de threads par défaut est de deux fois (2x) le nombre de cœurs de la machine (par exemple, 16 threads sur un n1-highmem-8). Si la valeur "Shuffle Read Blocked Time" (durée du blocage en lecture du brassage) est supérieure à une seconde et que les nœuds de calcul principaux n'ont pas atteint les limites de réseau, de processeur ou de disque, nous vous conseillons d'augmenter le nombre de threads du serveur de brassage.

Sur les types de machines plus volumineux, envisagez d'augmenter spark.shuffle.io.numConnectionsPerPeer, dont la valeur par défaut est 1. (Par exemple, définissez-le sur 5 connexions par paire d'hôtes).

Augmenter le nombre de tentatives

Le nombre maximal de tentatives autorisées pour les applications maîtres, les tâches et les étapes peut être configuré en définissant les propriétés suivantes :

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Étant donné que les maîtres et les tâches d'application sont plus souvent arrêtés dans les clusters qui utilisent de nombreuses VM préemptives ou l'autoscaling sans mise hors service concertée, il peut être utile d'augmenter les valeurs des propriétés précédentes dans ces clusters (notez qu'il n'est pas possible d'utiliser EFM avec Spark et la mise hors service concertée).

Mise hors service concertée YARN sur les clusters EFM

La mise hors service concertée YARN peut être utilisée pour supprimer rapidement des nœuds avec un impact minimal sur l'exécution des applications. Pour les clusters d'autoscaling, le délai d'inactivité de la mise hors service concertée peut être défini dans une AutoscalingPolicy associée au cluster EFM.

Améliorations EFM pour la mise hors service concertée

  1. Les données intermédiaires étant stockées dans un système de fichiers distribué, les nœuds peuvent être supprimés d'un cluster EFM dès la fin de l'exécution de tous les conteneurs sur ces nœuds. En comparaison, les nœuds ne sont pas supprimés sur les clusters Dataproc standards tant que l'exécution de l'application n'est pas terminée.

  2. Pour supprimer des nœuds, il n'est pas nécessaire d'attendre la fin de l'exécution des applications maîtres sur un nœud. Lorsque le conteneur de l'application maître est arrêté, il est reprogrammé sur un autre nœud qui n'est pas mis hors service. La progression de la tâche n'est pas perdue : la nouvelle application maître récupère rapidement l'état de l'application maître précédente en lisant l'historique des tâches.