Solucionar problemas del procesador de DAGs

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página solo hace referencia a problemas relacionados con el procesamiento de archivos DAG. Si tienes problemas para programar tareas, consulta el artículo Solucionar problemas del programador de Airflow.

Solucionar problemas de flujo de trabajo

Inspeccionar los registros del procesador de DAG

Si tienes DAGs complejos, es posible que los procesadores de DAGs de Airflow no analicen todos tus DAGs. Esto puede provocar muchos problemas que tengan los siguientes síntomas.

Síntomas:

  • Si un procesador de DAG tiene problemas al analizar tus DAGs, es posible que se produzca una combinación de los problemas indicados. Si los DAGs se generan de forma dinámica, estos problemas pueden tener un mayor impacto en comparación con los DAGs estáticos.

    • Los DAGs no se muestran en la interfaz de usuario de Airflow ni en la interfaz de usuario de DAGs.
    • Los DAGs no están programados para ejecutarse.
    • Hay errores en los registros del procesador de DAG. Por ejemplo:
    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    o

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Los procesadores de DAG tienen problemas que provocan que se reinicie el programador de Airflow.

  • Las tareas de Airflow programadas para ejecutarse se cancelan y las ejecuciones de DAGs que no se hayan podido analizar se pueden marcar como failed. Por ejemplo:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Solución:

  • Aumenta los parámetros relacionados con el análisis de DAG:

  • Corrija o elimine los DAGs que causen problemas en los procesadores de DAGs.

Inspeccionar los tiempos de análisis de DAGs

Para verificar si el problema se produce durante el tiempo de análisis del DAG, sigue estos pasos.

Consola

En la consola, puedes usar la página Monitorización y la pestaña Registros para inspeccionar los tiempos de análisis de los DAGs. Google Cloud

Inspecciona los tiempos de análisis de los DAGs con la página de monitorización de Cloud Composer:

  1. En la Google Cloud consola, ve a la página Entornos.

    [Ir a Entornos][console-list-env]

  2. En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Monitorización.

  3. En la pestaña Monitorización, selecciona Estadísticas de DAG y consulta el gráfico Tiempo total de análisis de todos los archivos DAG para identificar posibles problemas. Te recomendamos que monitorices este gráfico durante un tiempo para identificar problemas de análisis de DAG en varios ciclos de análisis de DAG.

    El tiempo total de análisis de todos los archivos DAG se muestra en la pestaña Monitorización.

Inspecciona los tiempos de análisis de los DAGs con la pestaña Registros de Cloud Composer:

  1. En la Google Cloud consola, ve a la página Entornos.

    [Ir a Entornos][console-list-env]

  2. En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Monitorización.

  3. Vaya a la pestaña Registros y, en el árbol de navegación Todos los registros, seleccione la sección Administrador del procesador de DAG.

  4. Revisa los dag-processor-managerregistros e identifica posibles problemas.

    Los registros del procesador de DAG mostrarán los tiempos de análisis de los DAGs

gcloud

Usa el comando dags report para ver el tiempo de análisis de todos tus DAGs.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Sustituye:

  • ENVIRONMENT_NAME con el nombre del entorno.
  • LOCATION con la región en la que se encuentra el entorno.

La salida del comando es similar a la siguiente:

file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Busque el valor de duración de cada uno de los dags que aparecen en la tabla. Un valor alto puede indicar que uno de tus DAGs no se ha implementado de forma óptima. En la tabla de resultados, puedes identificar los DAGs que tienen un tiempo de análisis prolongado.

Solucionar problemas durante el tiempo de análisis de DAGs

En las siguientes secciones se describen los síntomas y las posibles soluciones de algunos problemas habituales que se producen durante el tiempo de análisis de los DAGs.

Número limitado de cadenas

Si permites que los procesadores de DAG usen solo un número limitado de subprocesos, puede que se vea afectado el tiempo de análisis de los DAG.

Para solucionar el problema, anula la opción de configuración de [scheduler]parsing_processes Airflow. Define el valor inicial como el número de vCPUs que usa el programador menos una CPU. Por ejemplo, si el programador usa 2 vCPUs, asigna el valor 1. A continuación, ajusta la asignación de recursos del programador para que funcione a aproximadamente el 70% de la capacidad de vCPU o de memoria.

