Enregistrer les messages de pipeline

Vous pouvez utiliser l'infrastructure de journalisation intégrée au SDK Cloud Dataflow pour consigner des informations pendant l'exécution de votre pipeline. La console Google Cloud Platform offre la possibilité de surveiller les informations de journalisation pendant et après l'exécution de votre pipeline.

Ajouter des messages de journal à votre pipeline

Java : SDK 2.x

Le SDK Apache Beam pour Java vous recommande de journaliser les messages des nœuds de calcul via la bibliothèque Open Source SLF4J (Simple Logging Facade for Java). Le SDK Apache Beam pour Java met en œuvre l'infrastructure de journalisation requise, de sorte que votre code Java n'a besoin que d'importer l'API SLF4J, puis d'instancier un enregistreur pour activer la journalisation des messages dans le code de votre pipeline.

Pour le code et/ou les bibliothèques préexistants, le SDK Apache Beam pour Java configure une infrastructure de journalisation supplémentaire. Cela se produit lors de l'exécution sur le nœud de calcul, de façon à capturer les messages de journal générés par les bibliothèques de journalisation suivantes pour Java :

Python

Le SDK Apache Beam pour Python propose le package de bibliothèque logging, qui permet aux nœuds de calcul de votre pipeline de générer des messages de journal. Pour utiliser les fonctions de la bibliothèque, vous devez importer cette dernière :

import logging

Java : SDK 1.x

Le SDK Apache Beam pour Java vous recommande de journaliser les messages des nœuds de calcul via la bibliothèque Open Source SLF4J (Simple Logging Facade for Java). Le SDK Apache Beam pour Java met en œuvre l'infrastructure de journalisation requise, de sorte que votre code Java n'a besoin que d'importer l'API SLF4J, puis d'instancier un enregistreur pour activer la journalisation des messages dans le code de votre pipeline.

Pour le code et/ou les bibliothèques préexistants, le SDK Apache Beam pour Java configure une infrastructure de journalisation supplémentaire. Cela se produit lors de l'exécution sur le nœud de calcul, de façon à capturer les messages de journal générés par les bibliothèques de journalisation suivantes pour Java :

Exemple de code pour générer les messages de journal d'un nœud de calcul

Java : SDK 2.x

L'exemple Apache Beam WordCount peut être modifié pour générer un message de journal lorsque le mot "love" est trouvé dans une ligne du texte traité. Le code ajouté est indiqué en gras ci-dessous (le code environnant est inclus pour le contexte).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

L'exemple Apache Beam wordcount.py peut être modifié pour générer un message de journal lorsque le mot "love" est trouvé dans une ligne du texte traité.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):

  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Java : SDK 1.x

L'exemple WordCount peut être modifié pour générer un message de journal lorsque le mot "love" est trouvé dans une ligne du texte traité. Le code ajouté est indiqué en gras ci-dessous (le code environnant est inclus pour le contexte).

 package com.google.cloud.dataflow.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @Override
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Java : SDK 2.x

Si le pipeline WordCount modifié est exécuté en local à l'aide de l'exécuteur par défaut DirectRunner et que la sortie est envoyée dans un fichier local (--output=./local-wordcounts), la sortie de la console inclut les messages de journal ajoutés :

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

Par défaut, seules les lignes de journal de niveau INFO ou supérieur seront envoyées à Cloud Logging. Si vous souhaitez modifier ce comportement, consultez la section Définir les niveaux de journalisation des nœuds de calcul d'un pipeline.

Python

Si le pipeline WordCount modifié est exécuté en local à l'aide de l'exécuteur par défaut DirectRunner et que la sortie est envoyée dans un fichier local (--output=./local-wordcounts), la sortie de la console inclut les messages de journal ajoutés :

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

Par défaut, seules les lignes de journal de niveau INFO ou supérieur seront envoyées à Cloud Logging.

Java : SDK 1.x

Si le pipeline WordCount modifié est exécuté en local à l'aide de l'exécuteur par défaut DirectPipelineRunner et que la sortie est envoyée dans un fichier local (--output=./local-wordcounts), la sortie de la console inclut les messages de journal ajoutés :

INFO: Executing pipeline using the DirectPipelineRunner.
...
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

Par défaut, seules les lignes de journal de niveau INFO ou supérieur seront envoyées à Cloud Logging. Si vous souhaitez modifier ce comportement, consultez la section Définir les niveaux de journalisation des nœuds de calcul d'un pipeline.

Surveiller les journaux de pipeline

Lorsque vous exécutez votre pipeline sur le service Cloud Dataflow, l'interface de surveillance de Cloud Dataflow vous permet d'afficher les journaux émis par votre pipeline.

Exemple de journal de nœud de calcul Cloud Dataflow

Pour exécuter le pipeline WordCount modifié dans le Cloud, spécifiez les options suivantes :

Java : SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Java : SDK 1.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=BlockingDataflowPipelineRunner
--stagingLocation=gs://<bucket-name>/binaries

Afficher le résumé et l'état d'une tâche

Étant donné que le pipeline cloud WordCount utilise l'exécution bloquante, des messages de la console sont générés pendant l'exécution du pipeline. Une fois la tâche démarrée, un lien vers la page de la console GCP est généré sur la console, suivi de l'ID de tâche du pipeline :

INFO: To access the Cloud Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

L'URL de la console mène à l'interface de surveillance de Cloud Dataflow, qui présente la page de résumé de la tâche envoyée. Cette page affiche un graphique d'exécution dynamique sur la gauche et des informations récapitulatives sur la droite :

