Solucionar problemas del programador de Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se proporcionan pasos para solucionar problemas habituales con los programadores de Airflow y los procesadores de DAGs, así como información sobre ellos.

Identificar la fuente del problema

Para empezar a solucionar el problema, identifique si ocurre en los siguientes casos:

  • En el momento del análisis del DAG, mientras un procesador de DAG de Airflow analiza el DAG
  • En tiempo de ejecución, mientras un programador de Airflow procesa el DAG

Para obtener más información sobre el tiempo de análisis y el tiempo de ejecución, consulta Diferencia entre el tiempo de análisis y el tiempo de ejecución de un DAG.

Inspeccionar problemas de procesamiento de DAGs

  1. Inspecciona los registros del procesador de DAG.
  2. Consulta los tiempos de análisis de los DAGs.

Monitorizar tareas en ejecución y en cola

Para comprobar si tienes tareas atascadas en una cola, sigue estos pasos.

  1. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.

  3. Vaya a la pestaña Monitorización.

  4. En la pestaña Monitorización, consulta el gráfico Tareas de Airflow de la sección Ejecuciones de DAG e identifica posibles problemas. Las tareas de Airflow son tareas que están en estado de cola en Airflow. Pueden ir a la cola de intermediación de Celery o de Kubernetes Executor. Las tareas en cola de Celery son instancias de tareas que se colocan en la cola del broker de Celery.

Solucionar problemas durante el tiempo de análisis de DAGs

En las siguientes secciones se describen los síntomas y las posibles soluciones de algunos problemas habituales que se producen durante el tiempo de análisis de los DAGs.

Número y distribución temporal de las tareas

Airflow puede tener problemas al programar un gran número de DAGs o tareas al mismo tiempo. Para evitar problemas con la programación, puedes hacer lo siguiente:

  • Ajusta tus DAGs para usar un número menor de tareas más consolidadas.
  • Ajusta los intervalos de programación de tus DAGs para distribuir las ejecuciones de los DAGs de forma más uniforme a lo largo del tiempo.

Configuración de escalado de Airflow

Airflow proporciona opciones de configuración que controlan cuántas tareas y DAGs puede ejecutar Airflow al mismo tiempo. Para definir estas opciones de configuración, sustituye sus valores por los de tu entorno. También puedes definir algunos de estos valores a nivel de DAG o de tarea.

  • Simultaneidad de los trabajadores

    El parámetro [celery]worker_concurrency controla el número máximo de tareas que un trabajador de Airflow puede ejecutar al mismo tiempo. Si multiplicas el valor de este parámetro por el número de trabajadores de Airflow de tu entorno de Cloud Composer, obtendrás el número máximo de tareas que se pueden ejecutar en un momento dado en tu entorno. Este número está limitado por la [core]parallelismopción de configuración de Airflow, que se describe más adelante.

    En los entornos de Cloud Composer 3, el valor predeterminado de [celery]worker_concurrency se calcula automáticamente en función del número de instancias de tareas simultáneas ligeras que puede alojar un trabajador. Esto significa que su valor depende de los límites de recursos de los trabajadores. El valor de simultaneidad de los trabajadores no depende del número de trabajadores de tu entorno.

  • Número máximo de ejecuciones de DAG activas

    La opción de configuración de [core]max_active_runs_per_dagAirflow controla el número máximo de ejecuciones de DAGs activas por DAG. El programador no crea más ejecuciones de DAG si alcanza este límite.

    Si este parámetro se define de forma incorrecta, puede que el programador limite la ejecución de DAG porque no puede crear más instancias de ejecución de DAG en un momento dado.

    También puede definir este valor a nivel de DAG con el parámetro max_active_runs.

  • Número máximo de tareas activas por DAG

    La opción de configuración [core]max_active_tasks_per_dag de Airflow controla el número máximo de instancias de tareas que se pueden ejecutar simultáneamente en cada DAG.

    Si este parámetro se define incorrectamente, puede que tengas problemas y la ejecución de una sola instancia de DAG sea lenta porque solo se puede ejecutar un número limitado de tareas de DAG en un momento dado. En este caso, puede aumentar el valor de esta opción de configuración.

    También puede definir este valor a nivel de DAG con el parámetro max_active_tasks.

    Puedes usar los parámetros max_active_tis_per_dag y max_active_tis_per_dagrun a nivel de tarea para controlar cuántas instancias con un ID de tarea específico se pueden ejecutar por DAG y por ejecución de DAG.

  • Paralelismo y tamaño del grupo

    La opción de configuración [core]parallelism Airflow controla cuántas tareas puede poner en cola el programador de Airflow en la cola del ejecutor después de que se hayan cumplido todas las dependencias de estas tareas.

    Se trata de un parámetro global de toda la configuración de Airflow.

    Las tareas se ponen en cola y se ejecutan en un grupo. Los entornos de Cloud Composer solo usan un grupo. El tamaño de este grupo controla cuántas tareas puede poner en cola el programador para ejecutarse en un momento dado. Si el tamaño del grupo es demasiado pequeño, el programador no podrá poner en cola las tareas para su ejecución, aunque aún no se hayan alcanzado los umbrales, que se definen mediante la opción de configuración [core]parallelism y la opción de configuración [celery]worker_concurrency multiplicada por el número de trabajadores de Airflow.

    Puedes configurar el tamaño del grupo en la interfaz de usuario de Airflow (Menú > Administrar > Grupos). Ajusta el tamaño del grupo al nivel de paralelismo que esperas en tu entorno.

    Normalmente, [core]parallelism se define como el producto del número máximo de trabajadores y [celery]worker_concurrency.

