Rééquilibrage dynamique du travail

La fonctionnalité de rééquilibrage dynamique des tâches du service Dataflow permet au service de repartitionner le travail de manière dynamique en fonction des conditions d'exécution. Ces conditions peuvent être les suivantes :

  • Déséquilibres dans les attributions de tâches
  • Nœuds de calcul nécessitant plus de temps que prévu pour terminer
  • Nœuds de calcul nécessitant moins de temps que prévu pour terminer

Le service Dataflow détecte automatiquement ces conditions et peut attribuer de manière dynamique des tâches à des nœuds de calcul inutilisés ou sous-utilisés afin de réduire le temps de traitement global de votre tâche.

Limites

Le rééquilibrage dynamique du travail a lieu uniquement lorsque le service Dataflow traite certaines données d'entrée en parallèle : lors de la lecture de données à partir d'une source d'entrée externe, lors de l'utilisation d'une classe PCollection intermédiaire matérialisée, ou lors de l'utilisation du résultat d'une agrégation, comme GroupByKey. Si de nombreuses étapes de votre tâche sont fusionnées, votre tâche comporte moins de classes PCollection intermédiaires, et le rééquilibrage dynamique du travail est limité à la nombre d'éléments dans la PCollection matérialisée source. Si vous voulez vous assurer que le rééquilibrage dynamique du travail peut être appliqué à une classe PCollection particulière de votre pipeline, vous pouvez empêcher la fusion de différentes manières afin de garantir un parallélisme dynamique.

Le rééquilibrage dynamique du travail ne peut pas remettre en parallèle des données plus fines qu'un seul enregistrement. Si vos données contiennent des enregistrements individuels qui entraînent des temps de traitement importants, elles peuvent retarder votre tâche. Dataflow ne peut pas subdiviser et redistribuer un enregistrement "à chaud" individuel sur plusieurs nœuds de calcul.

Java

Si vous définissez un nombre fixe de segments pour la sortie finale de votre pipeline (par exemple, en écrivant des données à l'aide de TextIO.Write.withNumShards), Dataflow limite la parallélisation en fonction du nombre de segments que vous choisissez.

Python

Si vous définissez un nombre fixe de segments pour la sortie finale de votre pipeline (par exemple, en écrivant des données à l'aide de beam.io.WriteToText(..., num_shards=...)), Dataflow limite la parallélisation en fonction du nombre de segments que vous choisissez.

Go

Si vous définissez un nombre fixe de segments pour la sortie finale de votre pipeline, Dataflow limite la parallélisation en fonction du nombre de segments que vous choisissez.

Travailler avec des sources de données personnalisées

Java

Si votre pipeline utilise une source de données personnalisée que vous fournissez, vous devez mettre en œuvre la méthode splitAtFraction pour permettre à votre source d'utiliser la fonctionnalité de rééquilibrage dynamique du travail.

Si vous mettez en œuvre splitAtFraction de manière incorrecte, les enregistrements de votre source peuvent être dupliqués ou supprimés. Consultez les informations de référence de l'API sur RangeTracker pour obtenir de l'aide et des conseils sur la mise en œuvre de splitAtFraction.

Python

Si votre pipeline utilise une source de données personnalisée que vous fournissez, votre RangeTracker doit mettre en œuvre try_claim, try_split, position_at_fraction et fraction_consumed pour permettre à votre source d'utiliser la fonctionnalité de rééquilibrage dynamique du travail.

Consultez les informations de référence de l'API sur RangeTracker pour en savoir plus.

Go

Si votre pipeline utilise une source de données personnalisée que vous fournissez, vous devez mettre en œuvre une méthode RTracker valide pour permettre à votre source d'utiliser la fonctionnalité de rééquilibrage dynamique du travail.

Pour en savoir plus, consultez les informations de référence de l'API RTracker.

Le rééquilibrage dynamique du travail est activé par la valeur de retour de la méthode getProgress() de votre source personnalisée. L'implémentation par défaut de getProgress() renvoie null. Pour garantir l'activation de l'autoscaling, assurez-vous que votre source personnalisée remplace getProgress()afin de renvoyer une valeur appropriée.