Enregistrer les messages de pipeline

Vous pouvez utiliser l'infrastructure de journalisation intégrée du SDK Apache Beam pour enregistrer des informations pendant l'exécution de votre pipeline. Vous pouvez exploiter Google Cloud Console pour surveiller les informations de journalisation pendant et après l'exécution de votre pipeline.

Ajouter des messages de journal à votre pipeline

Java : SDK 2.x

Le SDK Apache Beam pour Java recommande de journaliser les messages des nœuds de calcul via la bibliothèque Open Source SLF4J (Simple Logging Facade for Java). Le SDK Apache Beam pour Java met en œuvre l'infrastructure de journalisation requise, de sorte que votre code Java n'a besoin que d'importer l'API SLF4J. Il instancie ensuite un enregistreur pour activer la journalisation des messages dans le code de votre pipeline.

Pour le code et/ou les bibliothèques préexistants, le SDK Apache Beam pour Java configure une infrastructure de journalisation supplémentaire. Cela se produit lors de l'exécution sur le nœud de calcul, de façon à capturer les messages de journal générés par les bibliothèques de journalisation suivantes pour Java :

Python

Le SDK Apache Beam pour Python propose le package de bibliothèques logging, qui permet aux nœuds de calcul de votre pipeline de générer des messages de journal. Pour utiliser les fonctions de la bibliothèque, vous devez importer celle-ci :

import logging

Java : SDK 1.x

Exemple de code pour générer les messages de journal d'un nœud de calcul

Java : SDK 2.x

L'exemple Apache Beam WordCount peut être modifié pour générer un message de journal lorsque le mot "love" est trouvé dans une ligne du texte traité. Le code ajouté est indiqué en gras ci-dessous (le code environnant est inclus pour situer le contexte).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

L'exemple Apache Beam wordcount.py peut être modifié pour générer un message de journal lorsque le mot "love" est trouvé dans une ligne du texte traité.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Java : SDK 1.x

Java : SDK 2.x

Si le pipeline WordCount modifié est exécuté en local à l'aide de l'exécuteur par défaut DirectRunner et que la sortie est envoyée dans un fichier local (--output=./local-wordcounts), la sortie de la console inclut les messages de journal ajoutés :

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

Par défaut, seules les lignes de journal de niveau INFO ou supérieur seront envoyées à Cloud Logging. Si vous souhaitez modifier ce comportement, consultez la section Définir les niveaux de journalisation des nœuds de calcul d'un pipeline.

Python

Si le pipeline WordCount modifié est exécuté en local à l'aide de l'exécuteur par défaut DirectRunner et que la sortie est envoyée dans un fichier local (--output=./local-wordcounts), la sortie de la console inclut les messages de journal ajoutés :

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

Par défaut, seules les lignes de journal de niveau INFO ou supérieur seront envoyées à Cloud Logging.

Java : SDK 1.x

Limite et limitation de journalisation

Les messages de journal de nœud de calcul sont limités à 15 000 messages toutes les 30 secondes, par nœud de calcul. Si cette limite est atteinte, un seul message de journal de nœud de calcul est ajouté indiquant que la journalisation est limitée :

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
Aucun autre message n'est consigné avant la fin de l'intervalle de 30 secondes. Cette limite est partagée entre les messages de journal générés par le SDK Apache Beam et le code utilisateur.

Surveiller les journaux de pipeline

Lorsque vous exécutez votre pipeline sur le service Dataflow, l'interface de surveillance de Dataflow vous permet d'afficher les journaux émis par votre pipeline.

Exemple de journal de nœud de calcul Dataflow

Pour exécuter le pipeline WordCount modifié dans le Cloud, spécifiez les options suivantes :

Java : SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Java : SDK 1.x

Afficher les journaux

Étant donné que le pipeline cloud WordCount utilise l'exécution bloquante, des messages de la console sont générés pendant l'exécution du pipeline. Une fois la tâche démarrée, un lien vers la page Cloud Console est généré dans la console, suivi de l'ID de la tâche du pipeline :

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

L'URL de la console mène à l'interface de surveillance de Dataflow, qui présente la page de résumé de la tâche envoyée. Cette page affiche un graphique d'exécution dynamique sur la gauche et des informations récapitulatives sur la droite : Cliquez sur dans le panneau inférieur pour développer le panneau des journaux.

Par défaut, le panneau des journaux affiche les journaux des tâches indiquant l'état de la tâche dans son ensemble. Vous pouvez filtrer les messages qui apparaissent dans le panneau des journaux en cliquant sur Info (Infos) et sur Filter logs (Filtrer les journaux).

