Solución de problemas del DAG (flujos de trabajo)

En esta página, se proporcionan información y pasos para solucionar problemas comunes en el flujo de trabajo.

Solución de problemas en el flujo de trabajo

Para comenzar con la solución de problemas, siga estos pasos:

  1. Consulta los registros de Airflow.
  2. Revisa el conjunto de operaciones de Google Cloud.
  3. En Cloud Console, busca errores en las páginas de los componentes de Google Cloud que ejecutan tu entorno.
  4. En la interfaz web de Airflow, consulta la Vista de gráfico del DAG para ver las instancias de tareas fallidas.

    Sugerencia: Para navegar por un DAG grande en busca de instancias de tareas fallidas, cambia la orientación de la vista de gráfico de LR a RL. Para ello, anula la configuración predeterminada dag_orientation del servidor web:

    Sección Clave Valor
    webserver dag_orientation LR, TB, RL, oBT

Depura fallas del operador

Para depurar una falla del operador, sigue estos pasos:

  1. Verifica si hay errores específicos de la tarea.
  2. Consulta los registros de Airflow.
  3. Revisa el conjunto de operaciones de Google Cloud.
  4. Verifica los registros específicos del operador.
  5. Corrige los errores.
  6. Sube el DAG a la carpeta dags/.
  7. En la interfaz web de Airflow, borra los estados anteriores del DAG.
  8. Reanuda o ejecuta el DAG.

Problemas comunes

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes del DAG.

La tarea falla sin emitir registros

Los pods de Google Kubernetes Engine están sujetos al ciclo de vida de los pods de Kubernetes y a la expulsión de pods. Los aumentos repentinos de tareas y la programación conjunta de los trabajadores son dos de las causas más comunes de expulsión de pods en Cloud Composer.

La expulsión de pods puede ocurrir cuando un pod en particular usa recursos de un nodo, en relación con las expectativas de consumo de recursos configuradas para el nodo. Por ejemplo, la expulsión puede ocurrir cuando varias tareas con alto contenido de memoria se ejecutan en un pod y su carga combinada hace que el nodo en el que se ejecuta este pod supere el límite de consumo de memoria.

Si se expulsa un pod de trabajador de Airflow, todas las instancias de tareas que se ejecutan en ese pod se interrumpen y, luego, se marcan como con errores en Airflow.

Los registros están almacenados en búfer. Si se expulsa un pod trabajador antes de que se vacíe el búfer, no se emiten registros. La falla de la tarea sin registros indica que los trabajadores de Airflow se reiniciaron debido a la falta de memoria (OOM). Algunos registros pueden estar presentes en Cloud Logging, aunque los registros de Airflow no se hayan emitido. Puedes ver los registros, por ejemplo, mediante la selección de tu entorno en Google Cloud Console, la navegación aRegistros y la visualización de los registros de los trabajadores individuales enTodos los registros ->Registros de Airflow ->Trabajadores ->(trabajador individual).

La ejecución del DAG tiene RAM limitada. Cada ejecución de la tarea comienza con dos procesos de Airflow: ejecución de la tarea y supervisión. Actualmente, cada nodo puede realizar hasta 6 tareas simultáneas (aproximadamente 12 procesos cargados con módulos de Airflow). Se puede consumir más memoria, según el tamaño del DAG.

Síntoma

  1. En Cloud Console, ve al panel de Kubernetes Engine -> Cargas de trabajo.

    Abrir el panel de cargas de trabajo

  2. Si hay pods airflow-worker que muestran Evicted, haz clic en cada pod expulsado y busca el mensaje The node was low on resource: memory en la parte superior de la ventana.

Corregir

Tiempo de espera de importación de carga de DAG

Síntoma:

  • IU web de Airflow: en la parte superior de la página de la lista de DAG, aparece un cuadro de alerta rojo que muestra Broken DAG: [/path/to/dagfile] Timeout.
  • Conjunto de operaciones de Google Cloud: los registros de airflow-scheduler contienen entradas similares a:
    • “ERROR: Se agotó el tiempo de espera del proceso”
    • “ERROR: No se pudo importar: /path/to/dagfile”
    • “AirflowTaskTimeout: tiempo de espera”

Solución:

Anula la opción de configuración de Airflow de dag_file_processor_timeout y permite más tiempo para el análisis de DAG:

Sección Clave Valor
core dag_file_processor_timeout Nuevo valor del tiempo de espera

Mayor tráfico de red desde y hacia la base de datos de Airflow

