Trabaja con registros de canalización

Puedes usar la infraestructura de registro incorporada del SDK de Apache Beam para registrar información cuando ejecutas la canalización. Puedes usar la consola de Google Cloud para supervisar la información de registro durante y después de que se ejecute la canalización.

Agrega mensajes de registro a tu canalización

Java

Si usas el SDK de Apache Beam para Java, se recomienda que registres los mensajes de los trabajadores a través de la biblioteca de código abierto Simple Logging Facade para Java (SLF4J). El SDK de Apache Beam para Java implementa la infraestructura de registro requerida, por lo que tu código de Java solo necesita importar la API de SLF4J. Luego, el SDK crea una instancia de Registrador para habilitar el registro de mensajes dentro de tu código de canalización.

En códigos o bibliotecas preexistentes, el SDK de Apache Beam para Java configura una infraestructura de registro adicional. Se capturan los mensajes de registro que producen las siguientes bibliotecas de registro para Java:

Python

El SDK de Apache Beam para Python proporciona el paquete de biblioteca logging, que permite a los trabajadores de la canalización emitir mensajes de registro. Para usar las funciones de la biblioteca, debes importarla:

import logging

Go

El SDK de Apache Beam para Go proporciona el paquete de biblioteca log, que permite a los trabajadores de la canalización emitir mensajes de registro. Para usar las funciones de la biblioteca, debes importarla:

import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"

Ejemplo de código de mensaje de registro de trabajador

Java

En el siguiente ejemplo, se usa SLF4J para el registro de Dataflow. Si deseas obtener más información sobre cómo configurar SLF4J para el registro de Dataflow, consulta el artículo Sugerencias de Java.

El ejemplo de WordCount de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado. El código agregado se indica en negrita en el siguiente ejemplo (se incluye el código que lo rodea para brindar contexto).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

El ejemplo de wordcount.py de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Go

El ejemplo de wordcount.go de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado.

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        // increment the counter for small words if length of words is
        // less than small_word_length
        if strings.ToLower(word) == "love" {
            log.Infof(ctx, "Found : %s", strings.ToLower(word))
        }

        emit(word)
    }
}

// Remaining Wordcount example

Java

Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts), el resultado de la consola incluye los mensajes de registro agregados:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

De forma predeterminada, solo las líneas de registro marcadas como INFO y superiores se enviarán a Cloud Logging. Si deseas modificar este comportamiento, consulta Configura los niveles de registro de los trabajadores de la canalización.

Python

Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts), el resultado de la consola incluye los mensajes de registro agregados:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

De forma predeterminada, solo las líneas de registro marcadas como INFO y superiores se enviarán a Cloud Logging.

Go

Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts), el resultado de la consola incluye los mensajes de registro agregados:

2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love

De forma predeterminada, solo las líneas de registro marcadas como INFO y superiores se enviarán a Cloud Logging.

Controla el volumen de registros

También puedes reducir el volumen de registros generados si cambias los niveles de registro de la canalización. Si no deseas continuar con la transferencia de algunos o todos tus registros de Dataflow, agrega una exclusión de Logging para excluir los registros de Dataflow. Luego, exporta los registros a un destino diferente, como BigQuery, Cloud Storage o Pub/Sub. Para obtener más información, consulta Controla la transferencia de registros de Dataflow.

Límite y regulación del registro

Los mensajes de registro de los trabajadores se limitan a 15,000 mensajes cada 30 segundos por trabajador. Si se alcanza este límite, se agrega un solo mensaje de registro de trabajador que indica que el registro está regulado:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
No se registrarán más mensajes hasta que finalice el intervalo de 30 segundos. Este límite se comparte mediante mensajes de registro generados por el SDK de Apache Beam y el código de usuario.

Retención y almacenamiento de registros

Los registros operativos se almacenan en el bucket de registros _Default. El nombre del servicio de la API de Logging es dataflow.googleapis.com. Para obtener más información sobre los tipos de recursos y servicios supervisados de Google Cloud que se usan en Cloud Logging, consulta Recursos y servicios supervisados.

Para obtener detalles sobre cuánto tiempo Logging retiene las entradas de registro, consulta la información sobre retención en Cuotas y límites: períodos de retención de registros.

Para obtener información sobre cómo ver los registros operativos, consulta Supervisa y visualiza los registros de canalización.

Supervisa y visualiza los registros de canalización

Cuando ejecutas la canalización en el servicio de Dataflow, puedes usar la interfaz de supervisión de Dataflow para ver los registros que emite tu canalización.

Ejemplo de registro de trabajador de Dataflow

La canalización de WordCount modificada se puede ejecutar en la nube con las siguientes opciones:

Java

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Go

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Ver registros

