Régler l'autoscaling horizontal pour les pipelines de traitement par flux

Dans les pipelines de traitement par flux avec un volume élevé de données d'entrée, il existe généralement un compromis entre coût et latence. Pour conserver une latence faible, Dataflow doit ajouter des nœuds de calcul à mesure que le volume de trafic augmente. Un autre facteur est la vitesse à laquelle le pipeline doit effectuer un scaling à la hausse ou à la baisse en réponse aux modifications du débit de données d'entrée.

L'autoscaler Dataflow dispose de paramètres par défaut qui conviennent à de nombreuses charges de travail. Toutefois, vous souhaiterez peut-être ajuster ce comportement en fonction de votre scénario. Par exemple, une latence moyenne plus élevée peut être acceptable pour réduire les coûts, mais vous pouvez également souhaiter que Dataflow effectue un scaling à la hausse plus rapidement afin de répondre aux pics de trafic.

Pour optimiser l'autoscaling horizontal, vous pouvez ajuster les paramètres suivants :

Définir la plage d'autoscaling

Lorsque vous créez un job de traitement par flux, vous pouvez définir le nombre initial de nœuds de calcul et le nombre maximal de nœuds de calcul. Pour ce faire, spécifiez les options de pipeline suivantes :

Java

  • --numWorkers : nombre initial de nœuds de calcul disponibles au début de l'exécution du pipeline
  • --maxNumWorkers : nombre maximal de nœuds de calcul disponibles pour votre pipeline

Python

  • --num_workers : nombre initial de nœuds de calcul disponibles au début de l'exécution du pipeline
  • --max_num_workers : nombre maximal de nœuds de calcul disponibles pour votre pipeline

Accéder

  • --num_workers : nombre initial de nœuds de calcul disponibles au début de l'exécution du pipeline
  • --max_num_workers : nombre maximal de nœuds de calcul disponibles pour votre pipeline

Pour les jobs de traitement par flux qui utilisent Streaming Engine, l'option --maxNumWorkers est facultative. La valeur par défaut est 100. Pour les jobs de traitement par flux qui n'utilisent pas Streaming Engine, --maxNumWorkers est requis lorsque l'autoscaling horizontal est activé.

La valeur de départ de --maxNumWorkers détermine également le nombre de disques persistants alloués au job. Les pipelines sont déployés avec un pool fixe de disques persistants, d'un nombre égal à --maxNumWorkers. Lors du streaming, les disques persistants sont redistribués de manière à ce que chaque nœud de calcul ait un nombre égal de disques associés.

Si vous définissez --maxNumWorkers, assurez-vous que la valeur fournit suffisamment de disques pour votre pipeline. Tenez compte de la croissance future lors de la définition de la valeur initiale. Pour en savoir plus sur les performances de Persistent Disk, consultez la section Configurer Persistent Disk et vos VM. Dataflow facture l'utilisation de Persistent Disk et dispose de quotas Compute Engine, y compris de quotas Persistent Disk.

Par défaut, le nombre minimal de nœuds de calcul est de 1 pour les jobs de traitement par flux qui utilisent Streaming Engine et de (maxNumWorkers/15) arrondi pour les jobs qui n'utilisent pas Streaming Engine.

Mettre à jour la plage d'autoscaling

Pour les jobs qui utilisent Streaming Engine, vous pouvez ajuster le nombre minimal et maximal de nœuds de calcul, sans arrêter ni remplacer le job. Pour ajuster ces valeurs, utilisez une mise à jour de job en cours. Mettez à jour les options de tâches suivantes :

  • --min-num-workers : nombre minimal de nœuds de calcul.
  • --max-num-workers : nombre maximal de nœuds de calcul.

gcloud

Exécutez la commande gcloud dataflow jobs update-options :

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

Remplacez les éléments suivants :

  • REGION : ID de région du point de terminaison régional de la tâche.
  • MINIMUM_WORKERS : nombre minimal d'instances Compute Engine.
  • MAXIMUM_WORKERS : nombre maximal d'instances Compute Engine.
  • JOB_ID : ID de la tâche à mettre à jour.

Vous pouvez également mettre à jour --min-num-workers et --max-num-workers individuellement.

REST

Utilisez la méthode projects.locations.jobs.update :

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID de projet Google Cloud du job Dataflow
  • REGION : ID de région du point de terminaison régional de la tâche.
  • JOB_ID : ID de la tâche à mettre à jour.
  • MINIMUM_WORKERS : nombre minimal d'instances Compute Engine.
  • MAXIMUM_WORKERS : nombre maximal d'instances Compute Engine.

Vous pouvez également mettre à jour min_num_workers et max_num_workers individuellement. Spécifiez les paramètres à mettre à jour dans le paramètre de requête updateMask et incluez les valeurs mises à jour dans le champ runtimeUpdatableParams du corps de la requête. L'exemple suivant met à jour min_num_workers :

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Pour les jobs qui n'utilisent pas Streaming Engine, vous pouvez remplacer le job existant par une valeur mise à jour de maxNumWorkers.

