Puedes usar la infraestructura de registro incorporada del SDK de Apache Beam para registrar información cuando ejecutas la canalización. Puedes usar la consola de Google Cloud para supervisar la información de registro durante y después de que se ejecute la canalización.
Agrega mensajes de registro a tu canalización
Java
Si usas el SDK de Apache Beam para Java, se recomienda que registres los mensajes de los trabajadores a través de la biblioteca de código abierto Simple Logging Facade para Java (SLF4J). El SDK de Apache Beam para Java implementa la infraestructura de registro requerida, por lo que tu código de Java solo necesita importar la API de SLF4J. Luego, el SDK crea una instancia de Registrador para habilitar el registro de mensajes dentro de tu código de canalización.
En códigos o bibliotecas preexistentes, el SDK de Apache Beam para Java configura una infraestructura de registro adicional. Se capturan los mensajes de registro producidos por las siguientes bibliotecas de registro para Java:
Python
El SDK de Apache Beam para Python proporciona el paquete de biblioteca logging
, que permite a los trabajadores de la canalización emitir mensajes de registro. Para usar las funciones de la biblioteca, debes importarla:
import logging
Go
El SDK de Apache Beam para Go proporciona el paquete de biblioteca log
, que permite a los trabajadores de la canalización emitir mensajes de registro. Para usar las funciones de la biblioteca, debes importarla:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
Ejemplo de código de mensaje de registro de trabajador
Java
En el siguiente ejemplo, se usa SLF4J para el registro de Dataflow. Si deseas obtener más información sobre cómo configurar SLF4J para el registro de Dataflow, consulta el artículo Sugerencias de Java.
El ejemplo de WordCount de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado. El código agregado se indica en negrita en el siguiente ejemplo (se incluye el código que lo rodea para brindar contexto).
package org.apache.beam.examples; // Import SLF4J packages. import org.slf4j.Logger; import org.slf4j.LoggerFactory; ... public class WordCount { ... static class ExtractWordsFn extends DoFn<String, String> { // Instantiate Logger. // Suggestion: As shown, specify the class name of the containing class // (WordCount). private static final Logger LOG = LoggerFactory.getLogger(WordCount.class); ... @ProcessElement public void processElement(ProcessContext c) { ... // Output each word encountered into the output PCollection. for (String word : words) { if (!word.isEmpty()) { c.output(word); } // Log INFO messages when the word "love" is found. if(word.toLowerCase().equals("love")) { LOG.info("Found " + word.toLowerCase()); } } } } ... // Remaining WordCount example code ...
Python
El ejemplo de wordcount.py de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado.
# import Python logging module. import logging class ExtractWordsFn(beam.DoFn): def process(self, element): words = re.findall(r'[A-Za-z\']+', element) for word in words: yield word if word.lower() == 'love': # Log using the root logger at info or higher levels logging.info('Found : %s', word.lower()) # Remaining WordCount example code ...
Go
El ejemplo de wordcount.go de Apache Beam puede modificarse para emitir un mensaje de registro cuando se encuentra la palabra “love” en una línea del texto procesado.
func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) { for _, word := range wordRE.FindAllString(line, -1) { // increment the counter for small words if length of words is // less than small_word_length if strings.ToLower(word) == "love" { log.Infof(ctx, "Found : %s", strings.ToLower(word)) } emit(word) } } // Remaining Wordcount example
Java
Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts
), el resultado de la consola incluye los mensajes de registro agregados:
INFO: Executing pipeline using the DirectRunner. ... Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement INFO: Found love Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement INFO: Found love Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement INFO: Found love ... INFO: Pipeline execution complete.
De forma predeterminada, solo las líneas de registro marcadas como INFO
y superiores se enviarán a Cloud Logging. Si deseas modificar este comportamiento, consulta Configura los niveles de registro de los trabajadores de la canalización.
Python
Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts
), el resultado de la consola incluye los mensajes de registro agregados:
INFO:root:Found : love INFO:root:Found : love INFO:root:Found : love
De forma predeterminada, solo las líneas de registro marcadas como INFO
y superiores se enviarán a Cloud Logging.
Go
Si la canalización de WordCount modificada se ejecuta de forma local mediante el DirectRunner predeterminado y los resultados se envían a un archivo local (--output=./local-wordcounts
), el resultado de la consola incluye los mensajes de registro agregados:
2022/05/26 11:36:44 Found : love 2022/05/26 11:36:44 Found : love 2022/05/26 11:36:44 Found : love
De forma predeterminada, solo las líneas de registro marcadas como INFO
y superiores se enviarán a Cloud Logging.
Controla el volumen de registros
También puedes reducir el volumen de registros generados si cambias los niveles de registro de la canalización. Si no deseas continuar con la transferencia de algunos o todos tus registros de Dataflow, agrega una exclusión de Logging para excluir los registros de Dataflow. Luego, exporta los registros a un destino diferente, como BigQuery, Cloud Storage o Pub/Sub. Para obtener más información, consulta Controla la transferencia de registros de Dataflow.
Límite y regulación del registro
Los mensajes de registro de los trabajadores se limitan a 15,000 mensajes cada 30 segundos por trabajador. Si se alcanza este límite, se agrega un solo mensaje de registro de trabajador que indica que el registro está regulado:
Throttling logger worker. It used up its 30s quota for logs in only 12.345s
Retención y almacenamiento de registros
Los registros operativos se almacenan en el bucket de registros _Default
.
El nombre del servicio de la API de Logging es dataflow.googleapis.com
. Para obtener más información sobre los tipos de recursos y servicios supervisados de Google Cloud que se usan en Cloud Logging, consulta Recursos y servicios supervisados.
Para obtener detalles sobre cuánto tiempo Logging retiene las entradas de registro, consulta la información sobre retención en Cuotas y límites: períodos de retención de registros.
Para obtener información sobre cómo ver los registros operativos, consulta Supervisa y visualiza los registros de canalización.
Supervisa y visualiza los registros de canalización
Cuando ejecutas la canalización en el servicio de Dataflow, puedes usar la interfaz de supervisión de Dataflow para ver los registros que emite tu canalización.
Ejemplo de registro de trabajador de Dataflow
La canalización de WordCount modificada se puede ejecutar en la nube con las siguientes opciones:
Java
--project=WordCountExample --output=gs://<bucket-name>/counts --runner=DataflowRunner --tempLocation=gs://<bucket-name>/temp --stagingLocation=gs://<bucket-name>/binaries
Python
--project=WordCountExample --output=gs://<bucket-name>/counts --runner=DataflowRunner --staging_location=gs://<bucket-name>/binaries
Go
--project=WordCountExample --output=gs://<bucket-name>/counts --runner=DataflowRunner --staging_location=gs://<bucket-name>/binaries
Ver registros
Debido a que la canalización en la nube de WordCount usa la ejecución de bloqueo, los mensajes de la consola se emiten durante la ejecución de la canalización. Después de que se inicia el trabajo, se envía a la consola un vínculo a la página de la consola de Google Cloud, seguido del ID del trabajo de canalización:
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669 Submitted job: 2017-04-13_13_58_10-6217777367720337669
La URL de la consola lleva a la interfaz de supervisión de Dataflow con una página de resumen para el trabajo enviado. Muestra un grafo de ejecución dinámico a la izquierda, con información de resumen a la derecha: Haz clic en keyboard_capslock en el panel inferior para expandir el panel de registros.
De forma predeterminada, el panel de registros muestra Registros de trabajos que informan el estado del trabajo en su conjunto. Para filtrar los mensajes que aparecen en el panel de registros, haz clic en Informaciónarrow_drop_down y, luego, en filter_listFiltrar registros.
Seleccionar un paso de canalización en el grafo cambia la vista a Step Logs (Registros de pasos), que genera tu código, y al código generado que se ejecuta en el paso de la canalización.
Para volver a Job Logs, borra el paso mediante un clic fuera del grafo o con el botón Anular selección de paso en el panel lateral derecho.
Ve al Explorador de registros (Logs Explorer).
Para abrir el Explorador de registros y seleccionar diferentes tipos de registros, en el panel de registros, haz clic en Ver en el Explorador de registros (el botón de vínculo externo).
En el Explorador de registros, para ver el panel con diferentes tipos de registro, haz clic en el botón de activar o desactivar Campos de registro.
En la página Explorador de registros, la consulta puede filtrar los registros por paso de trabajo o por tipo de registro. Para quitar los filtros, haz clic en el botón de activación Mostrar consulta y edita la consulta.
Si deseas ver todos los registros disponibles para un trabajo, sigue estos pasos:
En el campo Consulta, ingresa la siguiente consulta:
resource.type="dataflow_step" resource.labels.job_id="JOB_ID"
Reemplaza JOB_ID por el ID de tu trabajo.
Haz clic en Ejecutar consulta.
Si usas esta consulta y no ves los registros de tu trabajo, haz clic en Editar hora.
Ajusta la hora de inicio y de finalización y, luego, haz clic en Aplicar.
Tipos de registros
El explorador de registros también incluye registros de infraestructura para la canalización. Usa los registros de errores y advertencias para diagnosticar los problemas de canalización observados. Los errores y las advertencias en los registros de la infraestructura que no se correlacionan con un problema de canalización no necesariamente indican un problema.
Aquí hay un resumen de los diferentes tipos de registro disponibles para ver en la página Explorador de registros:
- Los registros de job-message contienen mensajes a nivel del trabajo que generan varios componentes de Dataflow. Los ejemplos incluyen la configuración del ajuste de escala automático, cuando los trabajadores inician o cierran una instancia, el avance en el paso del trabajo y los errores del trabajo. Los errores a nivel del trabajador que se originan en una falla del código del usuario y que están presentes en los registros del trabajador también se propagan a los registros job-message.
- Los trabajadores de Dataflow producen los registros de worker. Los trabajadores realizan la mayor parte del trabajo de canalización (por ejemplo, mediante la aplicación de
ParDo
a los datos). Los registros de worker contienen mensajes registrados por tu código y Dataflow. - Los registros de worker-startup están presentes en la mayoría de los trabajos de Dataflow y pueden capturar los mensajes que se relacionan con el proceso de inicio. El proceso de inicio incluye la descarga de los archivos jar del trabajo desde Cloud Storage seguida del inicio de los trabajadores. Si hay un problema cuando se inician los trabajadores, estos registros son un buen lugar para buscar información.
- Los registros de shuffler contienen mensajes de los trabajadores que consolidan los resultados de las operaciones de canalización paralelas.
- Los registros de system contienen mensajes de los sistemas operativos host de las VMs de trabajador. En algunas situaciones, pueden capturar fallas del proceso o eventos de memoria insuficiente (OOM).
- Los registros de docker y kubelet contienen mensajes relacionados con estas tecnologías públicas, que se usan en los trabajadores de Dataflow.
- Los registros nvidia-mps contienen mensajes sobre las operaciones de NVIDIA Multi-Process Service (MPS).
Establece los niveles de registro de los trabajadores de la canalización
Java
El nivel de registro predeterminado de SLF4J que configura el SDK de Apache Beam para Java sobre los trabajadores es INFO
. Se emitirán todos los mensajes de registro de INFO
o superior (INFO
, WARN
, ERROR
). Puedes configurar un nivel de registro predeterminado diferente para admitir niveles de registro inferiores de SLF4J (TRACE
o DEBUG
) o configurar niveles de registro distintos para diferentes paquetes de clases en tu código.
Las siguientes opciones de canalización se proporcionan para permitirte establecer niveles de registro de trabajadores desde la línea de comandos o de manera programática:
--defaultSdkHarnessLogLevel=<level>
: Usa esta opción para configurar todos los registradores en el nivel predeterminado especificado. Por ejemplo, la siguiente opción de línea de comandos anulará el nivel de registroINFO
predeterminado de Dataflow y lo configurará comoDEBUG
:--defaultSdkHarnessLogLevel=DEBUG
--sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}
: Usa esta opción para configurar el nivel de registro para clases o paquetes especificados. Por ejemplo, para anular el nivel de registro de canalización predeterminado para el móduloorg.apache.beam.runners.dataflow
y configurarlo comoTRACE
:
--sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
Para realizar varias anulaciones, proporciona un mapa JSON:
(--sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}
).- Las opciones de canalización
defaultSdkHarnessLogLevel
ysdkHarnessLogLevelOverrides
no son compatibles con las canalizaciones que usan las versiones 2.50.0 y anteriores del SDK de Apache Beam sin Runner v2. En ese caso, usa las opciones de canalización--defaultWorkerLogLevel=<level>
y--workerLogLevelOverrides={"<package or class>":"<level>"}
. Para realizar varias anulaciones, proporciona un mapa JSON:
(--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}
)
En el siguiente ejemplo, se configuran de manera programática las opciones de registro de canalización con valores predeterminados que se pueden anular desde la línea de comandos:
PipelineOptions options = ... SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class); // Overrides the default log level on the worker to emit logs at TRACE or higher. loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE); // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher. loggingOptions.getSdkHarnessLogLevelOverrides() .addOverrideForClass(Foo.class, LogLevel.WARN) .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);
Python
El nivel de registro predeterminado que configura el SDK de Apache Beam para Python sobre los trabajadores es INFO
. Se emitirán todos los mensajes de registro de INFO
o superior (INFO
, WARNING
, ERROR
, CRITICAL
).
Puedes configurar un nivel de registro predeterminado diferente para admitir niveles de registro inferiores (DEBUG
) o configurar niveles de registro diferentes para diferentes módulos de tu código.
Se proporcionan dos opciones de canalización para permitirte establecer niveles de registro de trabajadores desde la línea de comandos o de manera programática:
--default_sdk_harness_log_level=<level>
: Usa esta opción para configurar todos los registradores en el nivel predeterminado especificado. Por ejemplo, la siguiente opción de línea de comandos anula el nivel de registroINFO
predeterminado de Dataflow y lo establece enDEBUG
:
--default_sdk_harness_log_level=DEBUG
--sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}
: Usa esta opción para configurar el nivel de registro para módulos especificados. Por ejemplo, para anular el nivel de registro de canalización predeterminado para el móduloapache_beam.runners.dataflow
y configurarlo comoDEBUG
:
--sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
Para realizar varias anulaciones, proporciona un mapa JSON:
(--sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...}
).
En el siguiente ejemplo, se usa la clase WorkerOptions
para establecer opciones de registro de canalización de manera programática que se pueden anular desde la línea de comandos:
from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions pipeline_args = [ '--project=PROJECT_NAME', '--job_name=JOB_NAME', '--staging_location=gs://STORAGE_BUCKET/staging/', '--temp_location=gs://STORAGE_BUCKET/tmp/', '--region=DATAFLOW_REGION', '--runner=DataflowRunner' ] pipeline_options = PipelineOptions(pipeline_args) worker_options = pipeline_options.view_as(WorkerOptions) worker_options.default_sdk_harness_log_level = 'WARNING' # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}'] worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"} # Pass in pipeline options during pipeline creation. with beam.Pipeline(options=pipeline_options) as pipeline:
Reemplaza lo siguiente:
PROJECT_NAME
: el nombre del proyectoJOB_NAME
: el nombre del trabajoSTORAGE_BUCKET
: Es el nombre de Cloud Storage.DATAFLOW_REGION
: la región en la que deseas implementar el trabajo de DataflowLa marca
--region
anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.
Go
Esta función no está disponible en el SDK de Apache Beam para Go.
Visualiza el registro de los trabajos iniciados de BigQuery
Cuando se usa BigQuery en la canalización de Dataflow, se inician los trabajos de BigQuery para realizar diversas acciones en tu nombre. Estas acciones pueden incluir la carga de datos, la exportación de datos, etcétera. Para solucionar problemas y supervisar, la interfaz de supervisión de Dataflow tiene información adicional sobre estos trabajos de BigQuery disponibles en el panel Registros.
La información de los trabajos de BigQuery que se muestra en el panel Registros se almacena y carga desde una tabla del sistema de BigQuery. Se genera un costo de facturación cuando se consulta la tabla subyacente de BigQuery.
Visualiza los detalles de los trabajos de BigQuery
Para ver la información de trabajos de BigQuery, la canalización debe usar Apache Beam 2.24.0 o una versión posterior.
Para enumerar los trabajos de BigQuery, abre la pestaña BigQuery Jobs y selecciona la ubicación de los trabajos de BigQuery. A continuación, haz clic en Cargar trabajos de BigQuery (Load BigQuery Jobs) y confirma el diálogo. Una vez completada la consulta, se muestra la lista de trabajos.
Se proporciona información básica sobre cada trabajo, como el ID de tarea, el tipo, la duración, etcétera.
Para obtener información más detallada sobre un trabajo específico, haz clic en Línea de comandos (Command line) en la columna Más información (More Info).
En la ventana modal de la línea de comandos, copia el comando bq jobs describe y ejecútalo de forma local o en Cloud Shell.
gcloud alpha bq jobs describe BIGQUERY_JOB_ID
El comando bq jobs describe
genera JobStatistics, que proporciona más detalles que son útiles para diagnosticar un trabajo de BigQuery lento o bloqueado.
Como alternativa, cuando usas BigQueryIO con una consulta en SQL, se emite un trabajo de consulta. Para ver la consulta en SQL que usa el trabajo, haz clic en Ver consulta en la columna Más información.