Lorsque vous sélectionnez un job Dataflow spécifique, l'interface de surveillance fournit une représentation graphique de votre pipeline : le graphique de job. La page "Graphique des tâches" de la console fournit également un résumé des tâches, un journal de tâche et des informations sur chaque étape du pipeline.
Sur le graphique de tâche d'un pipeline, chaque transformation est représentée sous la forme d'une case. Chaque case contient le nom de la transformation et certaines informations sur son état, par exemple :
- Running (en cours d'exécution) : l'étape est en cours d'exécution.
- Queued (En file d'attente) : l'étape d'une tâche FlexRS est en file d'attente.
- Succeeded (Réussie) : l'étape s'est terminée avec succès.
- Stopped (Arrêtée) : l'étape a été arrêtée, car la tâche s'est arrêtée.
- Unknown (Inconnu) : l'étape n'a pas pu indiquer l'état.
- Failed (échec) : l'étape n'a pas abouti.
Par défaut, la page du graphique de la tâche affiche la vue Graphique. Pour afficher le graphique de votre tâche sous forme de tableau, dans la vue Étapes de la tâche, sélectionnez Vue Tableau. La vue Tableau contient les mêmes informations dans un format différent. La vue Tableau est utile dans les cas suivants :
- Votre job comporte de nombreuses étapes, ce qui rend la navigation dans le graphique de job difficile.
- Vous souhaitez trier les étapes de la tâche par une propriété spécifique. Par exemple, vous pouvez trier la table par temps réel pour identifier les étapes lentes.
Graphique de la tâche de base
Code du pipeline :Java// Read the lines of the input text. p.apply("ReadLines", TextIO.read().from(options.getInputFile())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply("WriteCounts", TextIO.write().to(options.getOutput())); Python( pipeline # Read the lines of the input text. | 'ReadLines' >> beam.io.ReadFromText(args.input_file) # Count the words. | CountWords() # Write the formatted word counts to output. | 'WriteCounts' >> beam.io.WriteToText(args.output_path)) Go// Create the pipeline. p := beam.NewPipeline() s := p.Root() // Read the lines of the input text. lines := textio.Read(s, *input) // Count the words. counted := beam.ParDo(s, CountWords, lines) // Write the formatted word counts to output. textio.Write(s, *output, formatted) |
Graphique de la tâche :
|
Transformations composites
Dans le graphique de tâche, vous pouvez développer les transformations composites (qui contiennent plusieurs sous-transformations imbriquées). Les transformations composites que vous pouvez développer sont identifiées par une flèche. Cliquez sur cette flèche pour développer la transformation et afficher les sous-transformations qu'elle contient.
Code du pipeline :Java// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } Python# The CountWords Composite Transform inside the WordCount pipeline. @beam.ptransform_fn def CountWords(pcoll): return ( pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) Go// The CountWords Composite Transform inside the WordCount pipeline. func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) } |
Graphique de la tâche :
|
Dans votre code de pipeline, vous avez peut-être appelé votre transformation composite de la manière suivante :
result = transform.apply(input);
Les transformations composites appelées de cette façon omettent l'imbrication attendue et peuvent donc apparaître développées dans l'interface de surveillance de Dataflow. Votre pipeline peut également générer des erreurs ou des avertissements relatifs aux noms uniques stables au moment de l'exécution du pipeline.
Pour éviter ces problèmes, assurez-vous d'appeler vos transformations à l'aide du format recommandé :
result = input.apply(transform);
Noms de transformation
Dataflow propose plusieurs solutions pour obtenir le nom de la transformation apparaissant dans le graphique de tâche de surveillance :
Java
- Dataflow peut utiliser un nom que vous attribuez lorsque vous appliquez votre transformation. Le premier argument que vous fournissez à la méthode
apply
sert de nom à la transformation. - Dataflow peut déduire le nom de la transformation soit du nom de la classe (si vous avez créé une transformation personnalisée), soit du nom de votre objet fonction
DoFn
(si vous utilisez une transformation de base telle queParDo
).
Python
- Dataflow peut utiliser un nom que vous attribuez lorsque vous appliquez votre transformation. Vous pouvez définir le nom d'une transformation en spécifiant l'argument
label
de celle-ci. - Dataflow peut déduire le nom de la transformation soit du nom de la classe (si vous avez créé une transformation personnalisée), soit du nom de votre objet fonction
DoFn
(si vous utilisez une transformation de base telle queParDo
).
Go
- Dataflow peut utiliser un nom que vous attribuez lorsque vous appliquez votre transformation. Vous pouvez définir le nom d'une transformation en spécifiant le
Scope
. - Dataflow peut déduire le nom de la transformation soit du nom de la structure, si vous utilisez un
DoFn
de structure, soit du nom de la fonction, si vous utilisez unDoFn
.
Comprendre les métriques
Cette section fournit des détails sur les métriques associées au graphique de la tâche.
Durée d'exécution
Lorsque vous cliquez sur une étape, la métrique Wall time (Durée d'exécution) s'affiche dans le panneau Step info (Informations sur l'étape). Cette métrique fournit une approximation du temps total passé sur l'ensemble des threads dans tous les nœuds de calcul pour les actions suivantes :
- Initialisation de l'étape
- Traitement des données
- Mélange des données
- Clôture de l'étape
Pour les étapes composites, il s'agit de la durée cumulée des étapes des composants. Cette estimation peut vous aider à identifier les étapes particulièrement lentes et à déterminer quelles parties de votre pipeline prennent plus de temps qu'elles ne le devraient.
Métriques des entrées secondaires
Les métriques des entrées secondaires vous montrent comment les modèles et algorithmes d'accès aux entrées secondaires affectent les performances de votre pipeline. Lorsque votre pipeline utilise une entrée secondaire, Dataflow écrit la collection sur une couche persistante (telle qu'un disque), et vos transformations lisent les données à partir de cette collection persistante. Ces lectures et écritures affectent le temps d'exécution de votre tâche.
L'interface de surveillance Dataflow affiche les métriques des entrées secondaires lorsque vous sélectionnez une transformation qui crée ou absorbe une collection d'entrée secondaire. Vous pouvez consulter les métriques dans la section Side Input Metrics (Métriques des entrées secondaires) du panneau Step info (Informations sur l'étape).
Transformations créant une entrée secondaire
Si la transformation sélectionnée crée une collection d'entrée secondaire, la section Side Input Metrics (Métriques des entrées secondaires) affiche le nom de la collection, ainsi que les métriques suivantes :
- Time spent writing (Durée de l'écriture) : temps passé à écrire la collection de l'entrée secondaire.
- Bytes written (Octets écrits) : nombre total d'octets écrits dans la collection de l'entrée secondaire.
- Time & bytes read from side input (Durée de la lecture et octets lus à partir des entrées secondaires) : tableau contenant des métriques supplémentaires pour toutes les transformations absorbant la collection de l'entrée secondaire. Ces transformations sont appelées side input consumers (consommateurs de l'entrée secondaire).
Le tableau Time & bytes read from side input (Durée de la lecture et octets lus à partir des entrées secondaires) contient les informations suivantes pour chaque consommateur de l'entrée secondaire :
- Side input consumer (Consommateur de l'entrée secondaire) : nom de la transformation consommant l'entrée secondaire.
- Time spent reading (Durée de la lecture) : temps passé par ce consommateur à lire la collection de l'entrée secondaire.
- Bytes read (Octets lus) : nombre d'octets lus par ce consommateur à partir de la collection de l'entrée secondaire.
Si votre pipeline comporte une transformation composite qui crée une entrée secondaire, déployez la transformation composite jusqu'à voir la sous-transformation spécifique qui crée l'entrée secondaire. Ensuite, sélectionnez cette sous-transformation pour afficher la section Side Input Metrics (Métriques des entrées secondaires).
La figure 4 illustre les métriques des entrées secondaires pour une transformation créant une collection d'entrée secondaire.
Transformations consommant une ou plusieurs entrées secondaires
Si la transformation sélectionnée consomme une ou plusieurs entrées secondaires, la section Side Input Metrics (Métriques des entrées secondaires) affiche le tableau Time & bytes read from side input (Durée de la lecture et octets lus à partir des entrées secondaires). Ce tableau contient les informations suivantes pour chaque collection de l'entrée secondaire :
- Side input collection (Collection de l'entrée secondaire) : nom de la collection de l'entrée secondaire.
- Time spent reading (Durée de la lecture) : temps passé par la transformation à lire cette collection de l'entrée secondaire.
- Bytes read (Octets lus) : nombre d'octets lus par la transformation dans la collection de l'entrée secondaire.
Si votre pipeline comporte une transformation composite qui lit une entrée secondaire, déployez la transformation composite jusqu'à voir la sous-transformation spécifique qui lit l'entrée secondaire. Ensuite, sélectionnez cette sous-transformation pour afficher la section Side Input Metrics (Métriques des entrées secondaires).
La figure 5 illustre les métriques des entrées secondaires pour une transformation lisant une collection d'entrée secondaire.
Identifier les problèmes de performance des entrées secondaires
La réitération est un problème de performances courant lié aux entrées secondaires. Si votre PCollection
d'entrée secondaire est trop volumineuse, les nœuds de calcul ne peuvent pas mettre en cache l'intégralité de la collection.
Par conséquent, les nœuds de calcul doivent régulièrement lire les données depuis le stockage persistant de la collection de l'entrée secondaire.
Dans la figure 6, les métriques des entrées secondaires montrent que le nombre total d'octets lus dans la collection de l'entrée secondaire est bien supérieur à la taille de la collection (nombre total d'octets écrits).
Pour améliorer les performances de ce pipeline, modifiez votre algorithme pour éviter les itérations ou la récupération répétée des données de l'entrée secondaires. Dans cet exemple, le pipeline calcule le produit cartésien de deux collections. Pour chaque élément de la collection principale, l'algorithme parcourt l'ensemble de la collection de l'entrée secondaire. Vous pouvez améliorer le modèle d'accès du pipeline en regroupant plusieurs éléments de la collection principale. Cette modification réduit le nombre de lectures de la collection de l'entrée secondaire requises par les nœuds de calcul.
Un autre problème de performances courant peut survenir si votre pipeline effectue une jointure en appliquant une fonction ParDo
sur une ou plusieurs entrées secondaires volumineuses. Dans ce cas, les nœuds de calcul consacrent une large proportion du temps de traitement de la jointure à lire les données des collections d'entrées secondaires.
La figure 7 illustre des exemples de métriques des entrées secondaires dans ce type de contexte :
Pour améliorer les performances de ce pipeline, utilisez CoGroupByKey au lieu des entrées secondaires.