Registra mensajes de canalización

Puedes usar la infraestructura de registro incorporada del SDK de Cloud Dataflow para registrar información durante la ejecución de tu canalización. Puedes usar Google Cloud Platform Console para supervisar la información de registro durante y después de la ejecución de tu canalización.

Agrega mensajes de registro a tu canalización

Java: SDK 2.x

Si usas el SDK de Apache Beam para Java, se recomienda que registres los mensajes de los trabajadores a través de la biblioteca de código abierto SLF4J (Simple Logging Facade para Java). El SDK de Apache Beam para Java implementa la infraestructura de registro requerida, por lo que tu código de Java solo necesita importar la API de SLF4J. Luego, el SDK crea una instancia de Registrador para habilitar el registro de mensajes dentro de tu código de canalización.

En códigos o bibliotecas preexistentes, el SDK de Apache Beam para Java configura una infraestructura de registro adicional. Esto ocurre cuando se ejecuta en el trabajador a fin de capturar los mensajes de registro que producen las siguientes bibliotecas de registro para Java:

Python

El SDK de Apache Beam para Python proporciona el paquete de biblioteca logging, que permite a los trabajadores de tu canalización emitir mensajes de registro. Para usar las funciones de la biblioteca, debes importarla:

import logging

Java: SDK 1.x

Si usas el SDK de Apache Beam para Java, se recomienda que registres los mensajes de los trabajadores a través de la biblioteca de código abierto SLF4J (Simple Logging Facade para Java). El SDK de Apache Beam para Java implementa la infraestructura de registro requerida, por lo que tu código de Java solo necesita importar la API de SLF4J. Luego, el SDK crea una instancia de Registrador para habilitar el registro de mensajes dentro de tu código de canalización.

En códigos o bibliotecas preexistentes, el SDK de Apache Beam para Java configura una infraestructura de registro adicional. Esto ocurre cuando se ejecuta en el trabajador a fin de capturar los mensajes de registro que producen las siguientes bibliotecas de registro para Java:

Ejemplo de código de mensaje de registro de trabajador

Java: SDK 2.x

El ejemplo de WordCount de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado. El código agregado se indica en negrita a continuación (se incluye el código que lo rodea para brindar contexto).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

El ejemplo de wordcount.py de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):

  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Java: SDK 1.x

El ejemplo de WordCount puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado. El código agregado se indica en negrita a continuación (se incluye el código que lo rodea para brindar contexto).

 package com.google.cloud.dataflow.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @Override
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Java: SDK 2.x

Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts), el resultado de la consola incluye los mensajes de registro agregados:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

De forma predeterminada, solo las líneas de registro marcadas como INFO y superiores se enviarán a Cloud Logging. Si deseas modificar este comportamiento, consulta Configura los niveles de registro de los trabajadores de la canalización.

Python

Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts), el resultado de la consola incluye los mensajes de registro agregados:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

De forma predeterminada, solo las líneas de registro marcadas como INFO y superiores se enviarán a Cloud Logging.

Java: SDK 1.x

Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectPipelineRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts), el resultado de la consola incluye los mensajes de registro agregados:

INFO: Executing pipeline using the DirectPipelineRunner.
...
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

De forma predeterminada, solo las líneas de registro marcadas como INFO y superiores se enviarán a Cloud Logging. Si deseas modificar este comportamiento, consulta Configura los niveles de registro de los trabajadores de la canalización.

Supervisa los registros de las canalizaciones

Cuando ejecutas tu canalización en el servicio de Cloud Dataflow, puedes usar la Interfaz de supervisión de Cloud Dataflow para ver los registros que emite tu canalización.

Ejemplo de registro de trabajador de Cloud Dataflow

La canalización de WordCount modificada se puede ejecutar en la nube con las siguientes opciones:

Java: SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Java: SDK 1.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=BlockingDataflowPipelineRunner
--stagingLocation=gs://<bucket-name>/binaries

Mira el estado y el resumen de los trabajos

Debido a que la canalización en la nube de WordCount usa la ejecución de bloqueo, los mensajes de la consola se emiten durante la ejecución de la canalización. Una vez que se inicia el trabajo, se emite a la consola un vínculo a la página de GCP Console seguido del ID de trabajo de la canalización:

INFO: To access the Cloud Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

La URL de la consola lleva a la Interfaz de supervisión de Cloud Dataflow con una página de resumen del trabajo enviado. Muestra un grafo de ejecución dinámico a la izquierda, con información de resumen a la derecha:

El botón Logs (Registros) abre el panel de registros inferior y muestra de forma predeterminada los mensajes de Job Logs (Registro de trabajo), que informan el estado del trabajo en su totalidad. Puedes usar el selector Minimum Severity (Gravedad mínima) para filtrar los mensajes de estado y del progreso del trabajo.

Seleccionar un paso de canalización en el grafo cambia la vista a Step Logs (Registros de pasos), que genera tu código, y al código generado que se ejecuta en el paso de la canalización.

