Soluciona problemas del programador de Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se proporcionan pasos para solucionar problemas e información de problemas problemas con los programadores de Airflow.

Identifica el origen del problema

Para comenzar a solucionar problemas, primero debes identificar si el problema ocurre en el momento del análisis del DAG o mientras se procesan las tareas en el momento de la ejecución. Para obtener más información sobre el tiempo de análisis y el tiempo de ejecución, consulta la Diferencia entre el tiempo de análisis y de ejecución del DAG.

Inspecciona los registros del procesador de DAG

Si tiene DAG complejos, entonces el procesador de DAG, que ejecuta el programador, podría no analizar todos tus DAG. Esto puede dar lugar a muchos problemas que tienes los siguientes síntomas.

Síntomas:

  • Si el procesador de DAG encuentra problemas al analizar tus DAG, podría generar una combinación de los problemas que se enumeran a continuación. Si los DAG se generan de forma dinámica, estos problemas pueden tener un mayor impacto en comparación con los DAG estáticos.

  • Los DAG no son visibles en la IU de Airflow ni en la IU del DAG.

  • Los DAG no están programados para ejecutarse.

  • Hay errores en los registros del procesador de DAG, por ejemplo:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    o

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Los programadores de Airflow experimentan problemas que provocan reinicios del programador.

  • Se cancelan las tareas de Airflow que están programadas para ejecutarse y las ejecuciones de DAG para los DAG que no se pudieron analizar, es posible que se marquen como failed. Por ejemplo:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Solution:

  • Aumenta los parámetros relacionados con el análisis del DAG:

  • Corrige o quita los DAG que causen problemas en el procesador de DAG.

Inspecciona los tiempos de análisis del DAG

Para verificar si el problema ocurre durante el análisis del DAG, sigue estos pasos.

Console

En la consola de Google Cloud puedes usar la página Monitoring y la pestaña Registros para inspeccionar los tiempos de análisis del DAG.

Inspecciona los tiempos de análisis del DAG con la página de supervisión de Cloud Composer:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abre la página Supervisión.

  3. En la pestaña Monitoring, revisa el gráfico Tiempo total de análisis de todos los archivos DAG en la sección Ejecuciones de DAG y, luego, identifica posibles problemas.

    En la sección Ejecuciones de DAG de la pestaña Composer Monitoring, se muestran las métricas de estado de los DAG de tu entorno.

Inspecciona los tiempos de análisis del DAG con la pestaña Registros de Cloud Composer:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abre la página Supervisión.

  3. Ve a la pestaña Registros y desde el árbol de navegación Todos los registros Selecciona la sección Administrador del procesador de DAG.

  4. Revisa los registros de dag-processor-manager y, luego, identifica posibles problemas.

    Los registros del procesador del DAG mostrarán los tiempos de análisis del DAG

gcloud: Airflow 1

Usa el comando list_dags con la marca -r para ver el tiempo de análisis de todos tus DAG.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    list_dags -- -r

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno

El resultado del comando es similar al siguiente:

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

Busca el valor de tiempo de análisis de DagBag. Un valor alto puede indicar que uno de tus DAG no se implementa de manera óptima. En la tabla de resultados, puedes identificar qué DAG tienen un tiempo de análisis prolongado.

gcloud: Airflow 2

Usa el comando dags report para ver el tiempo de análisis de todos tus DAG.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno

El resultado del comando es similar al siguiente:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Busca el valor de duration de cada uno de los DAG que aparecen en la tabla. Un valor alto puede indicar que uno de tus DAG no está implementado de forma óptima. En la tabla de salida, puedes identificar qué DAG tienen mucho tiempo de análisis.

Supervisa tareas en ejecución y en cola

Para verificar si hay tareas atascadas en una cola, sigue estos pasos.

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Monitoring.

  4. En la pestaña Supervisión, revisa el gráfico de Tareas de Airflow. en la sección Ejecuciones de DAG para identificar posibles problemas. Tareas de Airflow son tareas que están en estado en cola en Airflow, pueden dirigirse a Cola del agente del ejecutor de Celery o Kubernetes. Las tareas en cola de Celery son tareas que se ponen en la cola del agente de Celery.