La cantidad de tráfico de red entre el clúster de GKE de tu entorno y la base de datos de Airflow depende de la cantidad de DAG, de tareas en DAG, y de cómo los DAG acceden a los datos en la base de datos de Airflow. Los siguientes factores pueden influir en el uso de la red:

  • Consultas a la base de datos de Airflow Si tus DAG realizan muchas consultas, generan grandes cantidades de tráfico. Por ejemplo, verificar el estado de las tareas antes de continuar con otras tareas, consultar la tabla XCom y volcar el contenido de la base de datos de Airflow.

  • Gran cantidad de tareas. Cuantas más tareas haya para programar, más tráfico de red se generará. Esta consideración se aplica a la cantidad total de tareas en tus DAG y a la frecuencia de programación. Cuando el programador de Airflow programa las ejecuciones de DAG, realiza consultas a la base de datos de Airflow y genera tráfico.

  • La interfaz web de Airflow genera tráfico de red, ya que realiza consultas a la base de datos de Airflow. El uso intensivo de páginas con grafos, tareas y diagramas puede generar grandes volúmenes de tráfico de red.

El DAG provoca una falla en el servidor web de Airflow o hace que muestre un error 502 gateway timeout

Las fallas del servidor web pueden ocurrir por varias razones diferentes. Verifica los registros de airflow-webserver en Cloud Logging para determinar la causa del error 502 gateway timeout.

Procesamiento pesado

Evita ejecutar procesamientos pesados durante el tiempo de análisis del DAG. A diferencia de los nodos de trabajador y programador, cuyos tipos de máquina pueden personalizarse para tener mayor capacidad de CPU y memoria, el servidor web usa un tipo de máquina fijo que puede provocar errores de análisis del DAG si el procesamiento es demasiado pesado.

Ten en cuenta que el servidor web tiene 2 CPU virtuales y 2 GB de memoria. El valor predeterminado para core-dagbag_import_timeout es 30 segundos. Este valor de tiempo de espera define el límite superior de tiempo que Airflow pasa cargando un módulo de Python en la carpeta dags/.

Permisos incorrectos

El servidor web no se ejecuta con la misma cuenta de servicio que los trabajadores y el programador. Por lo tanto, es posible que los trabajadores y el programador puedan acceder a los recursos administrados por el usuario a los que el servidor web no puede acceder.

Te recomendamos que evites acceder a recursos no públicos durante el análisis del DAG. A veces, esto es inevitable y deberás otorgar permisos a la cuenta de servicio del servidor web. El nombre de la cuenta de servicio se deriva del dominio de tu servidor web. Por ejemplo, si el dominio es foo-tp.appspot.com, la cuenta de servicio es foo-tp@appspot.gserviceaccount.com.

Errores del DAG

El servidor web se ejecuta en App Engine y es independiente del clúster de GKE de tu entorno. El servidor web analiza los archivos de definición del DAG, y puede mostrarse un mensaje 502 gateway timeout si hay errores en el DAG. Airflow funciona de forma normal sin un servidor web funcional, si el DAG problemático no interrumpe ningún proceso que se ejecute en GKE. En este caso, puedes usar gcloud composer environments run para recuperar detalles de tu entorno y como una solución si el servidor web deja de estar disponible.

En otros casos, puedes ejecutar el análisis del DAG en GKE y buscar DAG que arrojen excepciones críticas de Python o para los que se haya agotado el tiempo de espera (30 segundos predeterminados). Para solucionar el problema, conéctate a un shell remoto en un contenedor de trabajador de Airflow y prueba los errores de sintaxis. Para obtener más información, consulta Probar DAG.

Se genera la excepción Lost connection to MySQL server during query durante la ejecución de la tarea o justo después de ella

Las excepciones de Lost connection to MySQL server during query suelen ocurrir cuando se cumplen las siguientes condiciones:

  • El DAG usa PythonOperator o un operador personalizado.
  • El DAG realiza consultas a la base de datos de Airflow.

Si se realizan varias consultas desde una función que admite llamadas, los objetos tracebacks pueden apuntar de forma incorrecta a la línea self.refresh_from_db(lock_for_update=True) en el código de Airflow. Es la primera consulta de la base de datos después de la ejecución de la tarea. La causa real de la excepción ocurre antes de esto, cuando una sesión de SQLAlchemy no se cierra de forma correcta.

Las sesiones de SQLAlchemy se limitan a un subproceso y se crean en una sesión de función que admite llamadas que pueden continuar más adelante dentro del código de Airflow. Si hay retrasos significativos entre las consultas dentro de una sesión, es posible que el servidor de MySQL o PostgreSQL ya cierre la conexión. El tiempo de espera de conexión en los entornos de Cloud Composer se establece en alrededor de 10 minutos.

Solución:

  • Usa el decorador airflow.utils.db.provide_session. Este decorador proporciona una sesión válida a la base de datos de Airflow en el parámetro session y cierra la sesión de forma correcta al final de la función.
  • No uses una sola función de larga duración. En cambio, mueve todas las consultas de base de datos a funciones separadas, de modo que haya varias funciones con el decorador airflow.utils.db.provide_session. En este caso, las sesiones se cierran de forma automática después de recuperar los resultados de la consulta.