Soluciona problemas del programador de Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se proporcionan información y pasos para solucionar problemas comunes con los programadores de Airflow y los procesadores de DAG.

Identifica el origen del problema

Para comenzar a solucionar problemas, identifica si el problema ocurre en los siguientes casos:

  • Durante el análisis del DAG, mientras un procesador de DAG de Airflow analiza el DAG
  • En el momento de la ejecución, mientras un programador de Airflow procesa el DAG

Para obtener más información sobre el tiempo de análisis y el tiempo de ejecución, consulta la Diferencia entre el tiempo de análisis y de ejecución del DAG.

Cómo inspeccionar problemas de procesamiento del DAG

  1. Inspecciona los registros del procesador de DAG.
  2. Verifica los tiempos de análisis del DAG.

Supervisa tareas en ejecución y en cola

Para verificar si hay tareas atascadas en una cola, sigue estos pasos.

  1. En la consola de Google Cloud , ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Monitoring.

  4. En la pestaña Monitoring, revisa el gráfico Tareas de Airflow en la sección Ejecuciones de DAG y, luego, identifica posibles problemas. Las tareas de Airflow son tareas que se encuentran en estado de cola en Airflow y pueden ir a la cola de agentes de Celery o del ejecutor de Kubernetes. Las tareas en cola de Celery son instancias de tareas que se ponen en la cola de agentes de Celery.

Soluciona problemas durante el análisis de DAG

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes durante el análisis de DAG.

Distribución de números y horarios de las tareas

Airflow puede tener problemas cuando programa una gran cantidad de DAG o tareas al mismo tiempo. Para evitar problemas con la programación, puedes hacer lo siguiente:

  • Ajusta tus DAG para usar una cantidad menor de tareas más consolidadas.
  • Ajusta los intervalos de programación de tus DAGs para distribuir las ejecuciones de DAG de manera más uniforme con el tiempo.

Ajusta la configuración de Airflow

Airflow proporciona opciones de configuración que controlan cuántas tareas y DAG de Airflow pueden ejecutarse al mismo tiempo. Para establecer estas opciones de configuración, anula sus valores para tu entorno. También puedes establecer algunos de estos valores a nivel del DAG o de la tarea.

  • Simultaneidad de los trabajadores

    El parámetro [celery]worker_concurrency controla la cantidad máxima de tareas que un trabajador de Airflow puede ejecutar al mismo tiempo. Si multiplicas el valor de este parámetro por la cantidad de trabajadores de Airflow en tu entorno de Cloud Composer, obtendrás la cantidad máxima de tareas que se pueden ejecutar en un momento determinado de tu entorno. Este número está limitado por la opción de configuración [core]parallelism de Airflow, que se describe con más detalle.

    En los entornos de Cloud Composer 3, el valor predeterminado de [celery]worker_concurrency se calcula automáticamente según la cantidad de instancias de tareas simultáneas ligeras que puede admitir un trabajador. Esto significa que su valor depende de los límites de recursos del trabajador. El valor de simultaneidad del trabajador no depende de la cantidad de trabajadores en tu entorno.

  • Ejecuciones máximas de DAG activas

    La opción de configuración [core]max_active_runs_per_dag de Airflow controla la cantidad máxima de ejecuciones activas del DAG por DAG. El programador no crea más ejecuciones de DAG si alcanza este límite.

    Si este parámetro se establece de forma incorrecta, puedes encontrar un problema en el que el programador regula la ejecución del DAG porque no puede crear más instancias de ejecución de DAG en un momento determinado.

    También puedes establecer este valor a nivel del DAG con el parámetro max_active_runs.

  • Cantidad máxima de tareas activas por DAG

    La opción de configuración [core]max_active_tasks_per_dag de Airflow controla la cantidad máxima de instancias de tareas que se pueden ejecutar de forma simultánea en cada DAG.

    Si este parámetro se establece de manera incorrecta, es posible que tengas un problema en el que la ejecución de una sola instancia de DAG sea lenta porque solo hay una cantidad limitada de tareas de DAG que se pueden ejecutar en un momento determinado. En este caso, puedes aumentar el valor de esta opción de configuración.

    También puedes establecer este valor a nivel del DAG con el parámetro max_active_tasks.

    Puedes usar los parámetros max_active_tis_per_dag y max_active_tis_per_dagrun a nivel de la tarea para controlar cuántas instancias con un ID de tarea específico pueden ejecutarse por DAG y por ejecución de DAG.

  • Paralelismo y tamaño del grupo

    La opción de configuración [core]parallelism de Airflow controla cuántas tareas puede poner en cola el programador de Airflow en la cola del ejecutor después de que se cumplen todas las dependencias para estas tareas.

    Este es un parámetro global para toda la configuración de Airflow.

    Las tareas se ponen en cola y se ejecutan dentro de un grupo. Los entornos de Cloud Composer usan solo un grupo. El tamaño de este grupo controla cuántas tareas puede poner en cola el programador para su ejecución en un momento determinado. Si el tamaño del grupo es demasiado pequeño, el programador no puede poner en cola las tareas para su ejecución a pesar de los límites, que se definen mediante la opción de configuración [core]parallelism y [celery]worker_concurrency multiplicada por la cantidad de trabajadores de Airflow, que todavía no se cumplen.

    Puedes configurar el tamaño del grupo en la IU de Airflow (Menú > Administrador > Grupos). Ajusta el tamaño del grupo al nivel de paralelismo que esperas en tu entorno.

    Por lo general, [core]parallelism se establece como el producto de la cantidad máxima de trabajadores y [celery]worker_concurrency.

