Solucionar problemas de DAG

Cloud Composer 1 | Cloud Composer 2

En esta página, se proporcionan información y pasos para solucionar problemas comunes en el flujo de trabajo.

Muchos problemas de ejecución de DAG se deben a un rendimiento no óptimo del entorno. Para optimizar el entorno de Cloud Composer 2, sigue la guía Optimiza el rendimiento y los costos del entorno.

Algunos problemas de ejecución de DAG pueden deberse a que el programador de Airflow no funciona correctamente o de manera óptima. Sigue las instrucciones para solucionar problemas del programador.

Flujo de trabajo para la solución de problemas

Para comenzar con la solución de problemas, siga estos pasos:

  1. Consulta los registros de Airflow.
  2. Verifica el Panel de Monitoring.
  3. Revise Cloud Monitoring.
  4. En Cloud Console, busca errores en las páginas de los componentes de tu entorno.
  5. En la interfaz web de Airflow, consulta la vista de gráfico del DAG para ver las instancias de tareas con errores.

    Sugerencia: Para navegar por un DAG grande a fin de buscar instancias de tareas fallidas, cambia la orientación de la vista de gráfico de LR a RL. Para ello, anula la siguiente opción de configuración de Airflow:

    Sección Clave Valor
    webserver dag_orientation LR, TB, RL, oBT

Depura fallas del operador

Para depurar una falla del operador, sigue estos pasos:

  1. Verifica si hay errores específicos de la tarea.
  2. Consulta los registros de Airflow.
  3. Revise Cloud Monitoring.
  4. Verificar los registros específicos del operador
  5. Corrige los errores.
  6. Sube el DAG a la carpeta dags/.
  7. En la interfaz web de Airflow, borra los estados anteriores del DAG.
  8. Reanuda o ejecuta el DAG.

Problemas comunes

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes del DAG.

La tarea falla sin emitir registros debido a errores de análisis del DAG

A veces, puede haber errores de DAG sutiles que llevan a una situación en la que un programador de Airflow y el procesador de DAG pueden programar tareas para su ejecución y analizar un archivo de DAG (respectivamente), pero el trabajador de Airflow no puede ejecutar tareas de ese DAG, ya que hay errores de programación en el archivo de DAG de Python. Esto puede generar una situación en la que una tarea de Airflow se marca como Failed y no hay un registro de su ejecución.

Solución: verifica en los registros de trabajador de Airflow que no haya errores generados por un trabajador de Airflow relacionados con errores faltantes de DAG o análisis de DAG.

La tarea falla sin emitir registros debido a la presión de recursos

Síntoma: durante la ejecución de una tarea, el subproceso de trabajador de Airflow responsable de ejecutar la tarea de Airflow se interrumpe de forma repentina. El error visible en el registro del trabajador de Airflow podría ser similar al siguiente:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solución:

La tarea falla sin emitir registros debido a la expulsión del Pod

Los Pods de Google Kubernetes Engine están sujetos al ciclo de vida de los Pods de Kubernetes y su expulsión. Los aumentos de tareas y la coprogramación de trabajadores son dos causas más comunes de expulsión de Pods en Cloud Composer.

La expulsión de pods puede ocurrir cuando un pod en particular usa recursos de un nodo, en relación con las expectativas de consumo de recursos configuradas para el nodo. Por ejemplo, la expulsión puede ocurrir cuando varias tareas de alto consumo de memoria se ejecutan en un Pod, y su carga combinada hace que el nodo en el que se ejecuta este exceda el límite de consumo de memoria.

Si se expulsa un pod de trabajador de Airflow, todas las instancias de tareas que se ejecutan en ese pod se interrumpen y, luego, se marcan como con errores en Airflow.

Los registros están almacenados en búfer. Si se expulsa un pod trabajador antes de que se vacíe el búfer, no se emiten registros. La falla de la tarea sin registros indica que los trabajadores de Airflow se reiniciaron debido a la falta de memoria (OOM). Algunos registros pueden estar presentes en Cloud Logging, aunque los registros de Airflow no se hayan emitido.

Para ver los registros, haz lo siguiente:

  1. En Google Cloud Console, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haga clic en el nombre de su entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Registros.

  4. Para ver registros de trabajadores individuales, ve a All logs -> Airflow logs -> Workers -> (individual worker).

La ejecución del DAG tiene un límite de memoria. Cada ejecución de la tarea comienza con dos procesos de Airflow: ejecución de la tarea y supervisión. Actualmente, cada nodo puede realizar hasta 6 tareas simultáneas (aproximadamente 12 procesos cargados con módulos de Airflow). Se puede consumir más memoria, según el tamaño del DAG.