Si vous mettez à jour un job de traitement par flux qui n'utilise pas Streaming Engine, l'autoscaling horizontal est désactivé par défaut sur le job mis à jour. Pour maintenir l'autoscaling activé, spécifiez --autoscalingAlgorithm et --maxNumWorkers pour le job mis à jour.

Définir l'optimisation de l'utilisation des nœuds de calcul

Dataflow utilise l'utilisation moyenne du processeur comme signal pour savoir quand appliquer l'autoscaling horizontal. Par défaut, Dataflow définit une utilisation du processeur cible de 0,8. Lorsque l'utilisation se situe en dehors de cette plage, Dataflow peut ajouter ou supprimer des nœuds de calcul.

Pour un contrôle plus précis du comportement de l'autoscaling, vous pouvez définir l'objectif d'utilisation du processeur sur une valeur comprise dans la plage [0,1 - 0,9].

  • Définissez une valeur d'utilisation du processeur plus faible si vous souhaitez obtenir des latences maximales plus faibles. Une valeur inférieure permet à Dataflow d'effectuer un scaling horizontal plus agressif en réponse à une utilisation croissante des nœuds de calcul, et de procéder à un scaling à la baisse plus prudent pour améliorer la stabilité. Une valeur inférieure offre également plus de marge de manœuvre lorsque le pipeline affiche un état stable, ce qui entraîne généralement une latence de queue plus faible. (La latence de queue mesure les temps d'attente les plus longs avant le traitement d'un nouvel enregistrement.)

  • Définissez une valeur plus élevée si vous souhaitez économiser les ressources et réduire les coûts lors des pics de trafic. Une valeur plus élevée évite un scaling à la hausse excessif, au prix d'une latence plus élevée.

Pour configurer l'optimisation de l'utilisation lorsque vous exécutez une tâche sans modèle, définissez l'option de service worker_utilization_hint. Pour une tâche de modèle, modifiez plutôt l'optimisation de l'utilisation, car les options de service ne sont pas prises en charge.

L'exemple suivant montre comment utiliser worker_utilization_hint :

Java

--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION

Remplacez TARGET_UTILIZATION par une valeur comprise dans la plage [0.1 - 0.9].

Python

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Remplacez TARGET_UTILIZATION par une valeur comprise dans la plage [0.1 - 0.9].

Accéder

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Remplacez TARGET_UTILIZATION par une valeur comprise dans la plage [0.1 - 0.9].

Pour les nouveaux pipelines, nous vous recommandons d'effectuer les tests sous des charges réalistes en utilisant les paramètres par défaut. Évaluez ensuite le comportement de l'autoscaling tel qu'il s'applique à votre pipeline et effectuez des ajustements si nécessaire.

L'optimisation de l'utilisation n'est qu'un des facteurs que Dataflow prend en compte pour déterminer s'il convient d'effectuer le scaling des nœuds de calcul. D'autres facteurs, tels que les tâches en attente et les clés disponibles, peuvent remplacer la valeur d'optimisation. En outre, l'optimisation n'est pas une cible stricte. L'autoscaler tente de maintenir l'utilisation du processeur dans la plage de la valeur d'optimisation, mais la métrique d'utilisation agrégée peut être supérieure ou inférieure. Pour en savoir plus, consultez la section Heuristique d'autoscaling de flux.

Mettre à jour l'optimisation de l'utilisation

Pour mettre à jour l'optimisation de l'utilisation pendant l'exécution d'une tâche, effectuez une mise à jour en cours comme suit :

gcloud

Exécutez la commande gcloud dataflow jobs update-options :

gcloud dataflow jobs update-options \
  --region=REGION \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

Remplacez les éléments suivants :

  • REGION : ID de région du point de terminaison régional de la tâche.
  • JOB_ID : ID de la tâche à mettre à jour.
  • TARGET_UTILIZATION : valeur comprise dans la plage [0,1 - 0,9].

Pour rétablir la valeur par défaut de l'optimisation d'utilisation, exécutez la commande gcloud suivante :

gcloud dataflow jobs update-options \
  --unset-worker-utilization-hint \
  --region=REGION \
  --project=PROJECT_ID \
  JOB_ID

REST