Hacer que el procesador de DAG ignore los archivos innecesarios

Puedes mejorar el rendimiento del procesador de DAGs de Airflow (que se ejecuta junto con el programador de Airflow en Cloud Composer 2) si excluyes los archivos innecesarios de la carpeta DAGs. El procesador de DAGs de Airflow ignora los archivos y las carpetas especificados en el archivo .airflowignore.

Para que el procesador de DAGs de Airflow ignore los archivos innecesarios, haz lo siguiente:

  1. Crea un archivo .airflowignore.
  2. En este archivo, enumera los archivos y las carpetas que se deben ignorar.
  3. Sube este archivo a la carpeta /dags del segmento de tu entorno.

Para obtener más información sobre el formato de archivo .airflowignore, consulta la documentación de Airflow.

Airflow procesa los DAGs en pausa

Puedes pausar los DAGs para que dejen de ejecutarse. De esta forma, se ahorran recursos de los trabajadores de Airflow.

Los procesadores de DAGs de Airflow siguen analizando los DAGs pausados. Si quieres mejorar el rendimiento de los procesadores de DAGs, usa .airflowignore o elimina los DAGs pausados de la carpeta de DAGs.

Incidencias frecuentes

En las siguientes secciones se describen los síntomas y las posibles soluciones de algunos problemas de análisis habituales.

Tiempo de espera de importación de carga de DAG

Síntoma:

  • En la interfaz web de Airflow, en la parte superior de la página de la lista de DAGs, se muestra un cuadro de alerta rojo Broken DAG: [/path/to/dagfile] Timeout.
  • En Cloud Monitoring: los registros airflow-scheduler contienen entradas similares a las siguientes:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Solución:

Anula la opción de configuración de dag_file_processor_timeout Airflow y permite que se tarde más en analizar el DAG:

Sección Clave Valor
core dag_file_processor_timeout Valor nuevo de tiempo de espera

Un DAG no está visible en la interfaz de usuario de Airflow o en la interfaz de usuario de DAG y el programador no lo programa

El procesador de DAGs analiza cada DAG antes de que el programador pueda programarlo y antes de que un DAG se muestre en la interfaz de usuario de Airflow o en la interfaz de usuario de DAGs.

Las siguientes opciones de configuración de Airflow definen los tiempos de espera para analizar los DAGs:

Si un DAG no se ve en la interfaz de usuario de Airflow o en la interfaz de usuario de DAG, haz lo siguiente:

  • Consulta los registros del procesador de DAG para comprobar si puede procesar correctamente tu DAG. Si hay algún problema, es posible que veas las siguientes entradas de registro en los registros del procesador de DAG o del programador:

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
    
  • Consulta los registros del programador para ver si funciona correctamente. Si hay problemas, es posible que veas las siguientes entradas de registro en los registros del programador:

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496
    

Soluciones:

  • Corrija todos los errores de análisis de DAG. El procesador de DAGs analiza varios DAGs y, en casos excepcionales, los errores de análisis de un DAG pueden afectar negativamente al análisis de otros DAGs.

  • Si el análisis de tu DAG tarda más de los segundos definidos en [core]dagbag_import_timeout, aumenta este tiempo de espera.

  • Si el análisis de todos tus DAGs tarda más de los segundos definidos en [core]dag_file_processor_timeout, aumenta este tiempo de espera.

  • Si tu DAG tarda mucho en analizarse, también puede significar que no se ha implementado de forma óptima. Por ejemplo, si lee muchas variables de entorno o realiza llamadas a servicios externos o a la base de datos de Airflow. En la medida de lo posible, evite realizar estas operaciones en secciones globales de los DAGs.

  • Aumenta los recursos de CPU y memoria de Scheduler para que pueda funcionar más rápido.

  • Ajusta el número de programadores.

  • Aumenta el número de procesos del procesador de DAG para que el análisis se pueda realizar más rápido. Para ello, aumenta el valor de [scheduler]parsing_process.

  • Reduce la frecuencia del análisis de DAGs.

  • Reducir la carga de la base de datos de Airflow.

Siguientes pasos