Régler l'autoscaling horizontal pour les pipelines de traitement par flux

Dans les pipelines de traitement par flux avec un volume élevé de données d'entrée, il existe généralement un compromis entre coût et latence. Pour maintenir une faible latence, Dataflow doit ajouter des nœuds de calcul à mesure que le volume de trafic augmente. Un autre facteur est la vitesse à laquelle le pipeline doit évoluer à la hausse ou à la baisse en réponse aux modifications du débit de données d'entrée.

Les paramètres par défaut de l'autoscaler Dataflow sont adaptés à de nombreuses charges de travail. Toutefois, vous pouvez adapter ce comportement pour votre scénario particulier. Par exemple, une latence moyenne plus élevée peut être acceptable pour réduire les coûts, mais vous pouvez également souhaiter que Dataflow effectue un scaling à la hausse plus rapidement afin de répondre aux pics de trafic.

Pour optimiser l'autoscaling horizontal, vous pouvez ajuster les paramètres suivants :

Définir la plage d'autoscaling

Lorsque vous créez un job de traitement par flux, vous pouvez définir le nombre initial de nœuds de calcul et le nombre maximal de nœuds de calcul. Pour ce faire, spécifiez les options de pipeline suivantes :

Java

  • --numWorkers : nombre initial de nœuds de calcul disponibles au début de l'exécution du pipeline
  • --maxNumWorkers : nombre maximal de nœuds de calcul disponibles pour votre pipeline

Python

  • --num_workers : nombre initial de nœuds de calcul disponibles au début de l'exécution du pipeline
  • --max_num_workers : nombre maximal de nœuds de calcul disponibles pour votre pipeline

Go

  • --num_workers : nombre initial de nœuds de calcul disponibles au début de l'exécution du pipeline
  • --max_num_workers : nombre maximal de nœuds de calcul disponibles pour votre pipeline

Pour les jobs de traitement par flux qui utilisent Streaming Engine, l'option --maxNumWorkers est facultative. La valeur par défaut est 100. Pour les jobs de traitement par flux qui n'utilisent pas Streaming Engine, --maxNumWorkers est requis lorsque l'autoscaling horizontal est activé.

La valeur de départ de --maxNumWorkers détermine également le nombre de disques persistants alloués au job. Les pipelines sont déployés avec un pool fixe de disques persistants, d'un nombre égal à --maxNumWorkers. Lors du streaming, les disques persistants sont redistribués de manière à ce que chaque nœud de calcul ait un nombre égal de disques associés.

Si vous définissez --maxNumWorkers, assurez-vous que la valeur fournit suffisamment de disques pour votre pipeline. Tenez compte de la croissance future lorsque vous définissez la valeur initiale. Pour en savoir plus sur les performances de Persistent Disk, consultez la section Configurer Persistent Disk et vos VM. Dataflow facture l'utilisation de Persistent Disk et dispose de quotas Compute Engine, y compris de quotas Persistent Disk.

Par défaut, le nombre minimal de nœuds de calcul est de 1 pour les jobs de traitement par flux qui utilisent Streaming Engine et de (maxNumWorkers/15) arrondi pour les jobs qui n'utilisent pas Streaming Engine.

Mettre à jour la plage d'autoscaling

Pour les jobs qui utilisent Streaming Engine, vous pouvez ajuster le nombre minimal et maximal de nœuds de calcul, sans arrêter ni remplacer le job. Pour ajuster ces valeurs, utilisez une mise à jour de job en cours. Mettez à jour les options de tâches suivantes :

  • --min-num-workers : nombre minimal de nœuds de calcul.
  • --max-num-workers : nombre maximal de nœuds de calcul.

gcloud

Exécutez la commande gcloud dataflow jobs update-options :

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

Remplacez les éléments suivants :

  • REGION : ID de région du point de terminaison régional de la tâche.
  • MINIMUM_WORKERS : nombre minimal d'instances Compute Engine.
  • MAXIMUM_WORKERS : nombre maximal d'instances Compute Engine.
  • JOB_ID : ID de la tâche à mettre à jour.

Vous pouvez également mettre à jour --min-num-workers et --max-num-workers individuellement.

REST

Utilisez la méthode projects.locations.jobs.update :

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID de projet Google Cloud du job Dataflow
  • REGION : ID de région du point de terminaison régional de la tâche.
  • JOB_ID : ID de la tâche à mettre à jour.
  • MINIMUM_WORKERS : nombre minimal d'instances Compute Engine.
  • MAXIMUM_WORKERS : nombre maximal d'instances Compute Engine.

Vous pouvez également mettre à jour min_num_workers et max_num_workers individuellement. Spécifiez les paramètres à mettre à jour dans le paramètre de requête updateMask et incluez les valeurs mises à jour dans le champ runtimeUpdatableParams du corps de la requête. L'exemple suivant met à jour min_num_workers :

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Pour les jobs qui n'utilisent pas Streaming Engine, vous pouvez remplacer le job existant par une valeur mise à jour de maxNumWorkers.