Utilisez la méthode projects.locations.jobs.update :

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint
{
  "runtime_updatable_params": {
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID de projet Google Cloud de la tâche Dataflow.
  • REGION : ID de région du point de terminaison régional de la tâche.
  • JOB_ID : ID de la tâche à mettre à jour.
  • TARGET_UTILIZATION : valeur comprise dans la plage [0,1 - 0,9].

Heuristique d'autoscaling de flux

Pour les pipelines de traitement par flux, l'objectif de l'autoscaling horizontal est de minimiser les jobs en attente tout en optimisant l'utilisation et le débit des nœuds de calcul, ainsi que de réagir rapidement aux pics de charge.

Dataflow prend en compte plusieurs facteurs lors de l'autoscaling, y compris les suivants :

  • En attente. Le temps d'attente estimé est calculé à partir du débit et des octets en attente de traitement à partir de la source d'entrée. Un pipeline est considéré comme en retard lorsque le temps d'attente estimé est supérieur à 15 secondes.

  • Objectif d'utilisation du processeur L'objectif par défaut pour l'utilisation moyenne du processeur est de 0,8. Vous pouvez remplacer cette valeur.

  • Clés disponibles. Les clés sont l'unité de base du parallélisme dans Dataflow.

Dans certains cas, Dataflow utilise les facteurs suivants dans les décisions d'autoscaling. Si ces facteurs sont utilisés pour votre job, vous pouvez consulter ces informations dans l'onglet des métriques Autoscaling.

  • La limitation basée sur les clés utilise le nombre de clés de traitement reçues par le job pour calculer la limite des nœuds de calcul utilisateur, car chaque clé ne peut être traitée que par un seul nœud de calcul à la fois.

  • Blocage du scaling à la baisse. Si Dataflow détecte que des décisions d'autoscaling instables ont été prises, il ralentit le taux de scaling à la baisse afin d'améliorer la stabilité.

  • Le scaling à la hausse basé sur le processeur utilise l'utilisation élevée du processeur comme critère de scaling à la hausse.

  • Pour les tâches de traitement par flux qui n'utilisent pas Streaming Engine, le scaling peut être limité par le nombre de disques persistants. Pour en savoir plus, consultez la section Définir la plage d'autoscaling.

Augmentation de la résolution. Si un pipeline de traitement par flux reste en retard avec un parallélisme suffisant sur les nœuds de calcul pendant plusieurs minutes, Dataflow effectue un scaling à la hausse. Dataflow tente de supprimer le traitement en attente dans les 150 secondes environ suivant le scaling à la hausse, en fonction du débit actuel par nœud de calcul. En cas de retard alors que le nœud de calcul n'a pas assez de parallélisme pour les nœuds de calcul supplémentaires, le pipeline n'effectue pas de scaling à la hausse. (Le scaling du nombre de nœuds de calcul au-delà du nombre de clés disponibles pour le traitement en parallèle ne permet pas de traiter plus rapidement les tâches en attente.)

Scaling à la baisse : lorsque l'autoscaler prend une décision concernant un scaling à la baisse, les tâches en attente est le facteur de priorité le plus élevé. L'autoscaler ne cible pas plus de 15 secondes de retard. Si le temps de traitement des tâches en attente passe en dessous de 10 secondes et que l'utilisation moyenne des nœuds de calcul est inférieure à l'objectif d'utilisation du processeur, Dataflow effectue un scaling à la baisse. Tant que le nombre de tâches en attente est acceptable, l'autoscaler tente de maintenir l'utilisation du processeur proche de l'utilisation cible du processeur. Toutefois, si l'utilisation est déjà suffisamment proche de l'objectif, l'autoscaler peut conserver même le nombre de nœuds de calcul, car chaque étape de scaling à la baisse a un coût.

Streaming Engine utilise également une technique d'autoscaling prédictif basée sur les timers en attente. Les données illimitées d'un pipeline de traitement par flux sont divisées en fenêtres regroupées par code temporel. À la fin d'une fenêtre, des timers (minuteurs) se déclenchent pour chaque clé en cours de traitement dans cette fenêtre. Le déclenchement d'un timer indique que la fenêtre a expiré pour une clé donnée. Streaming Engine peut mesurer les timers en attente et prédire combien d'entre eux se déclencheront à la fermeture d'une fenêtre. En utilisant les timers en attente comme signal, Dataflow peut estimer le volume de traitement qui doit avoir lieu lors du déclenchement des timers à venir. Dataflow s'adapte automatiquement et à l'avance suivant la charge future estimée afin de répondre à la demande prévue.

Métriques

Pour connaître les limites actuelles d'autoscaling d'une tâche, interrogez les métriques suivantes :

  • job/max_worker_instances_limit : Nombre maximal de nœuds de calcul
  • job/min_worker_instances_limit : Nombre minimal de nœuds de calcul

Pour obtenir des informations sur l'utilisation des nœuds de calcul, interrogez les métriques suivantes :

  • job/aggregated_worker_utilization : utilisation agrégée des nœuds de calcul.
  • job/worker_utilization_hint : optimisation de l'utilisation actuelle des nœuds de calcul.

Pour obtenir des informations sur le comportement de l'autoscaler, interrogez la métrique suivante :

  • job.worker_utilization_hint_is_actively_used : indique si l'autoscaler utilise activement l'optimisation d'utilisation des nœuds de calcul. Si d'autres facteurs remplacent la valeur d'optimisation lors de l'échantillonnage de cette métrique, la valeur est false.
  • job/horizontal_worker_scaling : décrit les décisions prises par l'autoscaler. Cette métrique contient les étiquettes suivantes :
    • direction : indique si l'autoscaler a effectué un scaling à la hausse, à la baisse ou n'a effectué aucune action.
    • rationale : spécifie la logique de la décision de l'autoscaler.

Pour en savoir plus, consultez les métriques Cloud Monitoring. Ces métriques sont également affichées dans les graphiques de surveillance de l'autoscaling.

Étapes suivantes