Mode de flexibilité améliorée de Dataproc

Le mode de flexibilité améliorée (EFM) de Dataproc gère les données de brassage afin de minimiser les retards de progression de tâche causés par la suppression de nœuds d'un cluster en cours d'exécution. EFM décharge les données de brassage selon l'un des deux modes sélectionnables par l'utilisateur :

  1. Brassage du nœud de calcul principal. Les mappeurs écrivent des données au niveau des nœuds de calcul principaux. Les nœuds de calcul sont extraits de ces nœuds distants pendant la phase de réduction. Ce mode n'est disponible que pour les tâches Spark et est recommandé pour ces tâches.

  2. Brassage HCFS (système de fichiers compatible Hadoop). Les mappeurs écrivent des données au niveau d'une mise en œuvre HCFS (HDFS par défaut). Comme pour le mode de nœud de calcul principal, seuls les nœuds de calcul principaux participent aux mises en œuvre HDFS et HCFS (si le brassage HCFS utilise le connecteur Cloud Storage, les données sont stockées hors cluster). Ce mode peut être utile pour les tâches avec de petites quantités de données, mais en raison de limites de scaling, il n'est pas recommandé pour les tâches plus volumineuses.

Étant donné que les deux modes EFM ne stockent pas de données de brassage intermédiaires sur les nœuds de calcul secondaires, EFM convient bien aux clusters utilisant des VM préemptives ou procédant uniquement à un autoscaling du groupe de nœuds de calcul secondaires.

Limites :

  • Les tâches Apache Hadoop YARN qui ne sont pas compatibles avec la relocalisation d'AppMaster peuvent échouer dans le mode de flexibilité améliorée (consultez la section Quand attendre qu'AppMasters se termine).
  • Le mode de flexibilité améliorée n'est pas recommandé dans les cas suivants :
    • sur un cluster ne comportant que des nœuds de calcul principaux
    • sur les tâches de streaming, car le nettoyage des données de brassage intermédiaires peut prendre jusqu'à 30 minutes après la fin de la tâche.
  • Le mode de flexibilité améliorée n'est pas compatible :
    • Lorsque l'autoscaling du nœud de calcul principal est activé. Dans la plupart des cas, les nœuds de calcul principaux continuent de stocker les données de brassage qui ne sont pas automatiquement migrées. La réduction de la capacité du groupe de nœuds de calcul principaux annule les avantages EFM.
    • Lorsque les tâches Spark s'exécutent sur un cluster avec mise hors service concertée activée. La mise hors service concertée et l'EFM peuvent être contradictoires, car le mécanisme de mise hors service concertée YARN conserve les nœuds DECOMMISSIONING jusqu'à ce que toutes les applications concernées soient terminées.

Utilisation du mode de flexibilité amélioré

Le mode "Flexibilité améliorée" est configuré par moteur d'exécution et doit être configuré lors de la création du cluster.

  • La mise en œuvre EFM de Spark est configurée avec la propriété de cluster dataproc:efm.spark.shuffle. Valeurs de propriété valides :

    • primary-worker pour le brassage de nœuds de calcul principaux (recommandé)
    • hcfs pour le brassage basé sur HCFS. Ce mode est obsolète et n'est disponible que sur les clusters exécutant la version d'image 1.5. Non recommandé pour les nouveaux workflows.
  • La mise en œuvre de Hadoop MapReduce se configure à l'aide de la propriété de cluster dataproc:efm.mapreduce.shuffle. Valeurs de propriété valides :

    • hcfs

Exemple : Créez un cluster avec le brassage de nœud de calcul principal pour Spark et brassage HCFS pour MapReduce :

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Exemple pour Apache Spark

  1. Exécutez une tâche WordCount sur du texte Shakespeare public à l'aide du fichier jar d'exemples Spark sur le cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Exemple pour MapReduce Apache Hadoop

  1. Exécutez une petite tâche teragen afin de générer des données d'entrée dans Cloud Storage pour une tâche terasort ultérieure à l'aide du fichier jar d'exemples mapreduce sur le cluster EFM

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Exécuter une tâche Terasort sur les données.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Configurer des SSD locaux pour le brassage de nœuds de calcul principaux

