Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página se proporcionan pasos para solucionar problemas habituales con los programadores de Airflow y los procesadores de DAGs, así como información sobre ellos.
Identificar la fuente del problema
Para empezar a solucionar el problema, identifique si ocurre en los siguientes casos:
- En el momento del análisis del DAG, mientras un procesador de DAG de Airflow analiza el DAG
- En tiempo de ejecución, mientras un programador de Airflow procesa el DAG
Para obtener más información sobre el tiempo de análisis y el tiempo de ejecución, consulta Diferencia entre el tiempo de análisis y el tiempo de ejecución de un DAG.
Inspeccionar problemas de procesamiento de DAGs
Monitorizar tareas en ejecución y en cola
Para comprobar si tienes tareas atascadas en una cola, sigue estos pasos.
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Vaya a la pestaña Monitorización.
En la pestaña Monitorización, consulta el gráfico Tareas de Airflow de la sección Ejecuciones de DAG e identifica posibles problemas. Las tareas de Airflow son tareas que están en estado de cola en Airflow. Pueden ir a la cola de intermediación de Celery o de Kubernetes Executor. Las tareas en cola de Celery son instancias de tareas que se colocan en la cola del broker de Celery.
Solucionar problemas durante el tiempo de análisis de DAGs
En las siguientes secciones se describen los síntomas y las posibles soluciones de algunos problemas habituales que se producen durante el tiempo de análisis de los DAGs.
Número y distribución temporal de las tareas
Airflow puede tener problemas al programar un gran número de DAGs o tareas al mismo tiempo. Para evitar problemas con la programación, puedes hacer lo siguiente:
- Ajusta tus DAGs para usar un número menor de tareas más consolidadas.
- Ajusta los intervalos de programación de tus DAGs para distribuir las ejecuciones de los DAGs de forma más uniforme a lo largo del tiempo.
Configuración de escalado de Airflow
Airflow proporciona opciones de configuración que controlan cuántas tareas y DAGs puede ejecutar Airflow al mismo tiempo. Para definir estas opciones de configuración, sustituye sus valores por los de tu entorno. También puedes definir algunos de estos valores a nivel de DAG o de tarea.
Simultaneidad de los trabajadores
El parámetro
[celery]worker_concurrency
controla el número máximo de tareas que un trabajador de Airflow puede ejecutar al mismo tiempo. Si multiplicas el valor de este parámetro por el número de trabajadores de Airflow de tu entorno de Cloud Composer, obtendrás el número máximo de tareas que se pueden ejecutar en un momento dado en tu entorno. Este número está limitado por la[core]parallelism
opción de configuración de Airflow, que se describe más adelante.En los entornos de Cloud Composer 3, el valor predeterminado de
[celery]worker_concurrency
se calcula automáticamente en función del número de instancias de tareas simultáneas ligeras que puede alojar un trabajador. Esto significa que su valor depende de los límites de recursos de los trabajadores. El valor de simultaneidad de los trabajadores no depende del número de trabajadores de tu entorno.Número máximo de ejecuciones de DAG activas
La opción de configuración de
[core]max_active_runs_per_dag
Airflow controla el número máximo de ejecuciones de DAGs activas por DAG. El programador no crea más ejecuciones de DAG si alcanza este límite.Si este parámetro se define de forma incorrecta, puede que el programador limite la ejecución de DAG porque no puede crear más instancias de ejecución de DAG en un momento dado.
También puede definir este valor a nivel de DAG con el parámetro
max_active_runs
.Número máximo de tareas activas por DAG
La opción de configuración
[core]max_active_tasks_per_dag
de Airflow controla el número máximo de instancias de tareas que se pueden ejecutar simultáneamente en cada DAG.Si este parámetro se define incorrectamente, puede que tengas problemas y la ejecución de una sola instancia de DAG sea lenta porque solo se puede ejecutar un número limitado de tareas de DAG en un momento dado. En este caso, puede aumentar el valor de esta opción de configuración.
También puede definir este valor a nivel de DAG con el parámetro
max_active_tasks
.Puedes usar los parámetros
max_active_tis_per_dag
ymax_active_tis_per_dagrun
a nivel de tarea para controlar cuántas instancias con un ID de tarea específico se pueden ejecutar por DAG y por ejecución de DAG.Paralelismo y tamaño del grupo
La opción de configuración
[core]parallelism
Airflow controla cuántas tareas puede poner en cola el programador de Airflow en la cola del ejecutor después de que se hayan cumplido todas las dependencias de estas tareas.Se trata de un parámetro global de toda la configuración de Airflow.
Las tareas se ponen en cola y se ejecutan en un grupo. Los entornos de Cloud Composer solo usan un grupo. El tamaño de este grupo controla cuántas tareas puede poner en cola el programador para ejecutarse en un momento dado. Si el tamaño del grupo es demasiado pequeño, el programador no podrá poner en cola las tareas para su ejecución, aunque aún no se hayan alcanzado los umbrales, que se definen mediante la opción de configuración
[core]parallelism
y la opción de configuración[celery]worker_concurrency
multiplicada por el número de trabajadores de Airflow.Puedes configurar el tamaño del grupo en la interfaz de usuario de Airflow (Menú > Administrar > Grupos). Ajusta el tamaño del grupo al nivel de paralelismo que esperas en tu entorno.
Normalmente,
[core]parallelism
se define como el producto del número máximo de trabajadores y[celery]worker_concurrency
.
Solucionar problemas con tareas en ejecución y en cola
En las siguientes secciones se describen los síntomas y las posibles soluciones de algunos problemas habituales con las tareas en ejecución y en cola.
No se ejecutan las ejecuciones de DAG
Síntoma:
Cuando se define dinámicamente una fecha de programación para un DAG, pueden producirse varios efectos secundarios inesperados. Por ejemplo:
Una ejecución de DAG siempre se realiza en el futuro y el DAG nunca se ejecuta.
Las ejecuciones de DAGs anteriores se marcan como ejecutadas y completadas correctamente aunque no se hayan ejecutado.
Puede consultar más información en la documentación de Apache Airflow.
Posibles soluciones:
Sigue las recomendaciones de la documentación de Apache Airflow.
Define
start_date
estáticos para los DAGs. También puedes usarcatchup=False
para inhabilitar la ejecución del DAG en fechas anteriores.Evita usar
datetime.now()
odays_ago(<number of days>)
a menos que conozcas los efectos secundarios de este enfoque.
Usar la función TimeTable del programador de Airflow
Las tablas de tiempos están disponibles a partir de Airflow 2.2.
Puedes definir una tabla de horas para un DAG con uno de los siguientes métodos:
También puedes usar Horarios integrados.
Evitar programar tareas durante las ventanas de mantenimiento
Puedes definir ventanas de mantenimiento para tu entorno de forma que el mantenimiento se realice fuera de los momentos en los que ejecutas tus DAGs. Puedes seguir ejecutando tus DAGs durante las ventanas de mantenimiento, siempre que aceptes que algunas tareas se puedan interrumpir y volver a intentar. Para obtener más información sobre cómo afectan las ventanas de mantenimiento a tu entorno, consulta Especificar ventanas de mantenimiento.
Uso de "wait_for_downstream" en tus DAGs
Si asignas el valor True
al parámetro wait_for_downstream
en tus DAGs, para que una tarea se complete correctamente, todas las tareas que estén inmediatamente más abajo de esta tarea también deben completarse correctamente. Esto significa que la ejecución de las tareas pertenecientes a una ejecución de DAG determinada puede ralentizarse por la ejecución de las tareas de la ejecución de DAG anterior. Consulta más información en la documentación de Airflow.
Las tareas que permanezcan en la cola durante demasiado tiempo se cancelarán y se volverán a programar
Si una tarea de Airflow permanece en la cola durante demasiado tiempo, el programador la volverá a programar para que se ejecute una vez que haya transcurrido el tiempo establecido en la opción de configuración [scheduler]task_queued_timeout
de Airflow. El valor predeterminado es 2400
.
En las versiones de Airflow anteriores a la 2.3.1, la tarea también se marca como fallida y se vuelve a intentar si cumple los requisitos para ello.
Una forma de observar los síntomas de esta situación es consultar el gráfico con el número de tareas en cola (pestaña "Monitorización" de la interfaz de usuario de Cloud Composer). Si los picos de este gráfico no bajan en unas dos horas, es muy probable que las tareas se vuelvan a programar (sin registros) y que aparezcan entradas de registro "Adopted tasks were still pending ..." en los registros del programador. En estos casos, puede que veas el mensaje "No se ha encontrado el archivo de registro..." en los registros de tareas de Airflow porque la tarea no se ha ejecutado.
Por lo general, este comportamiento es el esperado y la siguiente instancia de la tarea programada se ejecutará según la programación. Si observas muchos casos de este tipo en tus entornos de Cloud Composer, puede que no haya suficientes trabajadores de Airflow en tu entorno para procesar todas las tareas programadas.
Solución: Para solucionar este problema, debes asegurarte de que siempre haya capacidad en los trabajadores de Airflow para ejecutar las tareas en cola. Por ejemplo, puedes aumentar el número de trabajadores o worker_concurrency. También puedes ajustar el paralelismo o los grupos para evitar que se pongan en cola más tareas de las que puedes gestionar.
Las tareas que se quedan bloqueadas en la cola pueden impedir la ejecución de un DAG específico
Para solucionar este problema, actualiza tu entorno a Cloud Composer 2.1.12 o una versión posterior.
En casos normales, el programador de Airflow debería poder gestionar situaciones en las que haya tareas en la cola y, por algún motivo, no sea posible ejecutarlas correctamente (por ejemplo, cuando se haya eliminado un DAG al que pertenezcan estas tareas).
Si el programador no purga estas tareas, es posible que tengas que eliminarlas manualmente. Por ejemplo, puedes hacerlo en la interfaz de usuario de Airflow (Menú > Navegador > Instancias de tareas), buscar las tareas en cola y eliminarlas.
Enfoque de Cloud Composer para el parámetro min_file_process_interval
Cloud Composer cambia la forma en que el [scheduler]min_file_process_interval
utiliza el programador de Airflow.
En las versiones de Cloud Composer anteriores a la 2.0.26, se ignora [scheduler]min_file_process_interval
.
En las versiones de Cloud Composer posteriores a la 2.0.26:
El programador de Airflow se reinicia después de que se hayan programado todos los DAG un número determinado de veces. El parámetro [scheduler]num_runs
controla cuántas veces lo hace el programador. Cuando el programador alcanza [scheduler]num_runs
bucles de programación, se reinicia. El programador es un componente sin estado, por lo que este reinicio es un mecanismo de reparación automática para cualquier problema que pueda experimentar el programador. El valor predeterminado de [scheduler]num_runs
es 5000.
[scheduler]min_file_process_interval
se puede usar para configurar la frecuencia con la que se analiza el DAG, pero este parámetro no puede ser más largo que el tiempo necesario para que un programador realice bucles [scheduler]num_runs
al programar los DAGs.
Marcar tareas como fallidas después de alcanzar dagrun_timeout
El programador marca como fallidas las tareas que no se han completado (en ejecución, programadas y en cola) si una ejecución de DAG no finaliza en dagrun_timeout
(un parámetro de DAG).
Solución:
Extiende
dagrun_timeout
para que se cumpla el tiempo de espera.Aumenta el número de trabajadores o aumenta los parámetros de rendimiento de los trabajadores para que el DAG se ejecute más rápido.
Síntomas de que la base de datos de Airflow está sometida a una carga pesada
A veces, en los registros del programador de Airflow, puede que veas la siguiente entrada de registro de advertencia:
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
También se pueden observar síntomas similares en los registros de los trabajadores de Airflow:
En MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
En PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
Estos errores o advertencias pueden ser un síntoma de que la base de datos de Airflow está sobrecargada por el número de conexiones abiertas o el número de consultas ejecutadas al mismo tiempo, ya sea por los programadores o por otros componentes de Airflow, como los trabajadores, los activadores y los servidores web.
Posibles soluciones:
Elimina los datos innecesarios de la base de datos de Airflow.
Amplía la base de datos de Airflow ajustando el tamaño de tu entorno.
Reduce el número de programadores. En la mayoría de los casos, uno o dos programadores son suficientes para analizar y programar tareas de Airflow. No se recomienda configurar más de dos programadores, a menos que haya un motivo justificado.
Evita usar variables globales en los DAGs de Airflow. En su lugar, utilice variables de entorno y variables de Airflow.
Asigna a
[scheduler]scheduler_heartbeat_sec
un valor más alto, por ejemplo, 15 segundos o más.Asigna a
[scheduler]job_heartbeat_sec
un valor más alto, por ejemplo, 30 segundos o más.Asigna a
[scheduler]scheduler_health_check_threshold
un valor igual a[scheduler]job_heartbeat_sec
multiplicado por4
.
El servidor web muestra la advertencia "Parece que el programador no está funcionando"
El programador envía su señal de latido de forma periódica a la base de datos de Airflow. Basándose en esta información, el servidor web de Airflow determina si el programador está activo.
A veces, si el programador está sometido a una carga pesada, es posible que no pueda
informar de su latido cada
[scheduler]scheduler_heartbeat_sec
.
En esta situación, el servidor web de Airflow puede mostrar la siguiente advertencia:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Posibles soluciones:
Aumenta los recursos de CPU y memoria del programador.
Optimiza tus DAGs para que su análisis y programación sean más rápidos y no consuman demasiados recursos del programador.
Evita usar variables globales en los DAGs de Airflow. En su lugar, utilice variables de entorno y variables de Airflow.
Aumenta el valor de la opción de configuración de Airflow
[scheduler]scheduler_health_check_threshold
para que el servidor web espere más tiempo antes de informar de que el programador no está disponible.
Soluciones alternativas para los problemas que se producen al rellenar DAGs
En ocasiones, es posible que quieras volver a ejecutar DAGs que ya se hayan ejecutado. Puedes hacerlo con un comando de la CLI de Airflow de la siguiente manera:
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Para volver a ejecutar solo las tareas fallidas de un DAG específico, también puedes usar el argumento --rerun-failed-tasks
.
Sustituye:
ENVIRONMENT_NAME
con el nombre del entorno.LOCATION
con la región en la que se encuentra el entorno.START_DATE
con un valor para el parámetrostart_date
DAG, en formatoYYYY-MM-DD
.END_DATE
con un valor para el parámetroend_date
DAG, en formatoYYYY-MM-DD
.DAG_NAME
con el nombre del DAG.
En ocasiones, la operación de relleno puede generar un interbloqueo en el que no se pueda realizar el relleno porque hay un bloqueo en una tarea. Por ejemplo:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
En algunos casos, puedes usar las siguientes soluciones alternativas para evitar los interbloqueos:
Inhabilita la miniprogramación sustituyendo el
[core]schedule_after_task_execution
porFalse
.Ejecuta retroadaptaciones para intervalos de fechas más acotados. Por ejemplo, define
START_DATE
yEND_DATE
para especificar un periodo de solo 1 día.