Graphiques de tâches Dataflow

Lorsque vous sélectionnez une tâche Dataflow spécifique, l'interface de surveillance fournit une représentation graphique de votre pipeline: le graphique de tâche. La page "Graphique des tâches" de la console fournit également un résumé des tâches, un journal de tâche et des informations sur chaque étape du pipeline.

Sur le graphique de tâche d'un pipeline, chaque transformation est représentée sous la forme d'une case. Chaque case contient le nom de la transformation et certaines informations sur son état, par exemple :

  • Running (en cours d'exécution) : l'étape est en cours d'exécution.
  • Queued (En file d'attente) : l'étape d'une tâche FlexRS est en file d'attente.
  • Succeeded (Réussie) : l'étape s'est terminée avec succès.
  • Stopped (Arrêtée) : l'étape a été arrêtée, car la tâche s'est arrêtée.
  • Unknown (Inconnu) : l'étape n'a pas pu indiquer l'état.
  • Failed (échec) : l'étape n'a pas abouti.

Graphique de la tâche de base

Code du pipeline :

Java

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python

(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go

  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)
Graphique de la tâche :

Graphique d'exécution d'un pipeline WordCount tel qu'il s'affiche dans l'interface de surveillance de Dataflow.

Figure 1 : Code du pipeline WordCount affiché avec le graphique d'exécution obtenu dans l'interface de surveillance Dataflow.

Transformations composites

Dans le graphique de tâche, vous pouvez développer les transformations composites (qui contiennent plusieurs sous-transformations imbriquées). Les transformations composites que vous pouvez développer sont identifiées par une flèche. Cliquez sur cette flèche pour développer la transformation et afficher les sous-transformations qu'elle contient.

Code du pipeline :

Java

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python

# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go

  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
Graphique de la tâche :

Graphique de tâche d'un pipeline WordCount, avec la transformation CountWords développée de façon à afficher les sous-transformations qui la constituent.

Figure 2 : Code du pipeline correspondant aux sous-étapes de la transformation CountWords, affiché avec le graphique de tâche développé pour l'ensemble du pipeline.

Dans votre code de pipeline, vous avez peut-être appelé votre transformation composite de la manière suivante :

result = transform.apply(input);

Les transformations composites appelées de cette façon omettent l'imbrication attendue et peuvent donc apparaître développées dans l'interface de surveillance de Dataflow. Votre pipeline peut également générer des erreurs ou des avertissements relatifs aux noms uniques stables au moment de l'exécution du pipeline.

Pour éviter ces problèmes, assurez-vous d'appeler vos transformations à l'aide du format recommandé :

result = input.apply(transform);

Noms de transformation

Dataflow propose plusieurs solutions pour obtenir le nom de la transformation apparaissant dans le graphique de tâche de surveillance :

Java

  • Dataflow peut utiliser un nom que vous attribuez lorsque vous appliquez votre transformation. Le premier argument que vous fournissez à la méthode apply sert de nom à la transformation.
  • Dataflow peut déduire le nom de la transformation soit du nom de la classe (si vous avez créé une transformation personnalisée), soit du nom de votre objet fonction DoFn (si vous utilisez une transformation de base telle que ParDo).

Python

  • Dataflow peut utiliser un nom que vous attribuez lorsque vous appliquez votre transformation. Vous pouvez définir le nom d'une transformation en spécifiant l'argument label de celle-ci.
  • Dataflow peut déduire le nom de la transformation soit du nom de la classe (si vous avez créé une transformation personnalisée), soit du nom de votre objet fonction DoFn (si vous utilisez une transformation de base telle que ParDo).

Go

  • Dataflow peut utiliser un nom que vous attribuez lorsque vous appliquez votre transformation. Vous pouvez définir le nom d'une transformation en spécifiant le Scope.
  • Dataflow peut déduire le nom de la transformation soit du nom de la structure, si vous utilisez un DoFn de structure, soit du nom de la fonction, si vous utilisez un DoFn.

Comprendre les statistiques

Cette section fournit des détails sur les métriques associées au graphique de la tâche.

Durée d'exécution

Lorsque vous cliquez sur une étape, la métrique Wall time (Durée d'exécution) s'affiche dans le panneau Step info (Informations sur l'étape). Cette métrique fournit une approximation du temps total passé sur l'ensemble des threads dans tous les nœuds de calcul pour les actions suivantes :

  • Initialisation de l'étape
  • Traitement des données
  • Mélange des données
  • Clôture de l'étape

Pour les étapes composites, il s'agit de la durée cumulée des étapes des composants. Cette estimation peut vous aider à identifier les étapes particulièrement lentes et à déterminer quelles parties de votre pipeline prennent plus de temps qu'elles ne le devraient.

Vous pouvez voir le temps nécessaire à l'exécution d'une étape dans votre pipeline.
Figure 3 : La métrique Wall time (Durée d'exécution) peut vous aider à vérifier que votre pipeline fonctionne efficacement.

Métriques des entrées secondaires

Les métriques des entrées secondaires vous montrent comment les modèles et algorithmes d'accès aux entrées secondaires affectent les performances de votre pipeline. Lorsque votre pipeline utilise une entrée secondaire, Dataflow écrit la collection sur une couche persistante (telle qu'un disque), et vos transformations lisent les données à partir de cette collection persistante. Ces lectures et écritures affectent le temps d'exécution de votre tâche.

L'interface de surveillance Dataflow affiche les métriques des entrées secondaires lorsque vous sélectionnez une transformation qui crée ou absorbe une collection d'entrée secondaire. Vous pouvez consulter les métriques dans la section Side Input Metrics (Métriques des entrées secondaires) du panneau Step info (Informations sur l'étape).

Transformations créant une entrée secondaire

Si la transformation sélectionnée crée une collection d'entrée secondaire, la section Side Input Metrics (Métriques des entrées secondaires) affiche le nom de la collection, ainsi que les métriques suivantes :

  • Time spent writing (Durée de l'écriture) : temps passé à écrire la collection de l'entrée secondaire.
  • Bytes written (Octets écrits) : nombre total d'octets écrits dans la collection de l'entrée secondaire.
  • Time & bytes read from side input (Durée de la lecture et octets lus à partir des entrées secondaires) : tableau contenant des métriques supplémentaires pour toutes les transformations absorbant la collection de l'entrée secondaire. Ces transformations sont appelées side input consumers (consommateurs de l'entrée secondaire).

Le tableau Time & bytes read from side input (Durée de la lecture et octets lus à partir des entrées secondaires) contient les informations suivantes pour chaque consommateur de l'entrée secondaire :

  • Side input consumer (Consommateur de l'entrée secondaire) : nom de la transformation consommant l'entrée secondaire.
  • Time spent reading (Durée de la lecture) : temps passé par ce consommateur à lire la collection de l'entrée secondaire.
  • Bytes read (Octets lus) : nombre d'octets lus par ce consommateur à partir de la collection de l'entrée secondaire.

Si votre pipeline comporte une transformation composite qui crée une entrée secondaire, déployez la transformation composite jusqu'à voir la sous-transformation spécifique qui crée l'entrée secondaire. Ensuite, sélectionnez cette sous-transformation pour afficher la section Side Input Metrics (Métriques des entrées secondaires).

La figure 4 illustre les métriques des entrées secondaires pour une transformation créant une collection d'entrée secondaire.

Vous pouvez sélectionner la sous-transformation. Ses métriques d'entrées secondaires sont alors visibles dans le panneau latéral Informations sur l'étape.
Figure 4 : Le graphique de la tâche comporte une transformation composite développée (MakeMapView). La sous-transformation créant l'entrée secondaire (CreateDataflowView) est sélectionnée et les métriques des entrées secondaires sont visibles dans le panneau latéral Step info (Informations sur l'étape).

Transformations consommant une ou plusieurs entrées secondaires

Si la transformation sélectionnée consomme une ou plusieurs entrées secondaires, la section Side Input Metrics (Métriques des entrées secondaires) affiche le tableau Time & bytes read from side input (Durée de la lecture et octets lus à partir des entrées secondaires). Ce tableau contient les informations suivantes pour chaque collection de l'entrée secondaire :

  • Side input collection (Collection de l'entrée secondaire) : nom de la collection de l'entrée secondaire.
  • Time spent reading (Durée de la lecture) : temps passé par la transformation à lire cette collection de l'entrée secondaire.
  • Bytes read (Octets lus) : nombre d'octets lus par la transformation dans la collection de l'entrée secondaire.

Si votre pipeline comporte une transformation composite qui lit une entrée secondaire, déployez la transformation composite jusqu'à voir la sous-transformation spécifique qui lit l'entrée secondaire. Ensuite, sélectionnez cette sous-transformation pour afficher la section Side Input Metrics (Métriques des entrées secondaires).

La figure 5 illustre les métriques des entrées secondaires pour une transformation lisant une collection d'entrée secondaire.

Vous pouvez sélectionner la transformation. Ses métriques d'entrées secondaires sont alors visibles dans le panneau latéral Informations sur l'étape.
Figure 5 : La transformation JoinBothCollections lit une collection de l'entrée secondaire L'option JoinBothCollections est sélectionnée dans le graphique de tâche, et les métriques des entrées secondaires sont visibles dans le panneau latéral Informations sur l'étape.

Identifier les problèmes de performance des entrées secondaires

La réitération est un problème de performances courant lié aux entrées secondaires. Si votre PCollection d'entrée secondaire est trop volumineuse, les nœuds de calcul ne peuvent pas mettre en cache l'intégralité de la collection. Par conséquent, les nœuds de calcul doivent régulièrement lire les données depuis le stockage persistant de la collection de l'entrée secondaire.

Dans la figure 6, les métriques des entrées secondaires montrent que le nombre total d'octets lus dans la collection de l'entrée secondaire est bien supérieur à la taille de la collection (nombre total d'octets écrits).

Vous pouvez sélectionner la transformation. Ses métriques d'entrées secondaires sont alors visibles dans le panneau latéral Informations sur l'étape.
Figure 6 : Exemple de réitération. La collection de l'entrée secondaire a une taille de 563 Mo, et la somme des octets lus par les transformations consommatrices atteint pratiquement 12 Go.

Pour améliorer les performances de ce pipeline, modifiez votre algorithme pour éviter les itérations ou la récupération répétée des données de l'entrée secondaires. Dans cet exemple, le pipeline calcule le produit cartésien de deux collections. Pour chaque élément de la collection principale, l'algorithme parcourt l'ensemble de la collection de l'entrée secondaire. Vous pouvez améliorer le modèle d'accès du pipeline en regroupant plusieurs éléments de la collection principale. Cette modification réduit le nombre de lectures de la collection de l'entrée secondaire requises par les nœuds de calcul.

Un autre problème de performances courant peut survenir si votre pipeline effectue une jointure en appliquant une fonction ParDo sur une ou plusieurs entrées secondaires volumineuses. Dans ce cas, les nœuds de calcul consacrent une large proportion du temps de traitement de la jointure à lire les données des collections d'entrées secondaires.

La figure 7 illustre des exemples de métriques des entrées secondaires dans ce type de contexte :

Vous pouvez sélectionner la transformation. Ses métriques d'entrées secondaires sont alors visibles dans le panneau latéral Informations sur l'étape.
Figure 7 : Le temps total de traitement de la transformation JoinBothCollections est de 18 min 31 s. Les nœuds de calcul passent la majeure partie du temps de traitement (10 min 3 s) à lire la collection de l'entrée secondaire, qui a une taille de 10 Go.

Pour améliorer les performances de ce pipeline, utilisez CoGroupByKey au lieu des entrées secondaires.