Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página se ofrecen pasos para solucionar problemas habituales de los flujos de trabajo e información sobre ellos.
Algunos problemas de ejecución de DAG pueden deberse a que el programador de Airflow no funciona correctamente o de forma óptima. Sigue las instrucciones para solucionar problemas del programador para resolverlos.
Solucionar problemas de flujo de trabajo
Para empezar a solucionar el problema, haz lo siguiente:
Consulta los registros de Airflow.
Puedes aumentar el nivel de registro de Airflow anulando la siguiente opción de configuración de Airflow.
Sección Clave Valor logging
(core
en Airflow 1)logging_level
El valor predeterminado es INFO
. Define el valorDEBUG
para obtener más detalles en los mensajes de registro.Consulta el panel de control de monitorización.
Consulta Cloud Monitoring.
En la Google Cloud consola, comprueba si hay errores en las páginas de los componentes de tu entorno.
En la interfaz web de Airflow, consulta la vista de gráfico del DAG para ver las instancias de tareas fallidas.
Sección Clave Valor webserver
dag_orientation
LR
,TB
,RL
oBT
Depurar errores del operador
Para depurar un error de operador, sigue estos pasos:
- Comprueba si hay errores específicos de la tarea.
- Consulta los registros de Airflow.
- Consulta Cloud Monitoring.
- Consulta los registros específicos del operador.
- Corrige los errores.
- Sube el DAG a la carpeta
/dags
. - En la interfaz web de Airflow, borra los estados anteriores del DAG.
- Reanuda o ejecuta el DAG.
Solucionar problemas de ejecución de tareas
Airflow es un sistema distribuido con muchas entidades, como el programador, el ejecutor y los trabajadores, que se comunican entre sí a través de una cola de tareas y la base de datos de Airflow, y envían señales (como SIGTERM). En el siguiente diagrama se muestra un resumen de las interconexiones entre los componentes de Airflow.
En un sistema distribuido como Airflow, puede haber problemas de conectividad de red o la infraestructura subyacente puede experimentar problemas intermitentes. Esto puede provocar que las tareas fallen y se reprogramen para su ejecución, o que no se completen correctamente (por ejemplo, tareas zombi o tareas que se han quedado bloqueadas durante la ejecución). Airflow tiene mecanismos para hacer frente a estas situaciones y reanudar automáticamente el funcionamiento normal. En las siguientes secciones se explican los problemas habituales que se producen durante la ejecución de tareas de Airflow.
Las tareas fallan sin emitir ningún registro
La tarea falla sin emitir registros debido a errores de análisis de DAG
A veces, puede haber errores sutiles en el DAG que provoquen que el programador de Airflow pueda programar tareas para su ejecución y que el procesador de DAG pueda analizar el archivo de DAG, pero que el trabajador de Airflow no pueda ejecutar las tareas del DAG porque hay errores de programación en el archivo de DAG. Esto puede provocar que una tarea de Airflow se marque como Failed
y no haya ningún registro de su ejecución.
Soluciones:
Verifica en los registros de los trabajadores de Airflow que no haya errores relacionados con la falta de un DAG o con errores de análisis de DAG.
Aumenta los parámetros relacionados con el análisis de DAG:
Aumenta [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] a al menos 120 segundos (o más, si es necesario).
Aumenta el valor de dag-file-processor-timeout a 180 segundos como mínimo (o más, si es necesario). Este valor debe ser superior a
dagbag-import-timeout
.
Consulta también Solucionar problemas del procesador DAG.
Las tareas se interrumpen de forma brusca
Durante la ejecución de las tareas, los workers de Airflow pueden finalizar de forma abrupta debido a problemas que no están relacionados específicamente con la tarea en sí. Consulta las causas raíz habituales para ver una lista de estos casos y las posibles soluciones. En las siguientes secciones se describen algunos síntomas adicionales que podrían derivarse de esas causas principales:
Tareas zombi
Airflow detecta dos tipos de discrepancias entre una tarea y un proceso que ejecuta la tarea:
Las tareas fallidas son tareas que deberían estar en ejecución, pero no lo están. Esto puede ocurrir si el proceso de la tarea se ha terminado o no responde, si el trabajador de Airflow no ha informado del estado de la tarea a tiempo porque está sobrecargado o si se ha apagado la VM en la que se ejecuta la tarea. Airflow busca estas tareas periódicamente y las rechaza o las vuelve a intentar, según la configuración de la tarea.
Descubrir tareas zombi
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
Las tareas zombi son tareas que no deberían estar en ejecución. Airflow busca estas tareas periódicamente y las finaliza.
Consulta las causas habituales para obtener más información sobre cómo solucionar problemas con tareas zombi.
Señales SIGTERM
Linux, Kubernetes, el programador de Airflow y Celery usan señales SIGTERM para finalizar los procesos responsables de ejecutar los trabajadores o las tareas de Airflow.
Puede haber varios motivos por los que se envían señales SIGTERM en un entorno:
Una tarea se ha convertido en una tarea fallida y debe detenerse.
El programador ha detectado un duplicado de una tarea y envía señales de instancia de finalización y SIGTERM a la tarea para detenerla.
En el autoescalado horizontal de pods, el plano de control de GKE envía señales SIGTERM para eliminar los pods que ya no son necesarios.
El programador puede enviar señales SIGTERM al proceso DagFileProcessorManager. El programador usa estas señales SIGTERM para gestionar el ciclo de vida del proceso DagFileProcessorManager y se pueden ignorar sin problemas.
Ejemplo:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Condición de carrera entre la retrollamada de latido y las retrollamadas de salida en local_task_job, que monitoriza la ejecución de la tarea. Si el latido detecta que una tarea se ha marcado como correcta, no puede distinguir si la tarea se ha completado correctamente o si se le ha indicado a Airflow que la considere correcta. Sin embargo, finalizará un ejecutor de tareas sin esperar a que se cierre.
Estas señales SIGTERM se pueden ignorar sin problemas. La tarea ya está en estado correcto y la ejecución del DAG no se verá afectada.
La entrada de registro
Received SIGTERM.
es la única diferencia entre la salida normal y la finalización de la tarea en el estado correcto.Imagen 2. Condición de carrera entre las devoluciones de llamada de la señal de latido y de salida (haz clic para ampliar) Un componente de Airflow usa más recursos (CPU, memoria) de los permitidos por el nodo del clúster.
El servicio de GKE realiza operaciones de mantenimiento y envía señales SIGTERM a los pods que se ejecutan en un nodo que está a punto de actualizarse.
Cuando se termina una instancia de tarea con SIGTERM, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ha ejecutado la tarea:
{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed with exception
Posibles soluciones:
Este problema se produce cuando una máquina virtual que ejecuta la tarea se queda sin memoria. Esto no está relacionado con las configuraciones de Airflow, sino con la cantidad de memoria disponible para la VM.
En Cloud Composer 1, puedes volver a crear tu entorno con un tipo de máquina que tenga un rendimiento mayor.
Puedes reducir el valor de la opción de configuración de
[celery]worker_concurrency
concurrencia de Airflow. Esta opción determina cuántas tareas ejecuta simultáneamente un trabajador de Airflow.
Negsignal.SIGKILL
ha interrumpido la tarea de Airflow
A veces, es posible que tu tarea use más memoria de la que se ha asignado al trabajador de Airflow.
En ese caso, podría interrumpirse a las Negsignal.SIGKILL
. El sistema envía esta señal para evitar un mayor consumo de memoria que podría afectar a la ejecución de otras tareas de Airflow. En el registro del trabajador de Airflow, puede que veas la siguiente entrada:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
también puede aparecer como código -9
.
Posibles soluciones:
Menor
worker_concurrency
de trabajadores de Airflow.Cambia a un tipo de máquina más grande que se use en el clúster de Cloud Composer.
Optimiza tus tareas para que usen menos memoria.
La tarea falla debido a la presión de los recursos
Síntoma: durante la ejecución de una tarea, el subproceso del trabajador de Airflow responsable de la ejecución de la tarea de Airflow se interrumpe de forma abrupta. El error que se muestra en el registro del trabajador de Airflow puede ser similar al siguiente:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Solución:
En Cloud Composer 1, crea un entorno con un tipo de máquina más grande que el actual. Añade más nodos a tu entorno y reduce el
[celery]worker_concurrency
para tus trabajadores.Si tu entorno también genera tareas zombi, consulta Solucionar problemas con tareas zombi.
Para ver un tutorial sobre cómo depurar problemas de falta de memoria, consulta Depurar problemas de falta de memoria y de almacenamiento de DAGs.
La tarea falla debido a la expulsión de un pod
Los pods de Google Kubernetes Engine están sujetos al ciclo de vida de los pods de Kubernetes y al desalojo de pods. Los picos de tareas y la coscheduling de trabajadores son dos de las causas más habituales de la expulsión de pods en Cloud Composer.
La expulsión de pods puede producirse cuando un pod concreto usa en exceso los recursos de un nodo en relación con las expectativas de consumo de recursos configuradas para el nodo. Por ejemplo, el desalojo puede producirse cuando se ejecutan varias tareas que consumen mucha memoria en un pod y su carga combinada provoca que el nodo en el que se ejecuta este pod supere el límite de consumo de memoria.
Si se expulsa un pod de trabajador de Airflow, se interrumpirán todas las instancias de tareas que se estén ejecutando en ese pod y, más adelante, Airflow las marcará como fallidas.
Los registros se almacenan en búfer. Si se expulsa un pod de trabajador antes de que se vacíe el búfer, no se emitirán registros. Si una tarea falla y no hay registros, significa que los trabajadores de Airflow se han reiniciado debido a un error de falta de memoria (OOM). Es posible que algunos registros estén presentes en Cloud Logging aunque no se hayan emitido los registros de Airflow.
Para ver los registros, sigue estos pasos:
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros.
Para ver los registros de los distintos workers de Airflow, vaya a Todos los registros > Registros de Airflow > Workers.
Síntoma:
En la Google Cloud consola, ve a la página Cargas de trabajo.
Si hay
airflow-worker
pods que muestranEvicted
, haz clic en cada pod expulsado y busca el mensajeThe node was low on resource: memory
en la parte superior de la ventana.
Solución:
Crea un entorno de Cloud Composer 1 con un tipo de máquina más grande que el actual.
Consulta los registros de los pods de
airflow-worker
para ver los posibles motivos de desalojo. Para obtener más información sobre cómo obtener registros de pods concretos, consulta el artículo Solucionar problemas con cargas de trabajo implementadas.Asegúrate de que las tareas del DAG sean idempotentes y se puedan volver a intentar.
Evita descargar archivos innecesarios en el sistema de archivos local de los trabajadores de Airflow.
Los workers de Airflow tienen una capacidad limitada del sistema de archivos local. Cuando se agota el espacio de almacenamiento, el plano de control de GKE expulsa el pod de trabajador de Airflow. Se produce un error en todas las tareas que estaba ejecutando el trabajador expulsado.
Ejemplos de operaciones problemáticas:
- Descargar archivos u objetos y almacenarlos localmente en un trabajador de Airflow. En su lugar, almacena estos objetos directamente en un servicio adecuado, como un segmento de Cloud Storage.
- Acceder a objetos grandes en la carpeta
/data
desde un trabajador de Airflow. El trabajador de Airflow descarga el objeto en su sistema de archivos local. En su lugar, implementa tus DAGs de forma que los archivos de gran tamaño se procesen fuera del pod de trabajador de Airflow.
Causas habituales
El trabajador de Airflow se ha quedado sin memoria
Cada trabajador de Airflow puede ejecutar hasta [celery]worker_concurrency
instancias de tareas simultáneamente. Si el consumo acumulativo de memoria de esas instancias de tareas supera el límite de memoria de un trabajador de Airflow, se termina un proceso aleatorio para liberar recursos.
Descubrir eventos de falta de memoria de los trabajadores de Airflow
resource.type="k8s_node" resource.labels.cluster_name="GKE_CLUSTER_NAME" log_id("events") jsonPayload.message:"Killed process" jsonPayload.message:("airflow task" OR "celeryd")
A veces, la falta de memoria en un trabajador de Airflow puede provocar que se envíen paquetes mal formados durante una sesión de SQL Alchemy a la base de datos, a un servidor DNS o a cualquier otro servicio al que llame un DAG. En este caso, el otro extremo de la conexión podría rechazar o interrumpir las conexiones del trabajador de Airflow. Por ejemplo:
"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"
Soluciones:
Optimiza las tareas para que usen menos memoria. Por ejemplo, evita el código de nivel superior.
Reducir
[celery]worker_concurrency
.En Cloud Composer 1, cambia a un tipo de máquina mayor.
Se ha expulsado un trabajador de Airflow
El desalojo de pods es una parte normal de la ejecución de cargas de trabajo en Kubernetes. GKE expulsa pods si se quedan sin almacenamiento o para liberar recursos para cargas de trabajo con una prioridad más alta.
Descubrir expulsiones de trabajadores de Airflow
resource.type="k8s_pod" resource.labels.cluster_name="GKE_CLUSTER_NAME" resource.labels.pod_name:"airflow-worker" log_id("events") jsonPayload.reason="Evicted"
Soluciones:
- Si la expulsión se debe a la falta de almacenamiento, puedes reducir el uso del almacenamiento o eliminar los archivos temporales en cuanto no los necesites.
Como alternativa, puedes aumentar el almacenamiento disponible o ejecutar cargas de trabajo en un pod dedicado con
KubernetesPodOperator
.
Se ha terminado el trabajador de Airflow
Es posible que los workers de Airflow se eliminen externamente. Si las tareas que se están ejecutando no finalizan durante el periodo de finalización correcta, se interrumpirán y es posible que se detecten como zombis.
Descubrir las finalizaciones de pods de trabajadores de Airflow
resource.type="k8s_cluster" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.methodName:"pods.delete" protoPayload.response.metadata.name:"airflow-worker"
Posibles situaciones y soluciones:
Los trabajadores de Airflow se reinician durante las modificaciones del entorno, como las actualizaciones o la instalación de paquetes:
Descubrir modificaciones del entorno de Composer
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
Puedes realizar estas operaciones cuando no haya tareas críticas en curso o habilitar los reintentos de tareas.
Es posible que varios componentes no estén disponibles temporalmente durante las operaciones de mantenimiento.
Descubrir las operaciones de mantenimiento de GKE
resource.type="gke_nodepool" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.metadata.operationType="UPGRADE_NODES"
Puedes especificar ventanas de mantenimiento para minimizar
se solapa con la ejecución de las tareas críticas.
El trabajador de Airflow estaba sometido a una carga pesada
La cantidad de recursos de CPU y memoria disponibles para un trabajador de Airflow está limitada por la configuración del entorno. Si el uso de recursos se acerca a los límites, puede provocar una contención de recursos y retrasos innecesarios durante la ejecución de la tarea. En situaciones extremas, cuando faltan recursos durante periodos prolongados, esto puede provocar tareas zombi.
Soluciones:
- Monitoriza el uso de la CPU y la memoria de los trabajadores y ajústalo para evitar que supere el 80%.
Consultas de Cloud Logging para descubrir los motivos de los reinicios o las expulsiones de pods
Los entornos de Cloud Composer usan clústeres de GKE como capa de infraestructura de computación. En esta sección, encontrarás consultas útiles que pueden ayudarte a descubrir los motivos por los que se reinician o se expulsan los workers o los programadores de Airflow.
Las consultas que se presentan más adelante se pueden ajustar de la siguiente manera:
Puedes especificar la cronología necesaria en Cloud Logging. Por ejemplo, las últimas 6 horas o los últimos 3 días, o bien puedes definir un periodo personalizado.
Debes especificar el nombre del clúster de tu entorno en CLUSTER_NAME.
Puedes limitar la búsqueda a un pod específico añadiendo el POD_NAME.
Descubrir contenedores reiniciados
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar los resultados a un pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descubrir contenedores que se han apagado como resultado de un evento de falta de memoria
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar los resultados a un pod específico:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descubrir contenedores que han dejado de ejecutarse
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar los resultados a un pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
La base de datos de Airflow estaba sometida a una carga elevada
Varios componentes de Airflow utilizan una base de datos para comunicarse entre sí y, en concreto, para almacenar los latidos de las instancias de tareas. La escasez de recursos en la base de datos provoca que las consultas tarden más y puede afectar a la ejecución de las tareas.
A veces, se producen los siguientes errores en los registros de un trabajador de Airflow:
(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly
This probably means the server terminated abnormally before or while
processing the request.
Soluciones:
- Evita usar muchas instrucciones
Variables.get
en el código DAG de nivel superior. En su lugar, usa plantillas de Jinja para obtener los valores de las variables de Airflow. - Optimiza (reduce) el uso de las instrucciones xcom_push y xcom_pull en las plantillas de Jinja del código de DAG de nivel superior.
- Te recomendamos que cambies a un entorno de mayor tamaño (Mediano o Grande).
- Reduce el número de programadores
- Reduce la frecuencia del análisis de DAGs.
- Monitoriza el uso de la CPU y la memoria de la base de datos.
La base de datos de Airflow no estaba disponible temporalmente
Un trabajador de Airflow puede tardar en detectar y gestionar correctamente los errores intermitentes, como los problemas de conectividad temporales. Puede superar el umbral de detección de zombis predeterminado.
Descubrir los tiempos de espera de la señal de latido de Airflow
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
Soluciones:
Aumenta el tiempo de espera de las tareas inertes y anula el valor de la
[scheduler]scheduler_zombie_task_threshold
opción de configuración de Airflow:Sección Clave Valor Notas scheduler
scheduler_zombie_task_threshold
Nuevo tiempo de espera (en segundos) El valor predeterminado es 300
.
Las tareas fallan porque se ha producido un error durante la ejecución
Terminating instance
Airflow usa el mecanismo de finalización de instancias para cerrar las tareas de Airflow. Este mecanismo se utiliza en las siguientes situaciones:
- Cuando un programador finaliza una tarea que no se ha completado a tiempo.
- Cuando una tarea agota el tiempo de espera o se ejecuta durante demasiado tiempo.
Cuando Airflow finaliza las instancias de tareas, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que haya ejecutado la tarea:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Terminating instance.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Posibles soluciones:
Comprueba el código de la tarea para detectar errores que puedan provocar que se ejecute durante demasiado tiempo.
Aumenta el valor de la opción de configuración de Airflow
[celery_broker_transport_options]visibility_timeout
.Por lo tanto, el programador espera más tiempo a que se complete una tarea antes de considerarla una tarea zombi. Esta opción es especialmente útil para tareas que requieren mucho tiempo y duran muchas horas. Si el valor es demasiado bajo (por ejemplo, 3 horas), el programador considera que las tareas que se ejecutan durante 5 o 6 horas están colgadas (tareas zombi).
Aumenta el valor de la opción de configuración
[core]killed_task_cleanup_time
Airflow.Un valor más largo proporciona más tiempo a los trabajadores de Airflow para que terminen sus tareas correctamente. Si el valor es demasiado bajo, las tareas de Airflow podrían interrumpirse de forma abrupta, sin tiempo suficiente para finalizar su trabajo correctamente.
La ejecución del DAG no finaliza en el tiempo previsto
Síntoma:
A veces, una ejecución de DAG no finaliza porque las tareas de Airflow se bloquean y la ejecución de DAG dura más de lo esperado. En condiciones normales, las tareas de Airflow no permanecen indefinidamente en el estado de cola o en ejecución, ya que Airflow tiene procedimientos de tiempo de espera y de limpieza que ayudan a evitar esta situación.
Solución:
Usa el parámetro
dagrun_timeout
para los DAGs. Por ejemplo:dagrun_timeout=timedelta(minutes=120)
. Por lo tanto, cada ejecución de DAG debe finalizar dentro del tiempo de espera de la ejecución de DAG. Para obtener más información sobre los estados de las tareas de Airflow, consulta la documentación de Apache Airflow.Usa el parámetro Tiempo de espera de ejecución de tareas para definir un tiempo de espera predeterminado para las tareas que se ejecutan en función de los operadores de Apache Airflow.
Se pierde la conexión con el servidor de PostgreSQL o MySQL durante la ejecución de la tarea o justo después de ella.
Las Lost connection to Postgres / MySQL server during query
excepciones
suelen producirse cuando se cumplen las siguientes condiciones:
- Tu DAG usa
PythonOperator
o un operador personalizado. - Tu DAG hace consultas a la base de datos de Airflow.
Si se hacen varias consultas desde una función invocable, es posible que los rastreos de pila apunten incorrectamente a la línea self.refresh_from_db(lock_for_update=True)
del código de Airflow. Se trata de la primera consulta de la base de datos después de la ejecución de la tarea. La causa real de la excepción se produce antes, cuando no se cierra correctamente una sesión de SQLAlchemy.
Las sesiones de SQLAlchemy se limitan a un hilo y se crean en una función invocable. La sesión se puede continuar más adelante en el código de Airflow. Si hay retrasos significativos entre las consultas de una sesión, es posible que el servidor de PostgreSQL o MySQL ya haya cerrado la conexión. El tiempo de espera de la conexión en los entornos de Cloud Composer es de aproximadamente 10 minutos.
Solución:
- Usa el decorador
airflow.utils.db.provide_session
. Este decorador proporciona una sesión válida a la base de datos de Airflow en el parámetrosession
y cierra correctamente la sesión al final de la función. - No uses una sola función de larga duración. En su lugar, mueve todas las consultas de la base de datos a funciones independientes para que haya varias funciones con el decorador
airflow.utils.db.provide_session
. En este caso, las sesiones se cierran automáticamente después de recuperar los resultados de la consulta.
Interrupciones transitorias al conectarse a la base de datos de metadatos de Airflow
Cloud Composer se ejecuta sobre una infraestructura distribuida. Esto significa que, de vez en cuando, pueden aparecer algunos problemas transitorios que interrumpan la ejecución de tus tareas de Airflow.
En estas situaciones, es posible que veas los siguientes mensajes de error en los registros de los trabajadores de Airflow:
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
o
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Estos problemas intermitentes también pueden deberse a las operaciones de mantenimiento que se realizan en tus entornos de Cloud Composer.
Normalmente, estos errores son intermitentes y, si tus tareas de Airflow son idempotentes y tienes configurados reintentos, no te afectarán. También puedes definir ventanas de mantenimiento.
Otro motivo de estos errores podría ser la falta de recursos en el clúster de tu entorno. En estos casos, puede aumentar la escala u optimizar su entorno tal como se describe en las instrucciones para aumentar la escala de los entornos o optimizar su entorno.
Una ejecución de DAG se marca como correcta, pero no tiene tareas ejecutadas
Si la ejecución de un DAG execution_date
es anterior a la start_date
del DAG, es posible que veas ejecuciones de DAGs que no tienen ninguna ejecución de tarea, pero que siguen marcadas como correctas.

Causa
Esta situación puede producirse en uno de los siguientes casos:
La diferencia horaria entre
execution_date
ystart_date
del DAG provoca un error de coincidencia. Por ejemplo, puede ocurrir cuando se usapendulum.parse(...)
para definirstart_date
.El
start_date
del DAG se ha definido con un valor dinámico, por ejemplo,airflow.utils.dates.days_ago(1)
.
Solución
Asegúrate de que
execution_date
ystart_date
usen la misma zona horaria.Especifica un
start_date
estático y combínalo concatchup=False
para evitar ejecutar DAGs con fechas de inicio anteriores.
Prácticas recomendadas
Impacto de las operaciones de actualización en las ejecuciones de tareas de Airflow
Las operaciones de actualización o de mejora interrumpen las tareas de Airflow que se estén ejecutando, a menos que se ejecuten en el modo aplazable.
Te recomendamos que realices estas operaciones cuando preveas que tendrán un impacto mínimo en las ejecuciones de tareas de Airflow y que configures los mecanismos de reintento adecuados en tus DAGs y tareas.
No programes DAGs generados mediante programación a la misma hora
Generar objetos DAG de forma programática a partir de un archivo DAG es un método eficaz para crear muchos DAG similares que solo tienen pequeñas diferencias.
Es importante no programar la ejecución de todos estos DAGs inmediatamente. Es muy probable que los trabajadores de Airflow no tengan suficientes recursos de CPU y memoria para ejecutar todas las tareas programadas al mismo tiempo.
Para evitar problemas al programar DAGs programáticos, sigue estas recomendaciones:
- Aumenta la simultaneidad de los trabajadores y amplía tu entorno para que pueda ejecutar más tareas simultáneamente.
- Genera DAGs de forma que sus programaciones se distribuyan de manera uniforme a lo largo del tiempo para evitar programar cientos de tareas al mismo tiempo, de modo que los trabajadores de Airflow tengan tiempo para ejecutar todas las tareas programadas.
Controlar el tiempo de ejecución de los DAGs, las tareas y las ejecuciones paralelas del mismo DAG
Si quieres controlar la duración de una sola ejecución de un DAG concreto, puedes usar el parámetro dagrun_timeout
del DAG. Por ejemplo, si crees que una sola ejecución de un DAG (independientemente de si se completa correctamente o no) no debe durar más de una hora, asigna a este parámetro el valor 3600 segundos.
También puedes controlar la duración de una tarea de Airflow. Para ello, puedes usar execution_timeout
.
Si quieres controlar el número de ejecuciones de DAG activas que quieres tener para un DAG concreto, puedes usar la [core]max-active-runs-per-dag
opción de configuración de Airflow.
Si quieres que solo se ejecute una instancia de un DAG en un momento dado, asigna el valor 1
al parámetro max-active-runs-per-dag
.
Evitar el aumento del tráfico de red hacia y desde la base de datos de Airflow
La cantidad de tráfico de red entre el clúster de GKE de tu entorno y la base de datos de Airflow depende del número de DAGs, del número de tareas de los DAGs y de la forma en que los DAGs acceden a los datos de la base de datos de Airflow. Los siguientes factores pueden influir en el uso de la red:
Consultas a la base de datos de Airflow. Si tus DAGs hacen muchas consultas, generarán grandes cantidades de tráfico. Por ejemplo, comprobar el estado de las tareas antes de continuar con otras, consultar la tabla XCom o volcar el contenido de la base de datos de Airflow.
Gran número de tareas. Cuantas más tareas haya que programar, más tráfico de red se generará. Esta consideración se aplica tanto al número total de tareas de tus DAGs como a la frecuencia de programación. Cuando el programador de Airflow programa ejecuciones de DAGs, hace consultas a la base de datos de Airflow y genera tráfico.
La interfaz web de Airflow genera tráfico de red porque hace consultas a la base de datos de Airflow. Usar de forma intensiva páginas con gráficos, tareas y diagramas puede generar grandes volúmenes de tráfico de red.
Siguientes pasos
- Solucionar problemas de instalación de paquetes de PyPI
- Solucionar problemas de actualizaciones del entorno