Les implémentations de brassage des nœuds de calcul principaux et HDFS écrivent des données de brassage intermédiaires sur les disques associés aux VM, et bénéficient du débit et des IOPS supplémentaires offerts par les disques SSD locaux. Pour faciliter l'allocation des ressources, ciblez un objectif d'environ une partition SSD locale par 4 processeurs virtuels lors de la configuration des machines de nœud de calcul principal.

Pour associer des SSD locaux, transmettez l'option --num-worker-local-ssds à la commande gcloud dataproc clusters create.

En général, vous n'avez pas besoin de disques SSD locaux sur les nœuds de calcul secondaires. L'ajout de disques SSD locaux aux nœuds de calcul secondaires d'un cluster (à l'aide de l'option --num-secondary-worker-local-ssds) est souvent moins important, car les nœuds de calcul secondaires n'écrivent pas de données de brassage localement. Toutefois, comme les SSD locaux améliorent les performances des disques locaux, vous pouvez décider d'ajouter des SSD locaux aux nœuds de calcul secondaires si vous prévoyez que les tâches seront liées aux E/S en raison de l'utilisation du disque local: votre tâche utilise une partie importante de l'espace disque local pour l'espace de travail temporaire ou vos partitions sont trop volumineuses pour tenir en mémoire et seront transférées sur le disque.

Ratio de nœuds de calcul secondaire

Étant donné que les nœuds de calcul secondaires écrivent leurs données de brassage dans les nœuds de calcul principaux, votre cluster doit contenir un nombre suffisant de nœuds de calcul principaux disposant de ressources de processeur, de mémoire et de disque suffisantes pour prendre en charge la charge de brassage de votre tâche. Pour les clusters d'autoscaling, pour empêcher le groupe principal de procéder à un scaling et de provoquer un comportement indésirable, définissez minInstances sur la valeur maxInstances dans la stratégie d'autoscaling du groupe de nœuds de calcul principaux.

Si vous avez un ratio nœuds de calcul principaux/nœud de calculs secondaires élevé (par exemple, 10:1), surveillez l'utilisation du processeur, du réseau et de l'espace disque des nœuds de calcul principaux pour déterminer s'ils sont surchargés. Procédez comme suit :

  1. Accédez à la page Instances de VM dans la consoleGoogle Cloud .

  2. Cochez la case située à gauche du nœud de calcul principal.

  3. Cliquez sur l'onglet SURVEILLANCE pour afficher l'utilisation du processeur, les IOPS du disque, les octets du réseau et d'autres métriques du nœud de calcul principal.

Si les nœuds de calcul principaux sont surchargés, envisagez d'effectuer un scaling manuel des nœuds de calcul primaires.

Redimensionner le groupe de nœuds de calcul principaux

Le groupe de nœuds de calcul principaux peut faire l'objet d'un scaling à la hausse en toute sécurité, mais sa réduction peut affecter négativement la progression de la tâche. Les opérations qui réduisent le groupe de nœuds de calcul principaux doivent utiliser la mise hors service concertée, qui est activée en définissant l'option --graceful-decommission-timeout.

Clusters avec autoscaling : le scaling du groupe de nœuds de calcul principaux est désactivé sur les clusters EFM avec des stratégies d'autoscaling. Pour redimensionner le groupe de nœuds de calcul principaux sur un cluster avec autoscaling, procédez comme suit :

  1. Désactivez l'autoscaling.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Effectuez un scaling du groupe principal.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Réactivez l'autoscaling :

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Surveiller l'utilisation du disque de nœuds de calcul principaux

Les nœuds de calcul principaux doivent disposer d'un espace disque suffisant pour les données de brassage du cluster. Vous pouvez surveiller surveiller cela indirectement via la métrique remaining HDFS capacity. Lorsque le disque local se remplit, une partie de l'espace disque devient indisponible pour HDFS et la capacité restante diminue.

