Registra mensajes de canalización

Puedes usar la infraestructura de registro incorporada del SDK de Apache Beam para registrar información durante la ejecución de la canalización. Puedes usar Google Cloud Console para supervisar la información de registro durante y después de que se ejecute la canalización.

Agrega mensajes de registro a tu canalización

Java: SDK 2.x

Si usas el SDK de Apache Beam para Java, se recomienda que registres los mensajes de los trabajadores a través de la biblioteca de código abierto SLF4J (Simple Logging Facade for Java). El SDK de Apache Beam para Java implementa la infraestructura de registro requerida, por lo que tu código de Java solo necesita importar la API de SLF4J. Luego, el SDK crea una instancia de Registrador para habilitar el registro de mensajes dentro de tu código de canalización.

En códigos o bibliotecas preexistentes, el SDK de Apache Beam para Java configura una infraestructura de registro adicional. Esto ocurre cuando se ejecuta en el trabajador a fin de capturar los mensajes de registro que producen las siguientes bibliotecas de registro para Java:

Python

El SDK de Apache Beam para Python proporciona el paquete de biblioteca logging, que permite a los trabajadores de tu canalización emitir mensajes de registro. Para usar las funciones de la biblioteca, debes importarla:

import logging

Java: SDK 1.x

Ejemplo de código de mensaje de registro de trabajador

Java: SDK 2.x

El ejemplo de WordCount de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado. El código agregado se indica en negrita a continuación (se incluye el código que lo rodea para brindar contexto).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

El ejemplo de wordcount.py de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Java: SDK 1.x

Java: SDK 2.x

Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts), el resultado de la consola incluye los mensajes de registro agregados:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

De forma predeterminada, solo las líneas de registro marcadas como INFO y superiores se enviarán a Cloud Logging. Si deseas modificar este comportamiento, consulta Configura los niveles de registro de los trabajadores de la canalización.

Python

Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts), el resultado de la consola incluye los mensajes de registro agregados:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

De forma predeterminada, solo las líneas de registro marcadas como INFO y superiores se enviarán a Cloud Logging.

Java: SDK 1.x

Límite y regulación del registro

Los mensajes de registro de los trabajadores se limitan a 15,000 mensajes cada 30 segundos por trabajador. Si se alcanza este límite, se agrega un solo mensaje de registro de trabajador que indica que el registro está regulado:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
No se registrarán más mensajes hasta que finalice el intervalo de 30 segundos. Este límite se comparte mediante mensajes de registro generados por el SDK de Apache Beam y el código de usuario.

Supervisa los registros de las canalizaciones

Cuando ejecutas la canalización en el servicio de Dataflow, puedes usar la Interfaz de supervisión de Dataflow para ver los registros que emite tu canalización.

Ejemplo de registro de trabajador de Dataflow

La canalización de WordCount modificada se puede ejecutar en la nube con las siguientes opciones:

Java: SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Java: SDK 1.x

Cómo ver registros

Debido a que la canalización en la nube de WordCount usa la ejecución de bloqueo, los mensajes de la consola se emiten durante la ejecución de la canalización. Una vez que se inicia el trabajo, se envía a la consola un vínculo a la página de Cloud Console, seguido del ID del trabajo de canalización:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

La URL de la consola lleva a la Interfaz de supervisión de Dataflow con una página de resumen para el trabajo enviado. Muestra un grafo de ejecución dinámico a la izquierda, con información de resumen a la derecha: Haz clic en en el panel inferior para expandir el panel de registros.

El panel de registros muestra de forma predeterminada los Registros de trabajo que informan el estado del trabajo en su totalidad. Puedes filtrar los mensajes que aparecen en el panel de registros. Para ello, haz clic en Información y Filtrar registros.

Seleccionar un paso de canalización en el grafo cambia la vista a Step Logs (Registros de pasos), que genera tu código, y al código generado que se ejecuta en el paso de la canalización.