Soluciona problemas durante el análisis de DAG

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes durante el análisis de DAG.

Análisis y programación del DAG en Cloud Composer 1 y Airflow 1

La eficiencia del análisis del DAG mejoró significativamente en Airflow 2. Si experimente problemas de rendimiento relacionados con el análisis y la programación del DAG, considere con la migración a Airflow 2.

En Cloud Composer 1, el programador se ejecuta en nodos del clúster junto con otros Componentes de Cloud Composer. Debido a esto, la carga de instancias los nodos del clúster pueden ser más altos o más bajos en comparación con otros nodos. La vista del programador el rendimiento (análisis y programación del DAG) puede variar según el nodo en la que se ejecuta el programador. Además, un nodo individual donde el del programador pueden cambiar como resultado de las operaciones de actualización o mantenimiento. Esta limitación se resolvió en Cloud Composer 2, donde puedes asignar Los recursos de CPU y memoria para el programador y el rendimiento del programador no dependen de la carga de los nodos del clúster.

Cantidad limitada de subprocesos

Permitir el administrador del procesador de DAG (la parte del programador que procesa archivos DAG) usar solo una cantidad limitada de subprocesos podría afectar el tiempo de análisis del DAG.

Para resolver el problema, anula las siguientes opciones de configuración de Airflow:

  • Para Airflow 1.10.12 y versiones anteriores, anula el Parámetro max_threads:

    Sección Clave Valor Notas
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 Reemplaza NUMBER_OF_CORES_IN_MACHINE por la cantidad de núcleos
    . en las máquinas de los nodos trabajadores.
  • Para Airflow 1.10.14 y versiones posteriores, anula el Parámetro parsing_processes:

    Sección Clave Valor Notas
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 Reemplaza NUMBER_OF_CORES_IN_MACHINE por la cantidad de núcleos
    . en las máquinas de los nodos trabajadores.

Distribución de números y horarios de las tareas

Airflow es conocido por tener problemas para programar una gran cantidad de tareas pequeñas. En tales casos, debes elegir una cantidad menor de tareas más consolidadas.

Programar una gran cantidad de DAG o tareas al mismo tiempo también puede ser una posible fuente de problemas. Para evitar este problema, distribuye tus tareas de manera más uniforme en el tiempo.

Soluciona problemas de tareas en ejecución y en cola

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes con las tareas en cola y en ejecución.

Las listas de tareas en cola son demasiado largas

En algunos casos, una lista de tareas en cola podría ser demasiado larga para el programador. Para obtener información sobre cómo optimizar los parámetros de trabajador y Celery, lee sobre cómo escalar tu entorno de Cloud Composer junto con tu negocio.

Usa la función TimeTable del programador de Airflow

A partir de Airflow 2.2, puedes definir un horario para un DAG con un nueva función llamada TimeTable.

Puedes definir un horario con uno de los siguientes métodos:

Recursos de clúster limitados

Esta sección solo se aplica a Cloud Composer 1.

Es posible que experimentes problemas de rendimiento si el clúster de GKE de tu entorno es demasiado pequeño para todos tus DAG y tareas. En este caso, prueba una de las siguientes soluciones:

  • Crea un entorno nuevo con un tipo de máquina que proporcione más rendimiento y migra tus DAG a él
  • Crea más entornos de Cloud Composer y divide los DAG entre ellos
  • Cambia el tipo de máquina de los nodos de GKE, como se describe en Actualización del tipo de máquina de los nodos de GKE. Dado que este procedimiento es propenso a errores, es la opción menos recomendada.
  • Actualiza el tipo de máquina de la instancia de Cloud SQL que ejecuta la base de datos de Airflow en tu entorno, por ejemplo, mediante los comandos gcloud composer environments update. El bajo rendimiento de la base de datos de Airflow puede ser el motivo por el que el programador es lento.