Solucionar problemas con tareas en ejecución y en cola

En las siguientes secciones se describen los síntomas y las posibles soluciones de algunos problemas habituales con las tareas en ejecución y en cola.

No se ejecutan las ejecuciones de DAG

Síntoma:

Cuando se define dinámicamente una fecha de programación para un DAG, pueden producirse varios efectos secundarios inesperados. Por ejemplo:

  • Una ejecución de DAG siempre se realiza en el futuro y el DAG nunca se ejecuta.

  • Las ejecuciones de DAGs anteriores se marcan como ejecutadas y completadas correctamente aunque no se hayan ejecutado.

Puede consultar más información en la documentación de Apache Airflow.

Posibles soluciones:

  • Sigue las recomendaciones de la documentación de Apache Airflow.

  • Define start_date estáticos para los DAGs. También puedes usar catchup=False para inhabilitar la ejecución del DAG en fechas anteriores.

  • Evita usar datetime.now() o days_ago(<number of days>) a menos que conozcas los efectos secundarios de este enfoque.

Usar la función TimeTable del programador de Airflow

Las tablas de tiempos están disponibles a partir de Airflow 2.2.

Puedes definir una tabla de horas para un DAG con uno de los siguientes métodos:

También puedes usar Horarios integrados.

Evitar programar tareas durante las ventanas de mantenimiento

Puedes definir ventanas de mantenimiento para tu entorno de forma que el mantenimiento se realice fuera de los momentos en los que ejecutas tus DAGs. Puedes seguir ejecutando tus DAGs durante las ventanas de mantenimiento, siempre que aceptes que algunas tareas se puedan interrumpir y volver a intentar. Para obtener más información sobre cómo afectan las ventanas de mantenimiento a tu entorno, consulta Especificar ventanas de mantenimiento.

Uso de "wait_for_downstream" en tus DAGs

Si asignas el valor True al parámetro wait_for_downstream en tus DAGs, para que una tarea se complete correctamente, todas las tareas que estén inmediatamente más abajo de esta tarea también deben completarse correctamente. Esto significa que la ejecución de las tareas pertenecientes a una ejecución de DAG determinada puede ralentizarse por la ejecución de las tareas de la ejecución de DAG anterior. Consulta más información en la documentación de Airflow.

Las tareas que permanezcan en la cola durante demasiado tiempo se cancelarán y se volverán a programar

Si una tarea de Airflow permanece en la cola durante demasiado tiempo, el programador la volverá a programar para que se ejecute una vez que haya transcurrido el tiempo establecido en la opción de configuración [scheduler]task_queued_timeout de Airflow. El valor predeterminado es 2400. En las versiones de Airflow anteriores a la 2.3.1, la tarea también se marca como fallida y se vuelve a intentar si cumple los requisitos para ello.

Una forma de observar los síntomas de esta situación es consultar el gráfico con el número de tareas en cola (pestaña "Monitorización" de la interfaz de usuario de Cloud Composer). Si los picos de este gráfico no bajan en unas dos horas, es muy probable que las tareas se vuelvan a programar (sin registros) y que aparezcan entradas de registro "Adopted tasks were still pending ..." en los registros del programador. En estos casos, puede que veas el mensaje "No se ha encontrado el archivo de registro..." en los registros de tareas de Airflow porque la tarea no se ha ejecutado.

Por lo general, este comportamiento es el esperado y la siguiente instancia de la tarea programada se ejecutará según la programación. Si observas muchos casos de este tipo en tus entornos de Cloud Composer, puede que no haya suficientes trabajadores de Airflow en tu entorno para procesar todas las tareas programadas.

Solución: Para solucionar este problema, debes asegurarte de que siempre haya capacidad en los trabajadores de Airflow para ejecutar las tareas en cola. Por ejemplo, puedes aumentar el número de trabajadores o worker_concurrency. También puedes ajustar el paralelismo o los grupos para evitar que se pongan en cola más tareas de las que puedes gestionar.

Las tareas que se quedan bloqueadas en la cola pueden impedir la ejecución de un DAG específico