Le bouton Logs (Journaux) ouvre le panneau des journaux au bas de la page, qui affiche par défaut les messages de la catégorie Job Logs (Journaux des tâches), indiquant l'état de la tâche dans son ensemble. Vous disposez du sélecteur Niveau de gravité minimal pour filtrer les messages relatifs à la progression et à l'état de la tâche.

La sélection d'une étape du pipeline dans le graphique remplace la vue affichée par la vue Step Logs (Journaux des étapes), qui comprend les messages générés par votre code et le code généré s'exécutant dans le cadre de cette étape du pipeline.

Pour revenir à la vue Job Logs (Journaux des tâches), désélectionnez l'étape en cliquant à l'extérieur du graphique ou en utilisant le bouton "Fermer" du panneau de droite.

Afficher les journaux

La vue Step Logs (Journaux des étapes) de l'interface de surveillance de Cloud Dataflow affiche uniquement les messages de journal les plus récents. Vous pouvez afficher tous les journaux d'une étape de pipeline dans Stackdriver Logging en cliquant sur le lien Stackdriver à droite du volet des journaux.

La journalisation inclut également d'autres journaux d'infrastructure correspondant à votre pipeline. En cliquant sur le bouton de lien externe depuis le volet "Job Logs" (Journaux des tâches), vous accédez à Cloud Logging, qui vous présente un menu permettant de sélectionner différents types de journaux.

Voici un récapitulatif des différents types de journaux consultables à partir de la page Surveillance → Journaux :

  • Les journaux worker sont produits par les nœuds de calcul Cloud Dataflow. Les nœuds de calcul effectuent la majeure partie du travail en pipeline (par exemple, en appliquant vos objets ParDo aux données). Les journaux des nœuds de calcul contiennent des messages enregistrés par votre code et par Cloud Dataflow.
  • Les journaux worker-startup sont présents sur la plupart des tâches Cloud Dataflow et peuvent capturer les messages liés au processus de démarrage. Le processus de démarrage comprend le téléchargement des fichiers JAR d'une tâche à partir de Cloud Storage, puis le démarrage des nœuds de calcul. Il est recommandé d'examiner ces journaux en cas de problème lié au démarrage des nœuds de calcul.
  • Les journaux shuffler accueillent les messages des nœuds de calcul qui consolident les résultats des opérations de pipeline exécutées en parallèle.
  • Les journaux docker et kubelet contiennent les messages liés à ces technologies publiques, qui sont utilisées sur les nœuds de calcul Cloud Dataflow.

Définir les niveaux de journalisation des nœuds de calcul d'un pipeline

Java : SDK 2.x

Le niveau de journalisation SLF4J par défaut défini sur les nœuds de calcul par le SDK Apache Beam pour Java est INFO. Tous les messages de journal de niveau INFO ou supérieur (INFO, WARN, ERROR) sont émis. Vous pouvez redéfinir ce paramètre par défaut afin de prendre en charge des niveaux de journalisation SLF4J inférieurs (TRACE ou DEBUG) ou de définir des niveaux de journalisation distincts pour différents packages de classe de votre code.

Vous disposez de deux options de pipeline permettant de définir les niveaux de journalisation des nœuds de calcul à partir de la ligne de commande ou de manière automatisée :

  • --defaultWorkerLogLevel=<level> : cette option permet de définir tous les enregistreurs au niveau par défaut spécifié. Par exemple, l'option de ligne de commande suivante ignore le niveau de journalisation par défaut INFO de Cloud Dataflow et le redéfinit sur DEBUG :
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"} : cette option permet de définir le niveau de journalisation pour le ou les packages ou classes spécifiés. Par exemple, pour ignorer le niveau de journalisation par défaut du pipeline pour le package com.google.cloud.dataflow et le définir sur TRACE :
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    ou, pour ignorer le niveau de journalisation par défaut du pipeline pour la classe com.google.cloud.Foo et le définir sur DEBUG :
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    Vous pouvez effectuer simultanément plusieurs remplacements en fournissant une carte JSON :
    --workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}

L'exemple suivant définit de façon automatisée les options de journalisation du pipeline avec des valeurs par défaut pouvant être remplacées à partir de la ligne de commande :

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

Cette fonctionnalité n'est pas encore disponible dans le SDK Apache Beam pour Python.

Java : SDK 1.x

Le niveau de journalisation SLF4J par défaut défini sur les nœuds de calcul par le SDK Apache Beam pour Java est INFO. Tous les messages de journal de niveau INFO ou supérieur (INFO, WARN, ERROR) sont émis. Vous pouvez redéfinir ce paramètre par défaut afin de prendre en charge des niveaux de journalisation SLF4J inférieurs (TRACE ou DEBUG) ou de définir des niveaux de journalisation distincts pour différents packages de classe de votre code.

Vous disposez de deux options de pipeline permettant de définir les niveaux de journalisation des nœuds de calcul à partir de la ligne de commande ou de manière automatisée :

  • --defaultWorkerLogLevel=<level> : cette option permet de définir tous les enregistreurs au niveau par défaut spécifié. Par exemple, l'option de ligne de commande suivante ignore le niveau de journalisation par défaut INFO et le redéfinit sur DEBUG :
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"} : cette option permet de définir le niveau de journalisation pour le ou les packages ou classes spécifiés. Par exemple, pour ignorer le niveau de journalisation par défaut du pipeline pour le package com.google.cloud.dataflow et le définir sur TRACE :
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    ou, pour ignorer le niveau de journalisation par défaut du pipeline pour la classe com.google.cloud.Foo et le définir sur DEBUG :
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    Vous pouvez effectuer simultanément plusieurs remplacements en fournissant une carte JSON :
    --workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}

L'exemple suivant définit par programme les options de journalisation du pipeline avec des valeurs par défaut que vous pouvez remplacer depuis la ligne de commande :

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.