Para volver a Job Logs, anula la selección del paso mediante un clic fuera del grafo o con el botón Close (cerrar) en el panel lateral derecho.

Visualiza los registros

Step Logs en la Interfaz de supervisión de Cloud Dataflow solo muestra los mensajes de registro más recientes. Puedes ver todos los Step Logs de un paso de la canalización en Stackdriver Logging si haces clic en el vínculo a Stackdriver en la parte derecha del panel de registros.

Logging también incluye otros registros de infraestructura para tu canalización. Hacer clic en el botón de vínculo externo desde Job Logs lleva a Cloud Logging con un menú para seleccionar diferentes tipos de registros.

Aquí hay un resumen de los diferentes tipos de registro disponibles para ver en la página Monitoring → Logs (Supervisión → Registros):

  • Los trabajadores de Cloud Dataflow generan registros de worker. Los trabajadores hacen la mayor parte del trabajo de canalización (p. ej., aplicar tus ParDo a los datos). Los registros de worker contienen mensajes que registran tu código y Cloud Dataflow.
  • Los registros de worker-startup están presentes en la mayoría de los trabajos de Cloud Dataflow y pueden capturar mensajes relacionados con el proceso de inicio. El proceso de inicio incluye la descarga de los archivos jar de un trabajo desde Cloud Storage seguida del inicio de los trabajadores. Si hay un problema cuando se inician los trabajadores, estos registros son un buen lugar para buscar información.
  • Los registros de shuffler contienen mensajes de los trabajadores que consolidan los resultados de las operaciones de canalización paralelas.
  • Los registros de docker y kubelet contienen mensajes relacionados con estas tecnologías públicas que se usan en los trabajadores de Cloud Dataflow.

Configura los niveles de registro de los trabajadores de la canalización

Java: SDK 2.x

El nivel de registro de SLF4J predeterminado que configura el SDK de Apache Beam para Java sobre los trabajadores es INFO. Se emitirán todos los mensajes de registro de INFO o superior (INFO, WARN o ERROR). Puedes configurar un nivel de registro predeterminado diferente para admitir niveles de registro SLF4J inferiores (TRACE o DEBUG) o configurar niveles de registro distintos para diferentes paquetes de clases en tu código.

Se proporcionan dos opciones de canalización para permitirte establecer niveles de registro de trabajadores desde la línea de comandos o de manera programática:

  • --defaultWorkerLogLevel=<level>: usa esta opción para configurar todos los registradores en el nivel predeterminado especificado. Por ejemplo, la siguiente opción de línea de comandos anulará el nivel de registro INFO predeterminado de Cloud Dataflow y lo establecerá en DEBUG:
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: usa esta opción a fin de configurar el nivel de registro para paquetes o clases especificados. Por ejemplo, si deseas anular el nivel de registro de canalización predeterminado para el paquete com.google.cloud.dataflow y establecerlo en TRACE, haz lo siguiente:
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    o a fin de anular el nivel de registro de canalización predeterminado para la clase com.google.cloud.Foo y configurarlo como DEBUG, haz esto:
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    Se pueden realizar múltiples anulaciones si se proporciona una asignación JSON:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).

El siguiente ejemplo configura de manera programática las opciones de registro de canalización con valores predeterminados que se pueden anular desde la línea de comandos:

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

Esta función aún no está disponible en el SDK de Apache Beam para Python.

Java: SDK 1.x

El nivel de registro de SLF4J predeterminado que configura el SDK de Apache Beam para Java sobre los trabajadores es INFO. Se emitirán todos los mensajes de registro de INFO o superior (INFO, WARN o ERROR). Puedes configurar un nivel de registro predeterminado diferente para admitir niveles de registro SLF4J inferiores (TRACE o DEBUG) o configurar niveles de registro distintos para diferentes paquetes de clases en tu código.

Se proporcionan dos opciones de canalización para permitirte establecer niveles de registro de trabajadores desde la línea de comandos o de manera programática:

  • --defaultWorkerLogLevel=<level>: usa esta opción para configurar todos los registradores en el nivel predeterminado especificado. Por ejemplo, la siguiente opción de línea de comandos anulará el nivel de registro INFO predeterminado y lo establecerá en DEBUG:
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: usa esta opción a fin de configurar el nivel de registro para paquetes o clases especificados. Por ejemplo, si deseas anular el nivel de registro de canalización predeterminado para el paquete com.google.cloud.dataflow y establecerlo en TRACE, haz lo siguiente:
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    o a fin de anular el nivel de registro de canalización predeterminado para la clase com.google.cloud.Foo y configurarlo como DEBUG, haz esto:
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    Se pueden realizar múltiples anulaciones si se proporciona una asignación JSON:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).

El siguiente ejemplo configura de manera programática las opciones de registro de canalización con valores predeterminados que puedes anular desde la línea de comandos:

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.