Debido a que la canalización en la nube de WordCount usa la ejecución de bloqueo, los mensajes de la consola se emiten durante la ejecución de la canalización. Después de que se inicia el trabajo, se envía a la consola un vínculo a la página de la consola de Google Cloud, seguido del ID del trabajo de canalización:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

La URL de la consola lleva a la interfaz de supervisión de Dataflow con una página de resumen para el trabajo enviado. Muestra un grafo de ejecución dinámico a la izquierda, con información de resumen a la derecha: Haz clic en en el panel inferior para expandir el panel de registros.

De forma predeterminada, el panel de registros muestra Registros de trabajos que informan el estado del trabajo en su conjunto. Para filtrar los mensajes que aparecen en el panel de registros, haz clic en Información y, luego, en Filtrar registros.

Seleccionar un paso de canalización en el grafo cambia la vista a Step Logs (Registros de pasos), que genera tu código, y al código generado que se ejecuta en el paso de la canalización.

Para volver a Job Logs, borra el paso mediante un clic fuera del grafo o con el botón Anular selección de paso en el panel lateral derecho.

Hacer clic en el botón de vínculo externo desde el panel de registros lleva a Logging con un menú para seleccionar diferentes tipos de registros.

Logging también incluye otros registros de infraestructura para la canalización. Para obtener más detalles sobre cómo explorar tus registros, consulta la guía Explorador de registros.

Tipos de registros

Aquí hay un resumen de los diferentes tipos de registro disponibles para ver en la página Logging:

  • Los registros de job-message contienen mensajes a nivel del trabajo que generan varios componentes de Dataflow. Los ejemplos incluyen la configuración del ajuste de escala automático, cuando los trabajadores inician o cierran una instancia, el avance en el paso del trabajo y los errores del trabajo. Los errores a nivel del trabajador que se originan en una falla del código del usuario y que están presentes en los registros del trabajador también se propagan a los registros job-message.
  • Los trabajadores de Dataflow producen los registros de worker. Los trabajadores realizan la mayor parte del trabajo de canalización (por ejemplo, mediante la aplicación de ParDo a los datos). Los registros de worker contienen mensajes registrados por tu código y Dataflow.
  • Los registros de worker-startup están presentes en la mayoría de los trabajos de Dataflow y pueden capturar los mensajes que se relacionan con el proceso de inicio. El proceso de inicio incluye la descarga de los archivos jar del trabajo desde Cloud Storage seguida del inicio de los trabajadores. Si hay un problema cuando se inician los trabajadores, estos registros son un buen lugar para buscar información.
  • Los registros de shuffler contienen mensajes de los trabajadores que consolidan los resultados de las operaciones de canalización paralelas.
  • Los registros de docker y kubelet contienen mensajes relacionados con estas tecnologías públicas, que se usan en los trabajadores de Dataflow.
  • Los registros nvidia-mps contienen mensajes sobre las operaciones de NVIDIA Multi-Process Service (MPS).

Establece los niveles de registro de los trabajadores de la canalización

Java

El nivel de registro predeterminado de SLF4J que configura el SDK de Apache Beam para Java sobre los trabajadores es INFO. Se emitirán todos los mensajes de registro de INFO o superior (INFO, WARN, ERROR). Puedes configurar un nivel de registro predeterminado diferente para admitir niveles de registro inferiores de SLF4J (TRACE o DEBUG) o configurar niveles de registro distintos para diferentes paquetes de clases en tu código.

Las siguientes opciones de canalización se proporcionan para que puedas establecer niveles de registro de trabajadores desde la línea de comandos o de manera programática:

  • --defaultSdkHarnessLogLevel=<level>: Usa esta opción para configurar todos los registradores en el nivel predeterminado especificado. Por ejemplo, la siguiente opción de línea de comandos anulará el nivel de registro INFO predeterminado de Dataflow y lo configurará como DEBUG:
    --defaultSdkHarnessLogLevel=DEBUG
  • --sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}: Usa esta opción para configurar el nivel de registro para clases o paquetes especificados. Por ejemplo, para anular el nivel de registro de canalización predeterminado para el módulo org.apache.beam.runners.dataflow y configurarlo como TRACE:
    --sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
    Para realizar varias anulaciones, proporciona un mapa JSON:
    (--sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).
  • Las opciones de canalización defaultSdkHarnessLogLevel y sdkHarnessLogLevelOverrides no son compatibles con las canalizaciones que usan las versiones 2.50.0 y anteriores del SDK de Apache Beam sin Runner v2. En ese caso, usa las opciones de canalización --defaultWorkerLogLevel=<level> y --workerLogLevelOverrides={"<package or class>":"<level>"}. Para realizar varias anulaciones, proporciona un mapa JSON:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})