Si vous mettez à jour un job de traitement par flux qui n'utilise pas Streaming Engine, l'autoscaling horizontal est désactivé par défaut sur le job mis à jour. Pour maintenir l'autoscaling activé, spécifiez --autoscalingAlgorithm et --maxNumWorkers pour le job mis à jour.

Définir l'optimisation de l'utilisation des nœuds de calcul

Dataflow utilise l'utilisation moyenne du processeur comme signal pour savoir quand appliquer l'autoscaling horizontal. Par défaut, Dataflow définit une utilisation du processeur cible de 0,8. Lorsque l'utilisation se situe en dehors de cette plage, Dataflow peut ajouter ou supprimer des nœuds de calcul.

Pour mieux contrôler le comportement de l'autoscaling, vous pouvez définir l'utilisation du processeur cible sur une valeur comprise dans la plage [0,1, 0,9].

  • Définissez une valeur d'utilisation du processeur inférieure si vous souhaitez obtenir des latences maximales plus faibles. Une valeur inférieure permet à Dataflow d'effectuer un scaling horizontal plus agressif en réponse à l'utilisation croissante des nœuds de calcul et d'effectuer un scaling à la baisse plus prudemment pour améliorer la stabilité. Une valeur inférieure fournit également plus de marge lorsque le pipeline s'exécute dans un état stable, ce qui entraîne généralement une latence de queue plus faible. (La latence de queue mesure les temps d'attente les plus longs avant le traitement d'un nouvel enregistrement.)

  • Définissez une valeur plus élevée si vous souhaitez économiser des ressources et réduire les coûts en cas de pics de trafic. Une valeur plus élevée évite un scaling à la hausse excessif au détriment d'une latence plus élevée.

Pour configurer l'optimisation de l'utilisation lorsque vous exécutez une tâche, définissez l'option de service worker_utilization_hint :

Java

--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION

Remplacez TARGET_UTILIZATION par une valeur comprise dans la plage [0.1, 0.9].

Python

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Remplacez TARGET_UTILIZATION par une valeur comprise dans la plage [0.1, 0.9].

Go

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Remplacez TARGET_UTILIZATION par une valeur comprise dans la plage [0.1, 0.9].

Pour les nouveaux pipelines, nous vous recommandons d'effectuer des tests sous des charges réalistes en utilisant le paramètre par défaut. Évaluez ensuite le comportement de l'autoscaling tel qu'il s'applique à votre pipeline et apportez les ajustements nécessaires.

L'optimisation de l'utilisation n'est qu'un facteur utilisé par Dataflow pour déterminer s'il convient de faire évoluer les nœuds de calcul. D'autres facteurs, tels que le traitement en attente et les clés disponibles, peuvent remplacer la valeur de l'optimisation. En outre, la cible n'est pas une cible stricte. L'autoscaler tente de maintenir l'utilisation du processeur dans la plage de la valeur d'optimisation, mais la métrique d'utilisation agrégée peut être supérieure ou inférieure. Pour en savoir plus, consultez la section Heuristique d'autoscaling de flux.

Mettre à jour l'optimisation de l'utilisation

Pour mettre à jour l'optimisation de l'utilisation pendant l'exécution d'une tâche, effectuez une mise à jour en cours comme suit :

gcloud

Exécutez la commande gcloud dataflow jobs update-options :

gcloud dataflow jobs update-options \
  --region=REGION \
  -worker_utilization_hint=TARGET_UTILIZATION \
  JOB_ID

Remplacez les éléments suivants :

  • REGION : ID de région du point de terminaison régional de la tâche.
  • JOB_ID : ID de la tâche à mettre à jour.
  • TARGET_UTILIZATION: valeur comprise dans la plage [0,1, 0,9]

Pour rétablir la valeur par défaut de l'optimisation d'utilisation, exécutez la commande gcloud suivante :

gcloud dataflow jobs update-options \
  --unset_worker_utilization_hint \
  --region=REGION \
  --project=PROJECT_ID \
  JOB_ID

REST