Para volver a Job Logs, anula la selección del paso mediante un clic fuera del grafo o con el botón Anular selección de paso en el panel lateral derecho.

Hacer clic en el botón de vínculo externo desde el panel de registros lleva a Logging con un menú para seleccionar diferentes tipos de registros.

Logging también incluye otros registros de infraestructura para la canalización. Para obtener más detalles sobre cómo explorar tus registros, consulta la guía Explorador de registros.

Aquí hay un resumen de los diferentes tipos de registro disponibles para ver en la página Logging:

  • Los registros de job-message contienen mensajes a nivel del trabajo que generan varios componentes de Dataflow. Los ejemplos incluyen la configuración del ajuste de escala automático, cuando los trabajadores inician o cierran una instancia, el avance en el paso del trabajo y los errores del trabajo. Los errores a nivel del trabajador que se originan en una falla del código del usuario y que están presentes en los registros del trabajador también se propagan a los registros job-message.
  • Los trabajadores de Dataflow producen los registros de worker. Los trabajadores realizan la mayor parte del trabajo de canalización (por ejemplo, mediante la aplicación de ParDo a los datos). Los registros de worker contienen mensajes registrados por tu código y Dataflow.
  • Los registros de worker-startup están presentes en la mayoría de los trabajos de Dataflow y pueden capturar los mensajes que se relacionan con el proceso de inicio. El proceso de inicio incluye la descarga de los archivos jar de un trabajo desde Cloud Storage seguida del inicio de los trabajadores. Si hay un problema cuando se inician los trabajadores, estos registros son un buen lugar para buscar información.
  • Los registros de shuffler contienen mensajes de los trabajadores que consolidan los resultados de las operaciones de canalización paralelas.
  • Los registros de docker y kubelet contienen mensajes relacionados con estas tecnologías públicas, que se usan en los trabajadores de Dataflow.

Configura los niveles de registro de los trabajadores de la canalización

Java: SDK 2.x

El nivel de registro predeterminado de SLF4J que configura el SDK de Apache Beam para Java sobre los trabajadores es INFO. Se emitirán todos los mensajes de registro de INFO o superior (INFO, WARN, ERROR). Puedes configurar un nivel de registro predeterminado diferente para admitir niveles de registro inferiores de SLF4J (TRACE o DEBUG) o configurar niveles de registro distintos para diferentes paquetes de clases en tu código.

Se proporcionan dos opciones de canalización para permitirte establecer niveles de registro de trabajadores desde la línea de comandos o de manera programática:

  • --defaultWorkerLogLevel=<level>: Usa esta opción para configurar todos los registradores en el nivel predeterminado especificado. Por ejemplo, la siguiente opción de línea de comandos anulará el nivel de registro INFO predeterminado de Dataflow y lo configurará como DEBUG:
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: Usa esta opción para configurar el nivel de registro para clases o paquetes especificados. Haz esto, por ejemplo, si quieres anular el nivel de registro de canalización predeterminado para el paquete com.google.cloud.dataflow y configurarlo como TRACE:
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    o anular el nivel de registro predeterminado de la canalización para la clase com.google.cloud.Foo y configurarlo en DEBUG:
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    Se pueden realizar varias anulaciones si se proporciona una asignación JSON:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).

En el siguiente ejemplo, se configuran de manera programática las opciones de registro de canalización con valores predeterminados que se pueden anular desde la línea de comandos:

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

Esta función aún no está disponible en el SDK de Apache Beam para Python.

Java: SDK 1.x

Visualiza el registro de los trabajos de BigQuery iniciados

Cuando se usa BigQuery en la canalización de Dataflow, se inician los trabajos de BigQuery para realizar diversas acciones en tu nombre, como cargar datos, exportarlos, etcétera. Para solucionar problemas y supervisar, la IU de supervisión de Dataflow tiene información adicional sobre estos trabajos de BigQuery disponibles en el panel Registros.