Síntoma:

  1. En Google Cloud Console, ve a la página Cargas de trabajo.

    Ir a Cargas de trabajo

  2. Si hay pods airflow-worker que muestran Evicted, haz clic en cada pod expulsado y busca el mensaje The node was low on resource: memory en la parte superior de la ventana.

Solución:

  • En Cloud Composer 1, crea un entorno de Cloud Composer nuevo con un tipo de máquina más grande que el tipo de máquina actual.
  • En Cloud Composer 2, aumenta los límites de memoria para los trabajadores de Airflow.
  • Verifica los registros de airflow-worker Pods para detectar posibles causas de expulsiones. Para obtener más información sobre cómo recuperar registros de Pods individuales, consulta Soluciona problemas de las cargas de trabajo implementadas.
  • Asegúrate de que las tareas del DAG sean idempotentes y se puedan reintentar.

Tiempo de espera de importación de carga de DAG

Síntoma:

  • En la interfaz web de Airflow, en la parte superior de la página de la lista de DAG, un cuadro rojo de alerta muestra Broken DAG: [/path/to/dagfile] Timeout.
  • En Cloud Monitoring, los registros airflow-scheduler contienen entradas similares a las siguientes:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Solución:

Anular la opción de configuración dag_file_processor_timeout de Airflow y permitir más tiempo para el análisis de DAG:

Sección Clave Valor
core dag_file_processor_timeout Nuevo valor del tiempo de espera

Mayor tráfico de red desde y hacia la base de datos de Airflow

La cantidad de tráfico de red entre el clúster de GKE de tu entorno y la base de datos de Airflow depende de la cantidad de DAG, de tareas en DAG, y de cómo los DAG acceden a los datos en la base de datos de Airflow. Los siguientes factores pueden influir en el uso de la red:

  • Consultas a la base de datos de Airflow Si tus DAG realizan muchas consultas, generan grandes cantidades de tráfico. Por ejemplo, verificar el estado de las tareas antes de continuar con otras tareas, consultar la tabla XCom y volcar el contenido de la base de datos de Airflow.

  • Gran cantidad de tareas. Cuantas más tareas haya para programar, más tráfico de red se generará. Esta consideración se aplica a la cantidad total de tareas en tus DAG y a la frecuencia de programación. Cuando el programador de Airflow programa las ejecuciones de DAG, realiza consultas a la base de datos de Airflow y genera tráfico.

  • La interfaz web de Airflow genera tráfico de red, ya que realiza consultas a la base de datos de Airflow. El uso intensivo de páginas con grafos, tareas y diagramas puede generar grandes volúmenes de tráfico de red.

El DAG provoca una falla en el servidor web de Airflow o hace que muestre un error 502 gateway timeout

Las fallas del servidor web pueden ocurrir por varias razones diferentes. Revisa los registros airflow-webserver en Cloud Logging para determinar la causa del error 502 gateway timeout.

Procesamiento pesado

Esta sección se aplica solo a Cloud Composer 1.

Evita ejecutar procesamientos pesados durante el tiempo de análisis del DAG.

A diferencia de los nodos trabajadores y programadores, cuyos tipos de máquinas se pueden personalizar para tener una mayor capacidad de CPU y memoria, el servidor web usa un tipo de máquina fijo, lo que puede generar fallas de análisis de DAG si el cálculo del tiempo de análisis es demasiado pesado.

Ten en cuenta que el servidor web tiene 2 CPU virtuales y 2 GB de memoria. El valor predeterminado para core-dagbag_import_timeout es 30 segundos. Este valor de tiempo de espera define el límite superior de tiempo que Airflow pasa cargando un módulo de Python en la carpeta dags/.

Permisos incorrectos

Esta sección se aplica solo a Cloud Composer 1.

El servidor web no se ejecuta con la misma cuenta de servicio que los trabajadores y el programador. Por lo tanto, es posible que los trabajadores y el programador puedan acceder a los recursos administrados por el usuario a los que el servidor web no puede acceder.

Te recomendamos que evites acceder a recursos no públicos durante el análisis del DAG. A veces, esto es inevitable y deberás otorgar permisos a la cuenta de servicio del servidor web. El nombre de la cuenta de servicio se deriva del dominio de tu servidor web. Por ejemplo, si el dominio es example-tp.appspot.com, la cuenta de servicio es example-tp@appspot.gserviceaccount.com.