Par défaut, lorsque le disque local d'un nœud de calcul principal dépasse 90 % de la capacité, le nœud est marqué comme UNHEALTHY dans l'UI du nœud YARN. Si vous rencontrez des problèmes de capacité de disque, vous pouvez supprimer les données inutilisées de HDFS ou faire évoluer à la hausse le pool de nœuds de calcul principaux.

Configuration avancée

Partitionnement et parallélisme

Lorsque vous envoyez une tâche MapReduce ou Spark, configurez un niveau de partitionnement approprié. Choisir le nombre de partitions d'entrée et de sortie pour une étape de brassage affecte les caractéristiques de performances différentes. Il est préférable de tester les valeurs qui fonctionnent pour vos formes de tâches.

Partitions d'entrée

Le partitionnement d'entrée MapReduce et Spark est déterminé par l'ensemble de données d'entrée. Lors de la lecture de fichiers depuis Cloud Storage, chaque tâche traite environ une "taille de bloc" de données.

  • Pour les tâches Spark SQL, la taille de partition maximale se contrôle avec la propriété spark.sql.files.maxPartitionBytes. Nous vous conseillons de l'augmenter à 1 Go en spécifiant spark.sql.files.maxPartitionBytes=1073741824.

  • Pour les tâches MapReduce et les RDD Spark, la taille de partition est de 128 Mo par défaut et se contrôle avec la propriété fs.gs.block.size. Nous vous conseillons de l'augmenter à 1 Go. Vous pouvez également définir des propriétés d'InputFormat spécifiques, telles que mapreduce.input.fileinputformat.split.minsize et mapreduce.input.fileinputformat.split.maxsize.

    • Pour les tâches MapReduce : --properties fs.gs.block.size=1073741824
    • Pour les RDD Spark : --properties spark.hadoop.fs.gs.block.size=1073741824

Partitions de sortie

Plusieurs propriétés servent à contrôler le nombre de tâches aux étapes suivantes. Pour les tâches volumineuses traitant plus de 1 To, nous vous conseillons de prévoir au moins 1 Go par partition.

  • Pour les tâches MapReduce, le nombre de partitions de sortie est contrôlé par mapreduce.job.reduces.

  • Pour Spark SQL, le nombre de partitions de sortie est contrôlé par spark.sql.shuffle.partitions.

  • Pour les tâches Spark utilisant l'API RDD, vous pouvez spécifier le nombre de partitions de sortie ou définir spark.default.parallelism.

Réglages du brassage pour le brassage de nœuds de calcul principaux

La propriété la plus importante est --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Notez qu'il s'agit d'une propriété YARN au niveau du cluster, car le serveur de brassage Spark s'exécute sur le gestionnaire de nœuds. Le nombre de threads par défaut est de deux fois (2x) le nombre de cœurs de la machine (par exemple, 16 threads sur un n1-highmem-8). Si la valeur "Shuffle Read Blocked Time" (durée du blocage en lecture du brassage) est supérieure à une seconde et que les nœuds de calcul principaux n'ont pas atteint les limites de réseau, de processeur ou de disque, nous vous conseillons d'augmenter le nombre de threads du serveur de brassage.

Sur les types de machines plus volumineux, envisagez d'augmenter spark.shuffle.io.numConnectionsPerPeer, dont la valeur par défaut est 1. (Par exemple, définissez-le sur 5 connexions par paire d'hôtes).

Augmenter le nombre de tentatives

Le nombre maximal de tentatives autorisées pour les applications maîtres, les tâches et les étapes peut être configuré en définissant les propriétés suivantes :

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Comme les applications maîtres et les tâches sont arrêtées plus fréquemment dans les clusters utilisant de nombreuses VM préemptives ou l'autoscaling sans mise hors service concertée, augmenter les valeurs des propriétés ci-dessus dans ces clusters peut aider (notez qu'utiliser EFM avec Spark et la mise hors service concertée n'est pas pris en charge).