La sélection d'une étape du pipeline dans le graphique remplace la vue affichée par la vue Step Logs (Journaux des étapes), qui comprend les messages générés par votre code et le code généré s'exécutant dans le cadre de cette étape du pipeline.

Pour revenir à la vue Job Logs (Journaux des tâches), désélectionnez l'étape en cliquant à l'extérieur du graphique ou en utilisant le bouton Deslect step (Désélectionner l'étape) du panneau de droite.

En cliquant sur le bouton de lien externe du panneau des journaux, vous accédez à Logging, qui vous présente un menu permettant de sélectionner différents types de journaux.

La journalisation inclut également d'autres journaux d'infrastructure correspondant à votre pipeline. Pour en savoir plus sur l'exploration des journaux, consultez le guide de l'explorateur de journaux.

Voici un récapitulatif des différents types de journaux consultables à partir de la page Logging :

  • Les journaux job-message contiennent des messages au niveau des tâches générés par plusieurs composants de Dataflow. Il peut s'agir de la configuration de l'autoscaling, du démarrage ou de l'arrêt des nœuds de calcul, de la progression d'une étape de la tâche et des erreurs concernant la tâche. Les erreurs au niveau des nœuds de calcul, qui sont dues au plantage du code utilisateur et qui s'affichent dans les journaux worker, se propagent également dans les journaux job-message.
  • Les journaux worker sont produits par les nœuds de calcul Dataflow. Les nœuds de calcul effectuent la majeure partie du travail en pipeline (par exemple, en appliquant vos objets ParDo aux données). Les journaux worker contiennent des messages enregistrés par votre code et par Dataflow.
  • Les journaux worker-startup sont présents sur la plupart des tâches Dataflow et peuvent capturer les messages liés au processus de démarrage. Le processus de démarrage comprend le téléchargement des fichiers JAR d'une tâche à partir de Cloud Storage, puis le démarrage des nœuds de calcul. Il est recommandé d'examiner ces journaux en cas de problème lié au démarrage des nœuds de calcul.
  • Les journaux shuffler accueillent les messages des nœuds de calcul qui consolident les résultats des opérations de pipeline exécutées en parallèle.
  • Les journaux docker et kubelet contiennent les messages liés à ces technologies publiques, qui sont utilisées sur les nœuds de calcul Dataflow.

Définir les niveaux de journalisation des nœuds de calcul d'un pipeline

Java : SDK 2.x

Le niveau de journalisation SLF4J par défaut défini sur les nœuds de calcul par le SDK Apache Beam pour Java est INFO. Tous les messages de journal de niveau INFO ou supérieur (INFO, WARN, ERROR) sont émis. Vous pouvez redéfinir ce paramètre par défaut afin de prendre en charge des niveaux de journalisation SLF4J inférieurs (TRACE ou DEBUG) ou de définir des niveaux de journalisation distincts pour différents packages de classe de votre code.

Vous disposez de deux options de pipeline permettant de définir les niveaux de journalisation des nœuds de calcul à partir de la ligne de commande ou de manière automatisée :

  • --defaultWorkerLogLevel=<level> : cette option permet de définir tous les enregistreurs au niveau par défaut spécifié. Par exemple, l'option de ligne de commande suivante remplace le niveau de journalisation Dataflow INFO par défaut et le définit sur DEBUG :
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"} : cette option permet de définir le niveau de journalisation pour le ou les packages ou classes spécifiés. Par exemple, pour ignorer le niveau de journalisation par défaut du pipeline pour le package com.google.cloud.dataflow, et le définir sur TRACE :
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    ou, pour ignorer le niveau de journalisation par défaut du pipeline pour la classe com.google.cloud.Foo, et le définir sur DEBUG :
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    Vous pouvez ignorer plusieurs paramètres simultanément en fournissant une carte JSON :
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).

L'exemple suivant définit de façon automatisée les options de journalisation du pipeline avec des valeurs par défaut pouvant être remplacées à partir de la ligne de commande :

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

Cette fonctionnalité n'est pas encore disponible dans le SDK Apache Beam pour Python.

Java : SDK 1.x

Afficher le journal des tâches BigQuery lancées