Errores del DAG

Esta sección se aplica solo a Cloud Composer 1.

El servidor web se ejecuta en App Engine y es independiente del clúster de GKE de tu entorno. El servidor web analiza los archivos de definición del DAG, y puede mostrarse un mensaje 502 gateway timeout si hay errores en el DAG. Airflow funciona normalmente sin un servidor web funcional si el DAG problemático no interrumpe ningún proceso que se ejecute en GKE. En este caso, puedes usar gcloud composer environments run para recuperar detalles de tu entorno y como solución alternativa si el servidor web deja de estar disponible.

En otros casos, puedes ejecutar el análisis del DAG en GKE y buscar DAG que arrojen excepciones críticas de Python o para los que se haya agotado el tiempo de espera (30 segundos predeterminados). Para solucionar el problema, conéctate a un shell remoto en un contenedor de trabajador de Airflow y prueba los errores de sintaxis. Para obtener más información, consulta Probar DAG.

Error 504 cuando se accede al servidor web de Airflow

Consulta el Error 504 cuando accedes a la IU de Airflow.

Se genera la excepción Lost connection to MySQL server during query durante la ejecución de la tarea o justo después de ella

A menudo, ocurren excepciones de Lost connection to MySQL / PostgreSQL server during query cuando se cumplen las siguientes condiciones:

  • El DAG usa PythonOperator o un operador personalizado.
  • El DAG realiza consultas a la base de datos de Airflow.

Si se realizan varias consultas desde una función que admite llamadas, los objetos tracebacks pueden apuntar de forma incorrecta a la línea self.refresh_from_db(lock_for_update=True) en el código de Airflow. Es la primera consulta de la base de datos después de la ejecución de la tarea. La causa real de la excepción ocurre antes de esto, cuando una sesión de SQLAlchemy no se cierra de forma correcta.

Las sesiones de SQLAlchemy se limitan a un subproceso y se crean en una sesión de función que admite llamadas que pueden continuar más adelante dentro del código de Airflow. Si hay retrasos significativos entre las consultas dentro de una sesión, es posible que el servidor de MySQL o PostgreSQL ya cierre la conexión. El tiempo de espera de conexión en los entornos de Cloud Composer se establece en alrededor de 10 minutos.

Solución:

  • Usa el decorador airflow.utils.db.provide_session. Este decorador proporciona una sesión válida a la base de datos de Airflow en el parámetro session y cierra la sesión de forma correcta al final de la función.
  • No uses una sola función de larga duración. En cambio, mueve todas las consultas de base de datos a funciones separadas, de modo que haya varias funciones con el decorador airflow.utils.db.provide_session. En este caso, las sesiones se cierran de forma automática después de recuperar los resultados de la consulta.

Controlar el tiempo de ejecución de los DAG, las tareas y las ejecuciones paralelas del mismo DAG

Si quieres controlar la duración de una sola ejecución de DAG para un DAG en particular, puedes usar el parámetro dagrun_timeout de DAG. Por ejemplo, si esperas que una sola ejecución de DAG (independientemente de si la ejecución finaliza con éxito o no) no debe durar más de 1 hora, y se configura este parámetro en 3,600 segundos.

También puedes controlar cuánto tiempo quieres que dure una sola tarea de Airflow. Para hacerlo, puedes usar execution_timeout.

Si deseas controlar cuántas ejecuciones de DAG activas quieres tener para un DAG en particular, puedes usar la opción de configuración de Airflow [core]max-active-runs-per-dag.

Si deseas que solo se ejecute una instancia de un DAG en un momento determinado, establece el parámetro max-active-runs-per-dag en 1.

Problemas que afectan a los DAG y la sincronización de complementos con programadores, trabajadores y servidores web

Cloud Composer sincroniza el contenido de las carpetas /dags y /plugins con los programadores y los trabajadores. Algunos objetos de las carpetas /dags y /plugins pueden impedir que la sincronización funcione correctamente o, al menos, ralentizarla.

  • La carpeta /dags se sincroniza con programadores y trabajadores. Esta carpeta no se sincroniza con los servidores web en Cloud Composer 2 o si activas DAG Serialization en Cloud Composer 1.

  • La carpeta /plugins se sincroniza con programadores, trabajadores y servidores web.