Evita la programación de tareas durante los períodos de mantenimiento

Puedes definir períodos de mantenimiento específicos para tus en un entorno de nube. Durante estos períodos, se producen los eventos de mantenimiento para Cloud SQL y GKE.

Haz que el programador de Airflow ignore archivos innecesarios

Puedes mejorar el rendimiento del programador de Airflow si omites los archivos innecesarios en la carpeta de DAG. El programador de Airflow ignora los archivos y carpetas especificados en el archivo .airflowignore.

Para que el programador de Airflow ignore archivos innecesarios, haz lo siguiente:

  1. Crea un archivo .airflowignore.
  2. En este archivo, se muestra una lista de los archivos y las carpetas que se deben ignorar.
  3. Sube este archivo a la carpeta /dags de tu en el bucket de tu entorno.

Para obtener más información sobre el formato de archivo .airflowignore, consulta Documentación de Airflow.

El programador de Airflow procesa los DAG pausados

Los usuarios de Airflow pausan los DAG para evitar su ejecución. Esto ahorra a los trabajadores de Airflow ciclos de procesamiento.

El programador de Airflow continuará analizando los DAG pausados. Si realmente quieres mejorar el rendimiento del programador de Airflow, usar .airflowignore o borrar los elementos detenidos DAG de la carpeta de DAG.

Uso de “wait_for_downstream” en tus DAG

Si configuras el parámetro wait_for_downstream en True en tus DAG, para que una tarea tenga éxito, todas las tareas que se encuentran en una etapa posterior de esta tarea también deben tener éxito. Significa que la ejecución de las tareas que pertenecen a una ejecución de DAG determinada puede ralentizarse mediante la ejecución de tareas de la ejecución del DAG anterior. Obtén más información al respecto en la documentación de Airflow.

Las tareas en cola durante demasiado tiempo se cancelarán y se reprogramarán

Si una tarea de Airflow se mantiene en la cola por mucho tiempo, el programador lo volveremos a programar para su ejecución (en versiones de Airflow anteriores a la 2.3.1, la tarea también se marca como con errores y se vuelve a intentar si es apta para un reintento).

Una forma de observar los síntomas de esta situación es mirar el gráfico con la cantidad de tareas en cola Pestaña (“Monitoring” en la IU de Cloud Composer) y si los aumentos repentinos en este gráfico no disminuyen en unas dos horas, es más probable que las tareas se reprogramen (sin registros) seguidas de "Las tareas adoptadas aún estaban pendientes ..." de registro en los registros del programador. En esos casos, es posible que veas el mensaje "No se encontró el archivo de registro...". mensaje en los registros de tareas de Airflow porque la tarea no se ejecutó.

En general, este comportamiento es el esperado, y la siguiente instancia de la tarea está diseñada para ejecutarse de acuerdo con el cronograma. Si observas que hay en tus entornos de Cloud Composer, entonces podría significar de que no haya suficientes trabajadores de Airflow en su entorno las tareas programadas.

Resolución: Para resolver este problema, debes asegurarte de que siempre haya capacidad en trabajadores de Airflow para ejecutar tareas en cola. Por ejemplo, puedes aumentar la cantidad de trabajadores o worker_concurrency. También puedes ajustar el paralelismo o los grupos para y evitar poner en cola las tareas más que la capacidad que tienes.

De forma esporádica, las tareas inactivas pueden bloquear la ejecución de un DAG específico

En casos habituales, el programador de Airflow debe poder lidiar con las situaciones en los que hay tareas inactivas en la cola y, por algún motivo, no están para ejecutarlas correctamente (p.ej., un DAG al que pertenecen las tareas inactivas se borró).

Si el programador no borra definitivamente estas tareas inactivas, es posible que debas borrarlos manualmente. Puedes hacerlo, por ejemplo, en la IU de Airflow, navega a (Menú &gt; Navegador &gt; Instancias de tareas), busca las tareas en cola que pertenezcan a un DAG inactivo y bórralas.