Para solucionar este problema, actualiza tu entorno a Cloud Composer 2.1.12 o una versión posterior.

En casos normales, el programador de Airflow debería poder gestionar situaciones en las que haya tareas en la cola y, por algún motivo, no sea posible ejecutarlas correctamente (por ejemplo, cuando se haya eliminado un DAG al que pertenezcan estas tareas).

Si el programador no purga estas tareas, es posible que tengas que eliminarlas manualmente. Por ejemplo, puedes hacerlo en la interfaz de usuario de Airflow (Menú > Navegador > Instancias de tareas), buscar las tareas en cola y eliminarlas.

Enfoque de Cloud Composer para el parámetro min_file_process_interval

Cloud Composer cambia la forma en que el [scheduler]min_file_process_interval utiliza el programador de Airflow.

En las versiones de Cloud Composer anteriores a la 2.0.26, se ignora [scheduler]min_file_process_interval.

En las versiones de Cloud Composer posteriores a la 2.0.26:

El programador de Airflow se reinicia después de que se hayan programado todos los DAG un número determinado de veces. El parámetro [scheduler]num_runs controla cuántas veces lo hace el programador. Cuando el programador alcanza [scheduler]num_runs bucles de programación, se reinicia. El programador es un componente sin estado, por lo que este reinicio es un mecanismo de reparación automática para cualquier problema que pueda experimentar el programador. El valor predeterminado de [scheduler]num_runs es 5000.

[scheduler]min_file_process_interval se puede usar para configurar la frecuencia con la que se analiza el DAG, pero este parámetro no puede ser más largo que el tiempo necesario para que un programador realice bucles [scheduler]num_runs al programar los DAGs.

Marcar tareas como fallidas después de alcanzar dagrun_timeout

El programador marca como fallidas las tareas que no se han completado (en ejecución, programadas y en cola) si una ejecución de DAG no finaliza en dagrun_timeout (un parámetro de DAG).

Solución:

Síntomas de que la base de datos de Airflow está sometida a una carga pesada

A veces, en los registros del programador de Airflow, puede que veas la siguiente entrada de registro de advertencia:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

También se pueden observar síntomas similares en los registros de los trabajadores de Airflow:

En MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

En PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Estos errores o advertencias pueden ser un síntoma de que la base de datos de Airflow está sobrecargada por el número de conexiones abiertas o el número de consultas ejecutadas al mismo tiempo, ya sea por los programadores o por otros componentes de Airflow, como los trabajadores, los activadores y los servidores web.

Posibles soluciones:

El servidor web muestra la advertencia "Parece que el programador no está funcionando"

El programador envía su señal de latido de forma periódica a la base de datos de Airflow. Basándose en esta información, el servidor web de Airflow determina si el programador está activo.

A veces, si el programador está sometido a una carga pesada, es posible que no pueda informar de su latido cada [scheduler]scheduler_heartbeat_sec.

En esta situación, el servidor web de Airflow puede mostrar la siguiente advertencia:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Posibles soluciones:

  • Aumenta los recursos de CPU y memoria del programador.

  • Optimiza tus DAGs para que su análisis y programación sean más rápidos y no consuman demasiados recursos del programador.

  • Evita usar variables globales en los DAGs de Airflow. En su lugar, utilice variables de entorno y variables de Airflow.

  • Aumenta el valor de la opción de configuración de Airflow [scheduler]scheduler_health_check_threshold para que el servidor web espere más tiempo antes de informar de que el programador no está disponible.

Soluciones alternativas para los problemas que se producen al rellenar DAGs

En ocasiones, es posible que quieras volver a ejecutar DAGs que ya se hayan ejecutado. Puedes hacerlo con un comando de la CLI de Airflow de la siguiente manera:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Para volver a ejecutar solo las tareas fallidas de un DAG específico, también puedes usar el argumento --rerun-failed-tasks.

Sustituye:

  • ENVIRONMENT_NAME con el nombre del entorno.
  • LOCATION con la región en la que se encuentra el entorno.
  • START_DATE con un valor para el parámetro start_date DAG, en formato YYYY-MM-DD.
  • END_DATE con un valor para el parámetro end_date DAG, en formato YYYY-MM-DD.
  • DAG_NAME con el nombre del DAG.

En ocasiones, la operación de relleno puede generar un interbloqueo en el que no se pueda realizar el relleno porque hay un bloqueo en una tarea. Por ejemplo:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

En algunos casos, puedes usar las siguientes soluciones alternativas para evitar los interbloqueos:

  • Inhabilita la miniprogramación sustituyendo el [core]schedule_after_task_execution por False.

  • Ejecuta retroadaptaciones para intervalos de fechas más acotados. Por ejemplo, define START_DATE y END_DATE para especificar un periodo de solo 1 día.

Siguientes pasos