Soluciona problemas de tareas en ejecución y en cola

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes con las tareas en cola y en ejecución.

No se ejecutan las ejecuciones de DAG

Síntoma:

Cuando se establece una fecha de programación para un DAG de forma dinámica, esto puede generar varios efectos secundarios inesperados. Por ejemplo:

  • La ejecución de un DAG siempre es en el futuro, y el DAG nunca se ejecuta.

  • Las ejecuciones de DAG anteriores se marcan como ejecutadas y correctas, a pesar de no haberse ejecutado.

Puedes encontrar más información en la documentación de Apache Airflow.

Soluciones posibles:

  • Sigue las recomendaciones de la documentación de Apache Airflow.

  • Establece start_date estático para los DAG. Como opción, puedes usar catchup=False para inhabilitar la ejecución del DAG para fechas anteriores.

  • Evita usar datetime.now() o days_ago(<number of days>), a menos que conozcas los efectos secundarios de este enfoque.

Usa la función TimeTable del programador de Airflow

Las tablas de tiempo están disponibles a partir de Airflow 2.2.

Puedes definir una tabla de tiempo para un DAG con uno de los siguientes métodos:

También puedes usar los horarios integrados.

Evita la programación de tareas durante los períodos de mantenimiento

Puedes definir períodos de mantenimiento para tu entorno de modo que el mantenimiento del entorno se realice fuera de los horarios en los que ejecutas tus DAGs. Aun así, puedes ejecutar tus DAG durante los períodos de mantenimiento, siempre y cuando aceptes que algunas tareas se puedan interrumpir y volver a intentar. Para obtener más información sobre cómo los períodos de mantenimiento afectan tu entorno, consulta Especifica períodos de mantenimiento.

Uso de “wait_for_downstream” en tus DAG

Si configuras el parámetro wait_for_downstream en True en tus DAG, para que una tarea tenga éxito, todas las tareas que se encuentran en una etapa posterior de esta tarea también deben tener éxito. Significa que la ejecución de las tareas que pertenecen a una ejecución de DAG determinada puede ralentizarse por la ejecución de tareas de la ejecución del DAG anterior. Obtén más información sobre este tema en la documentación de Airflow.

Las tareas que permanezcan en la cola durante demasiado tiempo se cancelarán y reprogramarán

Si una tarea de Airflow permanece en la cola durante demasiado tiempo, el programador la reprogramará para su ejecución después de que transcurra la cantidad de tiempo establecida en la opción de configuración [scheduler]task_queued_timeout de Airflow. El valor predeterminado es 2400. En las versiones de Airflow anteriores a la 2.3.1, la tarea también se marca como con errores y se reintenta si es apta para un reintento.

Una forma de observar los síntomas de esta situación es mirar el gráfico con la cantidad de tareas en cola (pestaña "Supervisión" en la IU de Cloud Composer). Si los picos en este gráfico no disminuyen en aproximadamente dos horas, es muy probable que las tareas se reprogramen (sin registros), seguidas de entradas de registro "Adopted tasks were still pending…" en los registros del programador. En estos casos, es posible que veas el mensaje "No se encontró el archivo de registro…" en los registros de tareas de Airflow porque no se ejecutó la tarea.

En general, este comportamiento es normal y la próxima instancia de la tarea programada se debe ejecutar según la programación. Si observas muchos casos de este tipo en tus entornos de Cloud Composer, es posible que no haya suficientes trabajadores de Airflow en tu entorno para procesar todas las tareas programadas.

Resolución: Para solucionar este problema, debes asegurarte de que siempre haya capacidad en los trabajadores de Airflow para ejecutar las tareas en cola. Por ejemplo, puedes aumentar la cantidad de trabajadores o worker_concurrency. También puedes ajustar el paralelismo o los grupos para evitar que se pongan en cola más tareas de las que puedes procesar.

Las tareas que se atascan en la cola pueden bloquear la ejecución de un DAG específico.