Para resolver este problema, actualiza tu entorno a Cloud Composer versión 2.1.12 o posterior.

Enfoque de Cloud Composer en el parámetro [scheduler]min_file_process_interval

Cloud Composer cambia la forma en que el programador de Airflow usa [scheduler]min_file_process_interval.

Airflow 1

En el caso de Cloud Composer con Airflow 1, los usuarios pueden establecer el valor de [scheduler]min_file_process_interval entre 0 y 600 segundos. Valores superiores a 600 segundos generan los mismos resultados que si [scheduler]min_file_process_interval se estableciera en 600 segundos.

Airflow 2

En Airflow 2, [scheduler]min_file_process_interval solo se puede usar con versiones 1.19.9 y 2.0.26 o más recientes

  • Versiones de Cloud Composer anteriores a 1.19.9 y 2.0.26

    En estas versiones, se ignora [scheduler]min_file_process_interval.

  • Cloud Composer 1.19.9, 2.0.26 o versiones más recientes

    El programador de Airflow se reinicia después de una cierta cantidad de veces que todos los DAG están programadas y el parámetro [scheduler]num_runs controla cuántas veces lo realiza el programador. Cuando el programador alcanza [scheduler]num_runs bucles de programación, es reiniciado: el programador es un componente sin estado y ese reinicio es un mecanismo de reparación automática para cualquier problema que el Programador pueda experimentar. Si no se especifica, el valor predeterminado valor de [scheduler]num_runs se aplica, que es 5,000.

    Se puede usar [scheduler]min_file_process_interval para configurar la frecuencia Se realiza el análisis del DAG, pero este parámetro no puede superar el tiempo requerido. para que un programador realice [scheduler]num_runs cuando programes tus DAG.

Ajusta la configuración de Airflow

Airflow proporciona opciones de configuración que controlan cuántas tareas y DAG de Airflow pueden ejecutarse al mismo tiempo. Para establecer estas opciones de configuración, anular sus valores para tu entorno.

  • Simultaneidad de los trabajadores

    El parámetro [celery]worker_concurrency controla la cantidad máxima de tareas que un trabajador de Airflow puede ejecutar al mismo tiempo. Si multiplicas el valor de este parámetro por la cantidad de trabajadores de Airflow en tu entorno de Cloud Composer, obtendrás la cantidad máxima de tareas que se pueden ejecutar en un momento determinado de tu entorno. Esta está limitada por la opción [core]parallelism de configuración de Airflow, que se describe más adelante.

    En los entornos de Cloud Composer 2, el valor predeterminado de [celery]worker_concurrency se calcula automáticamente

    • Para las versiones de Airflow 2.3.3 y posteriores, se establece [celery]worker_concurrency a un valor mínimo de 32, 12 * worker_CPU y 8 * worker_memory.

    • Para las versiones de Airflow 2.2.5 o anteriores, [celery]worker_concurrency es establecido en 12 * cantidad de trabajadores y CPU virtuales.

  • Ejecuciones máximas de DAG activas

    La opción de configuración [core]max_active_runs_per_dag de Airflow controla la cantidad máxima de ejecuciones activas del DAG por DAG. El programador no crea más ejecuciones de DAG si alcanza este límite.

    Si este parámetro se establece de forma incorrecta, puedes encontrar un problema en el que el programador regula la ejecución del DAG porque no puede crear más instancias de ejecución de DAG en un momento determinado.

  • Cantidad máxima de tareas activas por DAG

    La opción de configuración [core]max_active_tasks_per_dag de Airflow controla la cantidad máxima de instancias de tareas que se pueden ejecutar de forma simultánea en cada DAG. Es un parámetro a nivel del DAG.

    Si este parámetro se establece de manera incorrecta, es posible que tengas un problema en el que la ejecución de una sola instancia de DAG sea lenta porque solo hay una cantidad limitada de tareas de DAG que se pueden ejecutar en un momento determinado.

    Solución: Aumentar [core]max_active_tasks_per_dag.

  • Paralelismo y tamaño del grupo

    La opción de configuración [core]parallelism de Airflow controla cuántas tareas puede poner en cola el programador de Airflow en la cola del ejecutor después de que se cumplen todas las dependencias para estas tareas.

    Este es un parámetro global para toda la configuración de Airflow.

    Las tareas se ponen en cola y se ejecutan dentro de un grupo. Los entornos de Cloud Composer usan solo un grupo. El tamaño de este grupo controla cuántas tareas puede poner en cola el programador para su ejecución en un momento determinado. Si el tamaño del grupo es demasiado pequeño, el programador no puede poner en cola las tareas para su ejecución a pesar de los límites, que se definen mediante la opción de configuración [core]parallelism y [celery]worker_concurrency multiplicada por la cantidad de trabajadores de Airflow, que todavía no se cumplen.

    Puedes configurar el tamaño del grupo en la IU de Airflow (Menú > Administrador > Grupos). Ajusta el tamaño del grupo al nivel del paralelismo que esperas en tu entorno.

    Por lo general, se establece [core]parallelism como un producto de una cantidad máxima de trabajadores y [celery]worker_concurrency.