Configurer le brassage HCFS pour HDFS

Pour améliorer les performances des brassages étendus, vous pouvez réduire la contention de verrouillage dans le NameNode en définissant dfs.namenode.fslock.fair=false. Notez que cela risque d'affecter les requêtes individuelles, mais peut améliorer le débit à l'échelle du cluster. Pour améliorer encore les performances du NameNode, vous pouvez associer des disques SSD locaux au nœud maître en définissant --num-master-local-ssds. Vous pouvez également ajouter des disques SSD locaux aux nœuds de calcul principaux afin d'améliorer les performances du DataNode en définissant l'option --num-worker-local-ssds.

Autres systèmes de fichiers compatibles Hadoop pour le brassage HCFS

Par défaut, les données de brassage EFM HCFS sont écrites dans HDFS, mais vous pouvez utiliser n'importe quel système de fichiers compatible Hadoop (HCFS). Par exemple, vous pouvez décider d'écrire des données de brassage dans Cloud Storage ou dans le HDFS d'un autre cluster. Pour spécifier un système de fichiers, vous pouvez pointer fs.defaultFS vers le système de fichiers cible lorsque vous envoyez une tâche à votre cluster.

Mise hors service concertée YARN sur les clusters EFM

La mise hors service concertée YARN peut être utilisée pour supprimer rapidement des nœuds avec un impact minimal sur l'exécution des applications. Pour les clusters d'autoscaling, le délai d'inactivité de la mise hors service concertée peut être défini dans une AutoscalingPolicy associée au cluster EFM.

Améliorations apportées à MapReduce EFM pour une mise hors service concertée

  1. Les données intermédiaires étant stockées dans un système de fichiers distribué, les nœuds peuvent être supprimés d'un cluster EFM dès la fin de l'exécution de tous les conteneurs sur ces nœuds. En comparaison, les nœuds ne sont pas supprimés sur les clusters Dataproc standards tant que l'exécution de l'application n'est pas terminée.

  2. Pour supprimer des nœuds, il n'est pas nécessaire d'attendre la fin de l'exécution des applications maîtres sur un nœud. Lorsque le conteneur de l'application maître est arrêté, il est reprogrammé sur un autre nœud qui n'est pas mis hors service. La progression de la tâche n'est pas perdue : la nouvelle application maître récupère rapidement l'état de l'application maître précédente en lisant l'historique des tâches.

Utiliser la mise hors service concertée sur un cluster EFM avec MapReduce

  1. Créez un cluster EFM possédant autant de nœuds de calcul principaux que de de nœuds de calcul secondaires.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Exécutez une tâche mapreduce qui calcule la valeur de pi en utilisant le fichier jar d'exemples mapreduce sur le cluster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Pendant l'exécution de la tâche, réduisez le cluster en utilisant une mise hors service concertée.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    Les nœuds seront supprimés du cluster rapidement avant la fin de la tâche tout en minimisant la perte de la progression de la tâche. Des pauses temporaires de la progression de la tâche peuvent survenir pour les raisons suivantes:

    • Basculement du maître d'application Si la progression de la tâche passe à 0 % et saute immédiatement à la valeur de pré-largage, il se peut que l'application maître se soit arrêtée et qu'une nouvelle application maître ait récupéré son état. Cela ne devrait pas avoir d'incidence importante sur la progression de la tâche, car le basculement est rapide.
    • Préemption de VM. Étant donné que HDFS ne conserve que les résultats de tâches de mappage complets et non partiels, des pauses temporaires de progression des tâches peuvent survenir lorsqu'une VM est préemptée lors de l'exécution d'une tâche de mappage.

Pour accélérer la suppression des nœuds, vous pouvez réduire le cluster sans mise hors service concertée en omettant l'indicateur --graceful-decommission-timeout dans l'exemple de commande gcloud précédent. La progression des tâches de mappage terminées sera conservée, mais les résultats partiels seront perdus (les tâches de mappage seront exécutées à nouveau).