Soluciona problemas del procesador de DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, solo se abordan los problemas relacionados con el procesamiento de archivos DAG. Para solucionar problemas de programación de tareas, consulta Soluciona problemas del programador de Airflow.

Flujo de trabajo para la solución de problemas

Inspecciona los registros del procesador de DAG

Si tienes DAGs complejos, es posible que el procesador de DAGs no analice todos tus DAGs. Esto puede generar muchos problemas que tienen los siguientes síntomas.

Síntomas:

  • Si el procesador de DAG tiene problemas para analizar tus DAGs, es posible que se produzca una combinación de los problemas que se indican aquí. Si los DAG se generan de forma dinámica, estos problemas pueden tener un mayor impacto en comparación con los DAG estáticos.

  • Los DAG no son visibles en la IU de Airflow ni en la IU de DAG.

  • Los DAG no están programados para ejecutarse.

  • Hay errores en los registros del procesador de DAG, por ejemplo:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    o

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Los procesadores de DAG experimentan problemas que provocan reinicios.

  • Se cancelan las tareas de Airflow programadas para su ejecución, y las ejecuciones de DAG para los DAG que no se pudieron analizar se pueden marcar como failed. Por ejemplo:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Solution:

  • Aumenta los parámetros relacionados con el análisis del DAG:

  • Corrige o quita los DAGs que causan problemas en el procesador de DAG.

Inspecciona los tiempos de análisis del DAG

Para verificar si el problema ocurre durante el análisis del DAG, sigue estos pasos.

Console

En Google Cloud console, puedes usar la página Monitoring y la pestaña Logs para inspeccionar los tiempos de análisis del DAG.

Inspecciona los tiempos de análisis del DAG con la página de supervisión de Cloud Composer:

  1. En la consola de Google Cloud , ve a la página Entornos.

    [Ir a Entornos][console-list-env]

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Supervisión.

  3. En la pestaña Monitoring, revisa el gráfico Tiempo total de análisis de todos los archivos DAG en la sección Ejecuciones de DAG y, luego, identifica posibles problemas.

    En la sección Ejecuciones de DAG, en la pestaña Monitoring de Composer, se muestran las métricas de estado de los DAG en tu entorno

Inspecciona los tiempos de análisis del DAG con la pestaña Registros de Cloud Composer:

  1. En la consola de Google Cloud , ve a la página Entornos.

    [Ir a Entornos][console-list-env]

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Supervisión.

  3. Ve a la pestaña Registros y, en el árbol de navegación Todos los registros, selecciona la sección Administrador de procesadores de DAG.

  4. Revisa los registros de dag-processor-manager y detecta posibles problemas.

    En los registros del procesador de DAG, se mostrarán los tiempos de análisis del DAG

gcloud

Usa el comando dags report para ver el tiempo de análisis de todos tus DAG.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno.

El resultado del comando es similar al siguiente:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Busca el valor de duración para cada uno de los DAGs que se indican en la tabla. Un valor alto puede indicar que uno de tus DAG no se implementa de manera óptima. En la tabla de resultados, puedes identificar qué DAG tienen un tiempo de análisis prolongado.

Soluciona problemas durante el análisis de DAG

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes durante el análisis de DAG.

Cantidad limitada de subprocesos

Permitir que el administrador del procesador de DAG use solo una cantidad limitada de subprocesos podría afectar el tiempo de análisis del DAG.

Para resolver el problema, anula las siguientes opciones de configuración de Airflow:

  • Reemplaza el parámetro parsing_processes:

    Sección Clave Valor Notas
    scheduler parsing_processes NUMBER_OF_CPUs_IN_SCHEDULER - 1 Reemplaza NUMBER_OF_CPUs_IN_SCHEDULER por la cantidad de CPU
    en el programador.

Haz que el procesador de DAG ignore archivos innecesarios

Puedes mejorar el rendimiento del procesador de DAG si omites los archivos innecesarios en la carpeta de DAG. El procesador de DAG ignora los archivos y las carpetas especificados en el archivo .airflowignore.

Para que el procesador de DAG ignore archivos innecesarios, haz lo siguiente:

  1. Crea un archivo .airflowignore.
  2. En este archivo, se muestra una lista de los archivos y las carpetas que se deben ignorar.
  3. Sube este archivo a la carpeta /dags en el bucket de tu entorno.

Para obtener más información sobre el formato de archivo .airflowignore, consulta la documentación de Airflow.

Airflow procesa los DAG pausados

Los usuarios de Airflow pausan los DAG para evitar su ejecución. Esto ahorra ciclos de procesamiento de los trabajadores de Airflow.

Airflow continuará analizando los DAG pausados. Si realmente deseas mejorar el rendimiento del procesador de DAG, usa .airflowignore o borra los DAG pausados de la carpeta de DAG.

Problemas comunes

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes de análisis.

Tiempo de espera de importación de carga de DAG

Síntoma:

  • En la interfaz web de Airflow, en la parte superior de la página de la lista de DAG, aparece un cuadro de alerta rojo que muestra Broken DAG: [/path/to/dagfile] Timeout.
  • En Cloud Monitoring: Los registros de airflow-scheduler contienen entradas similares a las siguientes:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Solución:

Anula la opción de configuración de Airflow dag_file_processor_timeout y permite más tiempo para el análisis del DAG:

Sección Clave Valor
core dag_file_processor_timeout Nuevo valor del tiempo de espera

No se ve un DAG en la IU de Airflow o en la IU del DAG, y el programador no lo programa

El procesador de DAG analiza cada DAG antes de que el programador pueda programarlo y antes de que un DAG se haga visible en la IU de Airflow o la IU de DAG.

Las siguientes opciones de configuración de Airflow definen los tiempos de espera para analizar los DAGs:

Si un DAG no está visible en la IU de Airflow o en la IU del DAG, haz lo siguiente:

  • Consulta los registros del procesador de DAG si este puede procesar correctamente tu DAG. En caso de problemas, es posible que veas las siguientes entradas de registro en los registros del procesador de DAG o del programador:

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
    
  • Verifica los registros del programador para ver si funciona correctamente. En caso de problemas, es posible que veas las siguientes entradas de registro en los registros del programador:

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496
    

Soluciones:

  • Corrige todos los errores de análisis del DAG. El procesador de DAG analiza varios DAG y, en casos excepcionales, los errores de análisis de un DAG pueden afectar negativamente el análisis de otros DAG.

  • Si el análisis de tu DAG tarda más que la cantidad de segundos definida en [core]dagbag_import_timeout, aumenta este tiempo de espera.

  • Si el análisis de todos tus DAG tarda más que la cantidad de segundos definida en [core]dag_file_processor_timeout, aumenta este tiempo de espera.

  • Si tu DAG tarda mucho en analizarse, también puede significar que no se implementó de manera óptima. Por ejemplo, si lee muchas variables de entorno o realiza llamadas a servicios externos o a la base de datos de Airflow. En la medida de lo posible, evita realizar estas operaciones en las secciones globales de los DAG.

  • Aumenta los recursos de CPU y memoria del programador para que pueda funcionar más rápido.

  • Ajusta la cantidad de programadores.

  • Aumenta la cantidad de procesos de procesadores de DAG para que el análisis se pueda realizar más rápido. Para ello, aumenta el valor de [scheduler]parsing_process.

  • Reduce la frecuencia del análisis del DAG.

  • Reduce la carga en la base de datos de Airflow.

¿Qué sigue?