El programador no programa los DAG debido a que se agotó el tiempo de espera del procesador de DAG

Para obtener más información sobre este problema, consulta Soluciona problemas de DAG.

Se marcaron las tareas como fallidas después de alcanzar dagrun_timeout

El programador marca las tareas que no han finalizado (en ejecución, programadas y en cola). como un error si una ejecución de DAG no finaliza dentro del dagrun_timeout (un parámetro de DAG).

Solución:

Síntomas de que la base de datos de Airflow está bajo presión de carga

A veces, es posible que veas en los registros del programador de Airflow la siguiente entrada de registro de advertencia:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

También se pueden observar síntomas similares en los registros de trabajadores de Airflow:

Para MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Para PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Tales errores o advertencias pueden ser un síntoma de que la base de datos de Airflow abrumado por la cantidad de conexiones abiertas o de consultas ejecutadas al mismo tiempo, ya sea por programadores o por otros componentes de Airflow como trabajadores, activadores y servidores web.

Soluciones posibles:

El servidor web muestra el mensaje "Parece que el programador no se está ejecutando". advertencia

El programador informa las señales de monitoreo de funcionamiento a Airflow de forma regular. en la base de datos. Según esta información, el servidor web de Airflow determina si la El programador está activo.

A veces, si el programador tiene una gran carga, es posible que no pueda informar latido de su corazón cada [scheduler]scheduler-heartbeat-sec.

En ese caso, el servidor web de Airflow podría mostrar la siguiente advertencia:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Soluciones posibles:

Soluciones alternativas a los problemas que se encontraron durante el reabastecimiento de DAG

A veces, es posible que desees volver a ejecutar DAG que ya se ejecutaron. Puedes hacerlo con la herramienta de línea de comandos de Airflow de la siguiente manera:

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

Si quieres volver a ejecutar solo tareas fallidas para un DAG específico, usa también --rerun_failed_tasks.

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Si quieres volver a ejecutar solo tareas fallidas para un DAG específico, usa también --rerun-failed-tasks.

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno
  • START_DATE por un valor para el parámetro start_date de DAG, en el formato YYYY-MM-DD.
  • END_DATE por un valor para el parámetro end_date de DAG, en el formato YYYY-MM-DD.
  • DAG_NAME por el nombre del DAG.

La operación de reabastecimiento podría generar una situación de interbloqueo El reabastecimiento no es posible porque hay un bloqueo en una tarea. Por ejemplo:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

En algunos casos, puedes usar las siguientes soluciones para evitar los interbloqueos:

  • Inhabilita el miniprogramador anulando la [core]schedule-after-task-execution a False.

  • Ejecuta reabastecimientos para períodos más específicos. Por ejemplo, establece START_DATE. y END_DATE para especificar un período de solo 1 día.

¿Qué sigue?