Es posible que se presenten los siguientes problemas:

  • Subiste archivos comprimidos como gzip que usan transcodificación de compresión a carpetas /dags y /plugins. Por lo general, sucede si usas el comando gsutil cp -Z para subir datos al bucket.

    Solución: Borra el objeto que usó la transcodificación de compresión y vuelve a subirlo al bucket.

  • Uno de los objetos se llama '' ese objeto no está sincronizado con programadores y trabajadores, y es posible que deje de sincronizarse.

    Solución: Cambie el nombre de un objeto problemático.

  • Uno de los objetos de las carpetas /dags o /plugins contiene un símbolo / al final del nombre del objeto. Esos objetos pueden confundir el proceso de sincronización porque el símbolo / significa que un objeto es una carpeta, no un archivo.

    Solución: Quita el símbolo / del nombre del objeto problemático.

  • No almacenes archivos innecesarios en las carpetas /dags y /plugins.

    A veces, los DAG y los complementos que implementas están acompañados de archivos adicionales, como los archivos que almacenan pruebas de esos componentes. Estos archivos se sincronizan con los trabajadores y programadores, lo que afecta el tiempo necesario para copiarlos a programadores, trabajadores y servidores web.

    Solución: No almacenes archivos adicionales ni innecesarios en carpetas /dags y /plugins.

Los programadores y los trabajadores generan el error Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'

Este problema ocurre porque los objetos pueden tener un espacio de nombres superpuesto en Cloud Storage, al mismo tiempo que los programadores y los trabajadores usan sistemas de archivos tradicionales. Por ejemplo, es posible agregar una carpeta y un objeto con el mismo nombre a un bucket de entorno. Cuando el bucket se sincroniza con los programadores y los trabajadores del entorno, se genera este error, que puede generar fallas en la tarea.

Para solucionar este problema, asegúrate de que no haya espacios de nombres superpuestos en el bucket del entorno. Por ejemplo, si el /dags/misc (un archivo) y /dags/misc/example_file.txt (otro archivo) se encuentran en un bucket, el programador genera un error.

Interrupciones transitorias cuando se conecta a la base de datos de metadatos de Airflow

Cloud Composer se ejecuta en la infraestructura de nube distribuida. Significa que, cada tanto, pueden aparecer algunos problemas transitorios que podrían interrumpir la ejecución de tus tareas de Airflow.

En esas situaciones, es posible que veas los siguientes mensajes de error en los registros de trabajadores de Airflow:

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

o

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Estos problemas intermitentes también pueden deberse a las operaciones de mantenimiento realizadas en tus entornos de Cloud Composer.

Por lo general, estos errores son intermitentes y, si tus tareas de Airflow son idempotentes y tienes reintentos establecidos, debes ser inmune a ellas. También puedes definir períodos de mantenimiento.

Una razón adicional de estos errores podría ser la falta de recursos en el clúster de tu entorno. En esos casos, puedes escalar verticalmente u optimizar tu entorno como se describe en las instrucciones de Escalamiento de entornos o Optimización del entorno.

Un DAG no es visible en la IU de Airflow ni en la de DAG, y el programador no lo programa

El procesador de DAG analiza cada DAG antes de que el programador pueda programarlo y antes de que un DAG se vuelva visible en la IU de Airflow o en la IU de DAG. La opción de configuración de Airflow [core]dag_file_processor_timeout define cuánto tiempo tiene el procesador de DAG para analizar un solo DAG.

Si un DAG no es visible en la IU de Airflow o en la IU del DAG:

  • Verifica los registros del procesador de DAG si este puede procesar correctamente tu DAG. En caso de problemas, es posible que veas las siguientes entradas de registro en el procesador del DAG o en los registros del programador:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.

Este error puede deberse a una de las siguientes razones:

  • Tu DAG no se implementó correctamente y el procesador del DAG no puede analizarlo. En este caso, corrija su DAG.

  • El análisis de tu DAG tarda más de la cantidad de segundos definidos en [core]dag_file_processor_timeout.

    En este caso, puedes aumentar este tiempo de espera.

  • Si tu DAG tarda mucho tiempo en analizarse, también puede significar que no se implementa de manera óptima. Por ejemplo, si lee muchas variables de entorno o realiza llamadas a servicios externos. Si este es el caso, optimiza tu DAG para que el procesador de DAG pueda procesarlo con rapidez.

Síntomas de la base de datos de Airflow en carga

A veces, en los registros de trabajador de Airflow, puedes ver las siguientes entradas de registro de advertencia.

Para MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

En PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Este error o una advertencia pueden indicar que la base de datos de Airflow está abrumada por la cantidad de consultas que debe controlar.

Soluciones posibles:

¿Qué sigue?