Para solucionar este problema, actualiza tu entorno a la versión 2.1.12 o posterior de Cloud Composer.

En casos normales, el programador de Airflow debería poder manejar situaciones en las que hay tareas en la cola y, por algún motivo, no es posible ejecutarlas correctamente (por ejemplo, cuando se borró un DAG al que pertenecen estas tareas).

Si el programador no borra estas tareas, es posible que debas borrarlas manualmente. Puedes hacerlo, por ejemplo, en la IU de Airflow (Menú > Navegador > Instancias de tareas), buscar tareas en cola y borrarlas.

Enfoque de Cloud Composer para el parámetro min_file_process_interval

Cloud Composer cambia la forma en que el programador de Airflow usa [scheduler]min_file_process_interval.

En las versiones de Cloud Composer anteriores a la 2.0.26, se ignora [scheduler]min_file_process_interval.

En las versiones de Cloud Composer posteriores a la 2.0.26, haz lo siguiente:

El programador de Airflow se reinicia después de que se programan todos los DAG una cierta cantidad de veces, y el parámetro [scheduler]num_runs controla cuántas veces lo hace el programador. Cuando el programador alcanza los bucles de programación de [scheduler]num_runs, se reinicia. El programador es un componente sin estado, y este reinicio es un mecanismo de reparación automática para cualquier problema que pueda experimentar el programador. El valor predeterminado de [scheduler]num_runs es 5,000.

[scheduler]min_file_process_interval se puede usar para configurar la frecuencia con la que se realiza el análisis del DAG, pero este parámetro no puede ser más largo que el tiempo que requiere un programador para realizar bucles de [scheduler]num_runs cuando programa tus DAGs.

Marcar tareas como fallidas después de alcanzar dagrun_timeout

El programador marca como fallidas las tareas que no se completaron (en ejecución, programadas y en cola) si una ejecución de DAG no finaliza dentro de dagrun_timeout (un parámetro del DAG).

Solución:

Síntomas de que la base de datos de Airflow está bajo una carga pesada

A veces, en los registros del programador de Airflow, es posible que veas la siguiente entrada de registro de advertencia:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

También se pueden observar síntomas similares en los registros de trabajador de Airflow:

Para MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

En PostgreSQL, haz lo siguiente:

psycopg2.OperationalError: connection to server at ... failed

Estos errores o advertencias pueden ser un síntoma de que la base de datos de Airflow está sobrecargada por la cantidad de conexiones abiertas o la cantidad de consultas ejecutadas al mismo tiempo, ya sea por los programadores o por otros componentes de Airflow, como los trabajadores, los activadores y los servidores web.

Soluciones posibles:

El servidor web muestra la advertencia "Parece que el programador no se está ejecutando"

El programador informa su señal de monitoreo de funcionamiento periódicamente a la base de datos de Airflow. Según esta información, el servidor web de Airflow determina si el programador está activo.

A veces, si el programador está bajo una carga pesada, es posible que no pueda informar su latido cada [scheduler]scheduler_heartbeat_sec.

En tal situación, es posible que el servidor web de Airflow muestre la siguiente advertencia:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Soluciones posibles:

  • Aumenta los recursos de CPU y memoria del programador.

  • Optimiza tus DAG para que su análisis y programación sean más rápidos y no consuman demasiados recursos del programador.

  • Evita usar variables globales en los DAG de Airflow. En su lugar, usa variables de entorno y variables de Airflow.

  • Aumenta el valor de la opción de configuración de Airflow [scheduler]scheduler_health_check_threshold para que el servidor web espere más tiempo antes de informar la falta de disponibilidad del programador.

Soluciones alternativas para los problemas que se producen durante el reabastecimiento de DAG

A veces, es posible que desees volver a ejecutar los DAGs que ya se ejecutaron. Puedes hacerlo con un comando de la CLI de Airflow de la siguiente manera:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Para volver a ejecutar solo las tareas fallidas de un DAG específico, también usa el argumento --rerun-failed-tasks.

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno.
  • START_DATE con un valor para el parámetro start_date del DAG, en el formato YYYY-MM-DD.
  • END_DATE con un valor para el parámetro end_date del DAG, en el formato YYYY-MM-DD.
  • DAG_NAME por el nombre del DAG.

A veces, la operación de relleno puede generar una situación de bloqueo en la que no es posible realizar el relleno porque hay un bloqueo en una tarea. Por ejemplo:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

En algunos casos, puedes usar las siguientes soluciones alternativas para superar los bloqueos:

  • Inhabilita el programador pequeño anulando [core]schedule_after_task_execution a False.

  • Ejecuta reabastecimientos para períodos más cortos. Por ejemplo, configura START_DATE y END_DATE para especificar un período de solo 1 día.

¿Qué sigue?