En el siguiente ejemplo, se configuran de manera programática las opciones de registro de canalización con valores predeterminados que se pueden anular desde la línea de comandos:

 PipelineOptions options = ...
 SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE);
 // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher.
 loggingOptions.getSdkHarnessLogLevelOverrides()
     .addOverrideForClass(Foo.class, LogLevel.WARN)
     .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);

Python

El nivel de registro predeterminado que configura el SDK de Apache Beam para Python sobre los trabajadores es INFO. Se emitirán todos los mensajes de registro de INFO o superior (INFO, WARNING, ERROR, CRITICAL). Puedes configurar un nivel de registro predeterminado diferente para admitir niveles de registro inferiores (DEBUG) o configurar niveles de registro diferentes para diferentes módulos de tu código.

Se proporcionan dos opciones de canalización para permitirte establecer niveles de registro de trabajadores desde la línea de comandos o de manera programática:

  • --default_sdk_harness_log_level=<level>: Usa esta opción para configurar todos los registradores en el nivel predeterminado especificado. Por ejemplo, la siguiente opción de línea de comandos anula el nivel de registro INFO predeterminado de Dataflow y lo establece en DEBUG:
    --default_sdk_harness_log_level=DEBUG
  • --sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}: Usa esta opción para configurar el nivel de registro para módulos especificados. Por ejemplo, para anular el nivel de registro de canalización predeterminado para el módulo apache_beam.runners.dataflow y configurarlo como DEBUG:
    --sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
    Para realizar varias anulaciones, proporciona un mapa JSON:
    (--sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...}).

En el siguiente ejemplo, se usa la clase WorkerOptions para establecer opciones de registro de canalización de manera programática que se pueden anular desde la línea de comandos:

  from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

  pipeline_args = [
    '--project=PROJECT_NAME',
    '--job_name=JOB_NAME',
    '--staging_location=gs://STORAGE_BUCKET/staging/',
    '--temp_location=gs://STORAGE_BUCKET/tmp/',
    '--region=DATAFLOW_REGION',
    '--runner=DataflowRunner'
  ]

  pipeline_options = PipelineOptions(pipeline_args)
  worker_options = pipeline_options.view_as(WorkerOptions)
  worker_options.default_sdk_harness_log_level = 'WARNING'

  # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}']
  worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"}

  # Pass in pipeline options during pipeline creation.
  with beam.Pipeline(options=pipeline_options) as pipeline:

Reemplaza lo siguiente:

  • PROJECT_NAME: el nombre del proyecto
  • JOB_NAME: el nombre del trabajo
  • STORAGE_BUCKET: Es el nombre de Cloud Storage.
  • DATAFLOW_REGION: la región en la que deseas implementar el trabajo de Dataflow

    La marca --region anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.

Go

Esta función no está disponible en el SDK de Apache Beam para Go.

Visualiza el registro de los trabajos iniciados de BigQuery

Cuando usas BigQuery en tu canalización de Dataflow, se inician los trabajos de BigQuery para realizar diversas acciones en tu nombre. Estas acciones pueden incluir la carga de datos, la exportación de datos, etcétera. Para solucionar problemas y supervisar, la interfaz de supervisión de Dataflow tiene información adicional sobre estos trabajos de BigQuery disponibles en el panel Registros.

La información de los trabajos de BigQuery que se muestra en el panel Registros se almacena y carga desde una tabla del sistema de BigQuery. Se genera un costo de facturación cuando se consulta la tabla subyacente de BigQuery.

Visualiza los detalles de los trabajos de BigQuery

Para ver la información de trabajos de BigQuery, la canalización debe usar Apache Beam 2.24.0 o una versión posterior.

Para enumerar los trabajos de BigQuery, abre la pestaña BigQuery Jobs y selecciona la ubicación de los trabajos de BigQuery. A continuación, haz clic en Cargar trabajos de BigQuery (Load BigQuery Jobs) y confirma el diálogo. Una vez completada la consulta, se muestra la lista de trabajos.

El botón Cargar trabajos de BigQuery (Load BigQuery Jobs) en la tabla de información de trabajos de BigQuery

Se proporciona información básica sobre cada trabajo, como el ID de tarea, el tipo, la duración, etcétera.

Una tabla que muestra los trabajos de BigQuery que se ejecutaron durante la ejecución actual del trabajo de canalización.

Para obtener información más detallada sobre un trabajo específico, haz clic en Línea de comandos (Command line) en la columna Más información (More Info).

En la ventana modal de la línea de comandos, copia el comando bq jobs describe y ejecútalo de forma local o en Cloud Shell.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

El comando bq jobs describe genera JobStatistics, que proporciona más detalles que son útiles para diagnosticar un trabajo de BigQuery lento o bloqueado.

Como alternativa, cuando usas BigQueryIO con una consulta en SQL, se emite un trabajo de consulta. Para ver la consulta en SQL que usa el trabajo, haz clic en Ver consulta en la columna Más información.