Utilisez la méthode projects.locations.jobs.update :

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint
{
  "runtime_updatable_params": {
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID de projet Google Cloud de la tâche Dataflow.
  • REGION : ID de région du point de terminaison régional de la tâche.
  • JOB_ID : ID de la tâche à mettre à jour.
  • TARGET_UTILIZATION: valeur comprise dans la plage [0,1, 0,9]

Heuristique d'autoscaling de flux

Pour les pipelines de traitement par flux, l'objectif de l'autoscaling horizontal est de minimiser les jobs en attente tout en optimisant l'utilisation et le débit des nœuds de calcul, ainsi que de réagir rapidement aux pics de charge.

Dataflow prend en compte plusieurs facteurs lors de l'autoscaling, y compris les suivants :

  • En attente. Le temps d'attente estimé est calculé à partir du débit et des octets en attente de traitement à partir de la source d'entrée. Un pipeline est considéré comme en retard lorsque le temps d'attente estimé est supérieur à 15 secondes.

  • Objectif d'utilisation du processeur La cible par défaut pour l'utilisation moyenne du processeur est de 0,8. Vous pouvez remplacer cette valeur.

  • Clés disponibles. Les clés sont l'unité de base du parallélisme dans Dataflow.

Dans certains cas, Dataflow utilise les facteurs suivants dans les décisions d'autoscaling. Si ces facteurs sont utilisés pour votre job, vous pouvez consulter ces informations dans l'onglet des métriques Autoscaling.

  • La limitation basée sur les clés utilise le nombre de clés de traitement reçues par le job pour calculer la limite des nœuds de calcul utilisateur, car chaque clé ne peut être traitée que par un seul nœud de calcul à la fois.

  • Blocage du scaling à la baisse. Si Dataflow détecte que des décisions d'autoscaling instables ont été prises, il ralentit le taux de réduction de la capacité afin d'améliorer la stabilité.

  • Le scaling à la hausse basé sur le processeur utilise l'utilisation élevée du processeur comme critère de scaling à la hausse.

  • Pour les tâches de traitement par flux qui n'utilisent pas Streaming Engine, le scaling peut être limité par le nombre de disques persistants. Pour en savoir plus, consultez la section Définir la plage d'autoscaling.

Augmentation de la résolution. Si un pipeline de traitement par flux reste en retard avec un parallélisme suffisant sur les nœuds de calcul pendant plusieurs minutes, Dataflow effectue un scaling à la hausse. Dataflow tente de supprimer le traitement en attente dans les 150 secondes environ suivant le scaling à la hausse, en fonction du débit actuel par nœud de calcul. En cas de retard alors que le nœud de calcul n'a pas assez de parallélisme pour les nœuds de calcul supplémentaires, le pipeline n'effectue pas de scaling à la hausse. (Le scaling du nombre de nœuds de calcul au-delà du nombre de clés disponibles pour le traitement en parallèle ne permet pas de traiter plus rapidement les tâches en attente.)

Scaling à la baisse : lorsque l'autoscaler prend une décision de scaling à la baisse, le traitement en attente est le facteur de priorité le plus élevé. L'autoscaler ne cible pas plus de 15 secondes de retard. Si le temps de traitement des tâches en attente passe en dessous de 10 secondes et que l'utilisation moyenne des nœuds de calcul est inférieure à l'objectif d'utilisation du processeur, Dataflow effectue un scaling à la baisse. Tant que le nombre de tâches en attente est acceptable, l'autoscaler tente de maintenir l'utilisation du processeur proche de l'utilisation cible du processeur. Toutefois, si l'utilisation est déjà suffisamment proche de la cible, l'autoscaler peut conserver le nombre de nœuds de calcul inchangé, car chaque étape de scaling à la baisse a un coût.

Streaming Engine utilise également une technique d'autoscaling prédictif basée sur les timers en attente. Les données illimitées d'un pipeline de streaming sont divisées en fenêtres regroupées par horodatage. À la fin d'une fenêtre, des timers (minuteurs) se déclenchent pour chaque clé en cours de traitement dans cette fenêtre. Le déclenchement d'un timer indique que la fenêtre a expiré pour une clé donnée. Streaming Engine peut mesurer les timers en attente et prédire combien d'entre eux se déclencheront à la fermeture d'une fenêtre. En utilisant les timers en attente comme signal, Dataflow peut estimer le volume de traitement qui doit avoir lieu lors du déclenchement des timers à venir. Dataflow s'adapte automatiquement et à l'avance suivant la charge future estimée afin de répondre à la demande prévue.

Métriques

Pour connaître les limites actuelles d'autoscaling d'une tâche, interrogez les métriques suivantes :

  • job/max_worker_instances_limit : Nombre maximal de nœuds de calcul
  • job/min_worker_instances_limit : Nombre minimal de nœuds de calcul

Pour obtenir des informations sur l'utilisation des nœuds de calcul, interrogez les métriques suivantes :

  • job/aggregated_worker_utilization : utilisation agrégée des nœuds de calcul.
  • job/worker_utilization_hint : optimisation de l'utilisation actuelle des nœuds de calcul.

Pour obtenir des informations sur le comportement de l'autoscaler, interrogez la métrique suivante :

  • job.worker_utilization_hint_is_actively_used : indique si l'autoscaler utilise activement l'optimisation d'utilisation des nœuds de calcul. Si d'autres facteurs remplacent la valeur d'optimisation lors de l'échantillonnage de cette métrique, la valeur est false.
  • job/horizontal_worker_scaling : décrit les décisions prises par l'autoscaler. Cette métrique contient les libellés suivants :
    • direction : indique si l'autoscaler a effectué un scaling à la hausse, à la baisse ou n'a effectué aucune action.
    • rationale : spécifie la logique de la décision de l'autoscaler.

Pour en savoir plus, consultez les métriques Cloud Monitoring. Ces métriques sont également affichées dans les graphiques de surveillance de l'autoscaling.

Étapes suivantes