La información de los trabajos de BigQuery que se muestra en el panel Registros se almacena y carga desde una tabla del sistema de BigQuery, por lo que se genera un costo de facturación cuando se consulta la tabla subyacente de BigQuery.

Configura el proyecto

Para ver la información de trabajos de BigQuery, la canalización debe usar Apache Beam 2.24.0 o una versión posterior. No obstante, hasta que se lance la versión previa, debes usar una versión de desarrollo del SDK de Apache Beam compilada a partir de la rama principal.

Java: SDK 2.x

  1. Agrega el siguiente perfil al archivo pom.xml del proyecto.

    <profiles>
      <!-- Additional profiles listed here. -->
      <profile>
        <id>snapshot</id>
        <repositories>
          <repository>
            <id>apache.snapshots</id>
            <url>https://repository.apache.org/content/repositories/snapshots</url>
          </repository>
        </repositories>
      </profile>
    </profiles>
    
  2. Cuando pruebes o ejecutes el proyecto, establece la opción de perfil en el valor id que aparece en tu pom.xml y establece la propiedad beam.version en 2.24.0-SNAPSHOT o posterior. Por ejemplo:

    mvn test -Psnapshot -Dbeam.version=2.24.0-SNAPSHOT
    

    Para obtener más valores de instantáneas, consulta el índice de instantáneas.

Python

  1. Accede a GitHub.

  2. Navega a la lista de resultados de las compilaciones exitosas del SDK de Apache Beam para Python.

  3. Haz clic en un trabajo recién completado que se compiló a partir de la rama principal (instancia principal).

  4. En el panel lateral, haz clic en List files on Google Cloud Storage Bucket.

  5. En el panel principal, expande List file on Google Cloud Storage Bucket.

  6. Descarga el archivo ZIP de la lista de archivos a una máquina local o ubicación donde ejecutas el proyecto de Python.

    El nombre del bucket de Cloud Storage es beam-wheels-staging, por lo que debes incluirlo cuando crees la URL de descarga. Por ejemplo:

    gsutil cp gs://beam-wheels-staging/master/02bf081d0e86f16395af415cebee2812620aff4b-207975627/apache-beam-2.25.0.dev0.zip <var>SAVE_TO_LOCATION</var>
    
  7. Instala el archivo ZIP que se descargó.

    pip install apache-beam-2.25.0.dev0.zip
    
  8. Cuando ejecutes la canalización de Apache Beam, pasa la marca --sdk_location y haz referencia al archivo ZIP del SDK.

    --sdk_location=apache-beam-2.25.0.dev0.zip
    

Java: SDK 1.x

Visualiza los detalles del trabajo de BigQuery

Para enumerar los trabajos de BigQuery, abre la pestaña BigQuery Jobs y selecciona la ubicación de los trabajos de BigQuery. A continuación, haz clic en Cargar trabajos de BigQuery (Load BigQuery Jobs) y confirma el diálogo. Una vez completada la consulta, se muestra la lista de trabajos.

El botón Cargar trabajos de BigQuery (Load BigQuery Jobs) en la tabla de información de trabajos de BigQuery

Se proporciona información básica sobre cada trabajo, como el ID de tarea, el tipo, la duración, etcétera.

Una tabla que muestra los trabajos de BigQuery que se ejecutaron durante la ejecución actual del trabajo de canalización.

Para obtener información más detallada sobre un trabajo específico, haz clic en Línea de comandos (Command line) en la columna Más información (More Info).

En la ventana modal de la línea de comandos, copia el comando bq jobs describe y ejecútalo de forma local o en Cloud Shell.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

El comando bq jobs describe genera JobStatistics, que proporciona más detalles que son útiles para diagnosticar un trabajo de BigQuery lento o bloqueado.

Como alternativa, cuando usas BigQueryIO con una consulta de SQL, se emite un trabajo de consulta. Haz clic en Ver consulta (View query) en la columna Más información (More Info) para ver la consulta de SQL que usa el trabajo.