Lorsque vous utilisez BigQuery dans votre pipeline Dataflow, des tâches BigQuery sont lancées pour effectuer diverses actions en votre nom, par exemple pour charger des données, exporter des données, etc. À des fins de dépannage et de surveillance, l'interface utilisateur de surveillance de Dataflow contient des informations supplémentaires sur ces tâches BigQuery disponibles dans le panneau Journaux.

Les informations sur les tâches BigQuery affichées dans le panneau Journaux sont stockées et chargées à partir d'une table système BigQuery. Par conséquent, des coûts de facturation sont générés lorsque la table BigQuery sous-jacente est interrogée.

Configurer le projet

Pour afficher les informations sur les tâches BigQuery, votre pipeline doit utiliser Apache Beam 2.24.0 ou version ultérieure. Cependant, tant que cette version n'est pas disponible, vous devez utiliser une version de développement du SDK Apache Beam créée à partir de la branche principale.

Java : SDK 2.x

  1. Ajoutez le profil suivant au fichier pom.xml de votre projet.

    <profiles>
      <!-- Additional profiles listed here. -->
      <profile>
        <id>snapshot</id>
        <repositories>
          <repository>
            <id>apache.snapshots</id>
            <url>https://repository.apache.org/content/repositories/snapshots</url>
          </repository>
        </repositories>
      </profile>
    </profiles>
    
  2. Lorsque vous testez ou exécutez votre projet, définissez l'option de profil sur la valeur id répertoriée dans votre fichier pom.xml, et définissez la propriété beam.version sur 2.24.0-SNAPSHOT ou une version ultérieure. Exemple :

    mvn test -Psnapshot -Dbeam.version=2.24.0-SNAPSHOT
    

    Pour obtenir plus de valeurs d'instantané, consultez l'index d'instantané.

Python

  1. Connectez-vous à GitHub.

  2. Accédez à la liste des résultats pour connaître les versions terminées du SDK Apache Beam pour Python.

  3. Cliquez sur une tâche récemment terminée qui a été créée à partir de la branche principale (maître).

  4. Dans le panneau latéral, cliquez sur Répertorier les fichiers sur le bucket Google Cloud Storage.

  5. Dans le panneau principal, développez l'option Répertorier les fichiers sur le bucket Google Cloud Storage.

  6. Téléchargez le fichier ZIP de la liste de fichiers vers un ordinateur local ou l'emplacement dans lequel vous exécutez votre projet Python.

    Le nom du bucket Cloud Storage est beam-wheels-staging. Vous devez donc l'inclure lorsque vous créez l'URL de téléchargement. Exemple :

    gsutil cp gs://beam-wheels-staging/master/02bf081d0e86f16395af415cebee2812620aff4b-207975627/apache-beam-2.25.0.dev0.zip <var>SAVE_TO_LOCATION</var>
    
  7. Installez le fichier ZIP téléchargé.

    pip install apache-beam-2.25.0.dev0.zip
    
  8. Lorsque vous exécutez votre pipeline Apache Beam, transmettez l'option --sdk_location et référencez le fichier ZIP du SDK.

    --sdk_location=apache-beam-2.25.0.dev0.zip
    

Java : SDK 1.x

Afficher les détails d'une tâche BigQuery

Pour répertorier les tâches BigQuery, ouvrez l'onglet BigQuery Jobs (Tâches BigQuery) et sélectionnez l'emplacement des tâches BigQuery. Cliquez ensuite sur Load BigQuery Jobs (Charger les tâches BigQuery) et confirmez la boîte de dialogue. Une fois la requête terminée, la liste des tâches s'affiche.

Bouton &quot;Load BigQuery Jobs&quot; (Charger les tâches BigQuery) dans la table d&#39;informations des tâches BigQuery

Des informations de base sur chaque tâche sont fournies, y compris l'ID, le type et la durée.

Une table montrant les tâches BigQuery exécutées lors de l&#39;exécution de la tâche de pipeline en cours.

Pour obtenir des informations plus détaillées sur une tâche spécifique, cliquez sur Command line (Ligne de commande) dans la colonne More Info (Plus d'infos).

Dans la fenêtre modale de la ligne de commande, copiez la commande bq jobs describe et exécutez-la localement ou dans Cloud Shell.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

La commande bq jobs describe génère JobStatistics, qui fournit des informations supplémentaires utiles pour le diagnostic d'une tâche BigQuery lente ou bloquée.

Sinon, lorsque vous utilisez BigQueryIO avec une requête SQL, une tâche de requête est émise. Cliquez sur View query (Afficher la requête) dans la colonne More Info (Plus d'infos) pour voir la requête SQL utilisée par la tâche.