Cloud Composer 1 | Cloud Composer 2
En esta página, se proporcionan información y pasos para solucionar problemas comunes del flujo de trabajo.
Muchos problemas de ejecución del DAG se deben a un rendimiento del entorno no óptimo. Puedes optimizar tu entorno de Cloud Composer 2 si sigues la guía Optimiza el rendimiento y los costos del entorno.
Algunos problemas de ejecuciones de DAG pueden deberse a que el programador de Airflow no funciona de forma correcta u óptima. Sigue las instrucciones de solución de problemas del programador para resolver estos problemas.
Flujo de trabajo para la solución de problemas
Para comenzar con la solución de problemas, siga estos pasos:
Consulta los registros de Airflow.
Puedes aumentar el nivel de registro de Airflow si anulas la siguiente opción de configuración de Airflow.
Airflow 2
Sección Clave Valor logging
logging_level
El valor predeterminado es INFO
. Configúralo enDEBUG
para obtener más verbosidad en los mensajes de registro.Airflow 1
Sección Clave Valor core
logging_level
El valor predeterminado es INFO
. Configúralo enDEBUG
para obtener más verbosidad en los mensajes de registro.Consulta el panel de Monitoring.
Revisa Cloud Monitoring.
En la consola de Google Cloud, comprueba si hay errores en las páginas de los componentes de tu entorno.
En la interfaz web de Airflow, consulta la Vista de gráfico del DAG para ver las instancias de tareas con errores.
Sección Clave Valor webserver
dag_orientation
LR
,TB
,RL
, oBT
Depura fallas del operador
Para depurar una falla del operador, sigue estos pasos:
- Verifica si hay errores específicos de la tarea.
- Consulta los registros de Airflow.
- Revisa Cloud Monitoring.
- Verifica los registros específicos del operador.
- Corrige los errores.
- Sube el DAG a la carpeta
dags/
. - En la interfaz web de Airflow, borra los estados anteriores del DAG.
- Reanuda o ejecuta el DAG.
Soluciona problemas en la ejecución de tareas
Airflow es un sistema distribuido con muchas entidades, como programador, ejecutor y trabajadores, que se comunican entre sí a través de una lista de tareas en cola y la base de datos de Airflow, y envían señales (como SIGTERM). En el siguiente diagrama, se muestra una descripción general de las interconexiones entre los componentes de Airflow.
En un sistema distribuido como Airflow, es posible que haya algunos problemas de conectividad de red, o que la infraestructura subyacente experimente problemas intermitentes, lo que puede generar situaciones en las que las tareas pueden fallar y reprogramarse para su ejecución, o las tareas pueden no completarse correctamente (por ejemplo, tareas zombi o tareas que se detuvieron en la ejecución). Airflow tiene mecanismos para abordar esas situaciones y reanudar automáticamente su funcionamiento normal. En las siguientes secciones, se explican los problemas comunes que ocurren durante la ejecución de tareas de Airflow: tareas zombi, píldoras venenosas e indicadores SIGTERM.
Solución de problemas de tareas zombi
Airflow detecta dos tipos de discordancia entre una tarea y un proceso que la ejecuta:
Las tareas zombi son tareas que se supone que se están ejecutando, pero no se están ejecutando. Esto puede suceder si el proceso de la tarea se finalizó o si no responde, si el trabajador de Airflow no informó el estado de la tarea a tiempo porque está sobrecargada o si se cerró la VM en la que se ejecuta la tarea. Airflow encuentra esas tareas de forma periódica y falla o reintenta, según la configuración de la tarea.
Las tareas no muertas son tareas que no deberían estar en ejecución. Airflow encuentra esas tareas de forma periódica y las finaliza.
El motivo más común de las tareas zombi es la escasez de recursos de CPU y memoria en el clúster de tu entorno. Como resultado, es posible que un trabajador de Airflow no pueda informar el estado de una tarea o que un sensor se interrumpa de manera abrupta. En este caso, el programador marca una tarea determinada como tarea zombi. Para evitar las tareas zombi, asigna más recursos a tu entorno.
Para obtener más información sobre cómo escalar tu entorno de Cloud Composer, consulta la Guía de entorno de escalamiento. Si experimentas tareas zombi, una solución posible es aumentar el tiempo de espera de estas. Como resultado, el programador espera más tiempo antes de considerar que una tarea es un zombi. De esta manera, un trabajador de Airflow tiene más tiempo para informar el estado de la tarea.
Para aumentar el tiempo de espera de las tareas zombi, anula el valor de la opción de configuración [scheduler]scheduler_zombie_task_threshold
de Airflow:
Sección | Clave | Valor | Notas |
---|---|---|---|
scheduler |
scheduler_zombie_task_threshold |
Nuevo tiempo de espera (en segundos) | El valor predeterminado es 300 . |
Solución de problemas con píldora tóxica
Poison Pill es un mecanismo que usa Airflow para cerrar las tareas de Airflow.
Airflow usa Poison Pill en estas situaciones:
- Cuando un programador finaliza una tarea que no se completó a tiempo.
- Cuando se agota el tiempo de espera de una tarea o se ejecuta durante demasiado tiempo
Cuando Airflow usa Poison Pill, puedes ver las siguientes entradas en los registros de un trabajador de Airflow que ejecutó la tarea:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Taking the poison pill.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Soluciones posibles:
- Revisa el código de la tarea para detectar errores que puedan provocar que se ejecute durante demasiado tiempo.
- (Cloud Composer 2) Aumenta la CPU y la memoria de los trabajadores de Airflow para que las tareas se ejecuten más rápido.
Aumenta el valor de la opción de configuración
[celery_broker_transport_options]visibility-timeout
de Airflow.Como resultado, el programador espera más tiempo hasta que se complete una tarea antes de considerar que es una tarea zombi. Esta opción es especialmente útil para las tareas que requieren mucho tiempo y duran muchas horas. Si el valor es demasiado bajo (por ejemplo, 3 horas), el programador considerará que las tareas que se ejecutan durante 5 o 6 horas son “colgadas” (tareas zombi).
Aumenta el valor de la opción de configuración
[core]killed_task_cleanup_time
de Airflow.Un valor más largo proporciona más tiempo a los trabajadores de Airflow para que finalicen sus tareas con facilidad. Si el valor es demasiado bajo, las tareas de Airflow podrían interrumpirse de manera repentina, sin tiempo suficiente para finalizar su trabajo de forma correcta.
Solución de problemas de señales SIGTERM
Linux, Kubernetes, el programador de Airflow y Celery usan señales SIGTERM para finalizar los procesos responsables de ejecutar trabajadores de Airflow o tareas de Airflow.
Puede haber varios motivos por los que las señales SIGTERM se envían en un entorno:
Una tarea se convirtió en una tarea zombi y debe detenerse.
El programador descubrió un duplicado de una tarea y envía señales venenosas y señales SIGTERM a la tarea para detenerla.
En el Ajuste de escala automático horizontal de Pods, el plano de control de GKE envía señales SIGTERM para quitar los Pods que ya no son necesarios.
El programador puede enviar señales SIGTERM al proceso DagFileProcessorManager. El programador usa estas señales de SIGTERM para administrar el ciclo de vida del proceso de DagFileProcessorManager y se pueden ignorar de forma segura.
Ejemplo:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Condición de carrera entre la devolución de llamada de señal de monitoreo de funcionamiento y las devoluciones de llamada de salida en local_task_job, que supervisa la ejecución de la tarea. Si la señal de monitoreo de funcionamiento detecta que una tarea se marcó como exitosa, no puede distinguir si esta se realizó correctamente o si se le indicó a Airflow que la considerara como completada. No obstante, finalizará un ejecutor de tareas sin esperar a que salga.
Estas señales SIGTERM se pueden ignorar de forma segura. La tarea ya se encuentra en estado correcto y la ejecución del DAG en su totalidad no se verá afectada.
La entrada de registro
Received SIGTERM.
es la única diferencia entre la salida normal y la finalización de la tarea en el estado correcto.Un componente de Airflow usa más recursos (CPU, memoria) de los que permite el nodo del clúster.
El servicio de GKE realiza operaciones de mantenimiento y envía señales SIGTERM a los Pods que se ejecutan en un nodo que está a punto de actualizarse. Cuando una instancia de tarea finaliza con SIGTERM, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ejecutó la tarea:
{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception
Soluciones posibles:
Este problema ocurre cuando una VM que ejecuta la tarea se queda sin memoria. Esto no está relacionado con las configuraciones de Airflow, sino con la cantidad de memoria disponible para la VM.
El aumento de la memoria depende de la versión de Cloud Composer que uses. Por ejemplo:
En Cloud Composer 2, puedes asignar más recursos de CPU y memoria a los trabajadores de Airflow.
En el caso de Cloud Composer 1, puedes volver a crear tu entorno con un tipo de máquina con más rendimiento.
En ambas versiones de Cloud Composer, puedes reducir el valor de la opción de configuración de simultaneidad de Airflow
[celery]worker_concurrency
. Esta opción determina cuántas tareas ejecuta un trabajador determinado de Airflow al mismo tiempo.
Para obtener más información sobre cómo optimizar tu entorno de Cloud Composer 2, consulta Optimiza el rendimiento y los costos del entorno.
Consultas de Cloud Logging para descubrir los motivos de los reinicios o las expulsiones de Pods
Los entornos de Cloud Composer usan clústeres de GKE como capa de infraestructura de procesamiento. En esta sección, podrás encontrar consultas útiles que pueden ayudarte a encontrar los motivos de los reinicios o las expulsiones del trabajador o del programador de Airflow.
Las consultas que se presentan a continuación se pueden ajustar de la siguiente manera:
puedes especificar un cronograma que te interese en Cloud Logging, las últimas 6 horas o 3 días,
debes especificar el objeto CLUSTER_NAME de Cloud Composer
También puedes limitar la búsqueda a un Pod específico si agregas POD_NAME
Descubre contenedores reiniciados
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar los resultados a un Pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descubre el cierre de los contenedores como resultado de un evento de memoria insuficiente
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar los resultados a un Pod específico:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descubre los contenedores que dejaron de ejecutarse
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar los resultados a un Pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Impacto de las operaciones de actualización en las ejecuciones de tareas de Airflow
Las operaciones de actualización o actualización interrumpen la ejecución actual de las tareas de Airflow, a menos que una tarea se ejecute en el modo diferible.
Recomendamos realizar estas operaciones cuando se espere un impacto mínimo en las ejecuciones de tareas de Airflow y configurar mecanismos de reintento adecuados en tus DAG y tareas.
Problemas comunes
En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes del DAG.
Negsignal.SIGKILL
interrumpió la tarea de Airflow
A veces, tu tarea puede estar usando más memoria de la que se asigna al trabajador de Airflow.
En ese caso, es posible que Negsignal.SIGKILL
lo interrumpa. El sistema envía esta señal para evitar un mayor consumo de memoria, lo que podría afectar la ejecución de otras tareas de Airflow. En el registro del trabajador de Airflow, es posible que veas la siguiente entrada de registro:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
también puede aparecer como código -9
.
Soluciones posibles:
Menor
worker_concurrency
de trabajadores de AirflowEn el caso de Cloud Composer 2, aumenta la memoria de los trabajadores de Airflow.
En el caso de Cloud Composer 1, actualiza a un tipo de máquina más grande que se usa en el clúster de Cloud Composer.
Optimiza tus tareas para usar menos memoria.
Administra tareas que requieren muchos recursos en Cloud Composer con KubernetesPodOperator o GKEStartPodOperator para el aislamiento de tareas y la asignación de recursos personalizada.
La tarea falla sin emitir registros debido a errores de análisis del DAG
A veces, puede haber errores sutiles en el DAG que lleven a una situación en la que un programador de Airflow y un procesador de DAG pueden programar tareas para su ejecución y analizar un archivo DAG (respectivamente), pero el trabajador de Airflow no puede ejecutar tareas desde un DAG, ya que hay errores de programación en el archivo DAG de Python. Esto puede generar una situación en la que una tarea de Airflow se marca como Failed
y no haya ningún registro de su ejecución.
Soluciones:
Verifica en los registros del trabajador de Airflow que el trabajador de Airflow no genere errores relacionados con errores de análisis faltantes de DAG o DAG.
Aumenta los parámetros relacionados con el análisis del DAG:
Aumenta dagbag-import-timeout a, al menos, 120 segundos (o más, si es necesario).
Aumenta el dag-file-processor-timeout de al menos 180 segundos (o más, si es necesario). Este valor debe ser superior a
dagbag-import-timeout
.
Consulta también la sección sobre cómo inspeccionar los registros del procesador de DAG.
La tarea falla sin emitir registros debido a la presión de los recursos
Síntoma: durante la ejecución de una tarea, el subproceso del trabajador de Airflow responsable de la ejecución de la tarea de Airflow se interrumpe de forma abrupta. El error visible en el registro del trabajador de Airflow podría ser similar al siguiente:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Solución:
- En Cloud Composer 1, crea un entorno nuevo con un tipo de máquina más grande que el tipo de máquina actual. Considera agregar más nodos a tu entorno y reducir
[celery]worker_concurrency
para tus trabajadores. - En Cloud Composer 2, aumenta los límites de memoria para los trabajadores de Airflow.
- Si tu entorno también genera tareas zombi, consulta Solución de problemas de tareas zombi.
- Para ver un instructivo sobre la depuración de problemas de memoria insuficiente, consulta Depura problemas de DAG sin memoria y sin almacenamiento.
La tarea falla sin emitir registros debido al expulsión del Pod
Los Pods de Google Kubernetes Engine están sujetos al ciclo de vida de los Pods de Kubernetes y a la expulsión de Pods. Los aumentos repentinos de tareas y la programación conjunta de trabajadores son las dos causas más comunes de la expulsión de Pods en Cloud Composer.
La expulsión de pods puede ocurrir cuando un pod en particular usa recursos de un nodo, en relación con las expectativas de consumo de recursos configuradas para el nodo. Por ejemplo, la expulsión puede ocurrir cuando se ejecutan varias tareas que consumen mucha memoria en un Pod, y su carga combinada hace que el nodo en el que se ejecuta este Pod supere el límite de consumo de memoria.
Si se expulsa un pod de trabajador de Airflow, todas las instancias de tareas que se ejecutan en ese pod se interrumpen y, luego, se marcan como con errores en Airflow.
Los registros están almacenados en búfer. Si se expulsa un pod trabajador antes de que se vacíe el búfer, no se emiten registros. La falla de la tarea sin registros indica que los trabajadores de Airflow se reiniciaron debido a la falta de memoria (OOM). Algunos registros pueden estar presentes en Cloud Logging, aunque los registros de Airflow no se hayan emitido.
Para ver los registros, haz lo siguiente:
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros.
Consulta los registros de trabajadores individuales en Todos los registros -> Registros de Airflow -> Trabajadores -> (trabajador individual).
La ejecución del DAG tiene límites de memoria. Cada ejecución de la tarea comienza con dos procesos de Airflow: ejecución de la tarea y supervisión. Actualmente, cada nodo puede realizar hasta 6 tareas simultáneas (aproximadamente 12 procesos cargados con módulos de Airflow). Se puede consumir más memoria, según el tamaño del DAG.
Síntoma:
En la consola de Google Cloud, ve a la página Cargas de trabajo.
Si hay pods
airflow-worker
que muestranEvicted
, haz clic en cada pod expulsado y busca el mensajeThe node was low on resource: memory
en la parte superior de la ventana.
Solución:
- En Cloud Composer 1, crea un nuevo entorno de Cloud Composer con un tipo de máquina más grande que el tipo de máquina actual.
- En Cloud Composer 2, aumenta los límites de memoria para los trabajadores de Airflow.
- Revisa los registros de
airflow-worker
Pods para conocer las posibles causas de expulsión. Para obtener más información sobre la recuperación de registros de Pods individuales, consulta Soluciona problemas con las cargas de trabajo implementadas. - Asegúrate de que las tareas del DAG sean idempotentes y que se puedan reintentar.
Evita descargar archivos innecesarios en el sistema de archivos local de los trabajadores de Airflow.
Los trabajadores de Airflow tienen una capacidad limitada del sistema de archivos local. Por ejemplo, en Cloud Composer 2, un trabajador puede tener entre 1 GB y 10 GB de almacenamiento. Cuando se agota el espacio de almacenamiento, el plano de control de GKE expulsa el pod trabajador de Airflow. Esto falla en todas las tareas que el trabajador expulsado estaba ejecutando.
Ejemplos de operaciones problemáticas:
- Descargar objetos o archivos y almacenarlos de forma local en un trabajador de Airflow En su lugar, almacena estos objetos directamente en un servicio adecuado, como un bucket de Cloud Storage.
- Acceso a objetos grandes en la carpeta
/data
desde un trabajador de Airflow. El trabajador de Airflow descarga el objeto en su sistema de archivos local. En su lugar, implementa tus DAG para que los archivos grandes se procesen fuera del Pod trabajador de Airflow.
Tiempo de espera de importación de carga de DAG
Síntoma:
- En la interfaz web de Airflow, en la parte superior de la página de la lista de DAG, un cuadro de alerta rojo muestra
Broken DAG: [/path/to/dagfile] Timeout
. En Cloud Monitoring: Los registros
airflow-scheduler
contienen entradas similares a las siguientes:ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
Solución:
Anular la opción de configuración dag_file_processor_timeout
de Airflow y permitir más tiempo para el análisis del DAG:
Sección | Clave | Valor |
---|---|---|
core |
dag_file_processor_timeout |
Nuevo valor del tiempo de espera |
La ejecución del DAG no finaliza dentro del tiempo esperado
Síntoma:
A veces, la ejecución de un DAG no finaliza porque las tareas de Airflow se atascan y la ejecución del DAG dura más de lo esperado. En condiciones normales, las tareas de Airflow no permanecen indefinidamente en el estado de ejecución o cola, porque Airflow cuenta con procedimientos de tiempo de espera y limpieza que ayudan a evitar esta situación.
Solución:
Usa el parámetro
dagrun_timeout
para los DAG. Por ejemplo:dagrun_timeout=timedelta(minutes=120)
. Como resultado, cada ejecución de DAG debe finalizar dentro del tiempo de espera de ejecución del DAG y las tareas no finalizadas se deben marcar comoFailed
oUpstream Failed
. Para obtener más información sobre los estados de las tareas de Airflow, consulta la documentación de Apache Airflow.Usa el parámetro tiempo de espera de ejecución de tareas a fin de definir un tiempo de espera predeterminado para las tareas que se ejecutan en función de los operadores de Apache Airflow.
No se ejecutaron las ejecuciones de DAG
Síntoma:
Cuando se establece de forma dinámica una fecha de programación para un DAG, esto puede generar varios efectos secundarios inesperados. Por ejemplo:
Una ejecución de DAG siempre está en el futuro y el DAG nunca se ejecuta.
Las ejecuciones de DAG anteriores se marcan como ejecutadas y tienen éxito a pesar de que no se ejecutaron.
Hay más información disponible en la documentación de Apache Airflow.
Solución:
Sigue las recomendaciones en la documentación de Apache Airflow.
Configura el
start_date
estático para los DAG. Como opción, puedes usarcatchup=False
para inhabilitar la ejecución del DAG en fechas pasadas.Evita usar
datetime.now()
odays_ago(<number of days>)
, a menos que conozcas los efectos secundarios de este enfoque.
Mayor tráfico de red desde y hacia la base de datos de Airflow
La cantidad de tráfico de red entre el clúster de GKE de tu entorno y la base de datos de Airflow depende de la cantidad de DAG, de tareas en DAG, y de cómo los DAG acceden a los datos en la base de datos de Airflow. Los siguientes factores pueden influir en el uso de la red:
Consultas a la base de datos de Airflow Si tus DAG realizan muchas consultas, generan grandes cantidades de tráfico. Por ejemplo, verificar el estado de las tareas antes de continuar con otras tareas, consultar la tabla XCom y volcar el contenido de la base de datos de Airflow.
Gran cantidad de tareas. Cuantas más tareas haya para programar, más tráfico de red se generará. Esta consideración se aplica a la cantidad total de tareas en tus DAG y a la frecuencia de programación. Cuando el programador de Airflow programa las ejecuciones de DAG, realiza consultas a la base de datos de Airflow y genera tráfico.
La interfaz web de Airflow genera tráfico de red, ya que realiza consultas a la base de datos de Airflow. El uso intensivo de páginas con grafos, tareas y diagramas puede generar grandes volúmenes de tráfico de red.
El DAG provoca una falla en el servidor web de Airflow o hace que muestre un error 502 gateway timeout
Las fallas del servidor web pueden ocurrir por varias razones diferentes. Verifica los registros de airflow-webserver en Cloud Logging para determinar la causa del error 502 gateway timeout
.
Procesamiento pesado
Esta sección solo se aplica a Cloud Composer 1.
Evita ejecutar procesamientos pesados durante el tiempo de análisis del DAG.
A diferencia de los nodos trabajadores y programadores, cuyos tipos de máquinas se pueden personalizar para tener una mayor capacidad de CPU y memoria, el servidor web usa un tipo de máquina fijo, lo que puede provocar fallas en el análisis del DAG si el procesamiento del tiempo de análisis es demasiado pesado.
Ten en cuenta que el servidor web tiene 2 CPU virtuales y 2 GB de memoria.
El valor predeterminado para core-dagbag_import_timeout
es 30 segundos. Este valor de tiempo de espera define el límite superior de tiempo que Airflow pasa cargando un módulo de Python en la carpeta dags/
.
Permisos incorrectos
Esta sección solo se aplica a Cloud Composer 1.
El servidor web no se ejecuta con la misma cuenta de servicio que los trabajadores y el programador. Por lo tanto, es posible que los trabajadores y el programador puedan acceder a los recursos administrados por el usuario a los que el servidor web no puede acceder.
Te recomendamos que evites acceder a recursos no públicos durante el análisis del DAG. A veces, esto es inevitable y deberás otorgar permisos a la cuenta de servicio del servidor web. El nombre de la cuenta de servicio se deriva del dominio de tu servidor web. Por ejemplo, si el dominio es example-tp.appspot.com
, la cuenta de servicio es example-tp@appspot.gserviceaccount.com
.
Errores del DAG
Esta sección solo se aplica a Cloud Composer 1.
El servidor web se ejecuta en App Engine y es independiente del clúster de GKE de tu entorno. El servidor web analiza los archivos de definición del DAG, y puede mostrarse un mensaje 502 gateway timeout
si hay errores en el DAG. Airflow funciona normalmente sin un servidor web funcional si el DAG problemático no interrumpe ningún proceso que se ejecute en GKE.
En este caso, puedes usar gcloud composer environments run
para recuperar detalles de tu entorno y como solución alternativa si el servidor web deja de estar disponible.
En otros casos, puedes ejecutar el análisis del DAG en GKE y buscar DAG que arrojen excepciones críticas de Python o para los que se haya agotado el tiempo de espera (30 segundos predeterminados). Para solucionar el problema, conéctate a un shell remoto en un contenedor de trabajador de Airflow y prueba los errores de sintaxis. Para obtener más información, consulta Probar DAG.
Control de una gran cantidad de DAG y complementos en carpetas de DAG y complementos
El contenido de las carpetas /dags
y /plugins
se sincroniza desde el bucket de tu entorno a los sistemas de archivos locales de los trabajadores y programadores de Airflow.
Cuantos más datos se almacenen en estas carpetas, más tiempo se necesitará para realizar la sincronización. Para abordar esas situaciones, haz lo siguiente:
Limita la cantidad de archivos en las carpetas
/dags
y/plugins
. Almacena solo la cantidad mínima de archivos necesarios.Si es posible, aumenta el espacio en disco disponible para los programadores y trabajadores de Airflow.
Si es posible, aumenta la CPU y la memoria de los programadores y trabajadores de Airflow para que la operación de sincronización se realice más rápido.
En caso de tener una gran cantidad de DAG, divide los DAG en lotes, comprímelos en archivos ZIP y, luego, impleméntalos en la carpeta
/dags
. Este enfoque acelera el proceso de sincronización de los DAG. Los componentes de Airflow descomprimen los archivos ZIP antes de procesar los DAG.Generar DAG en una programática también puede ser un método para limitar la cantidad de archivos DAG almacenados en la carpeta
/dags
. Consulta la sección sobre DAG programáticos para evitar problemas con la programación y ejecución de DAG generados de manera programática.
No programes los DAG generados de forma programática al mismo tiempo
Generar objetos DAG de manera programática desde un archivo DAG es un método eficiente para crear muchos DAG similares que solo tengan diferencias pequeñas.
Es importante no programar todos estos DAG para su ejecución de inmediato. Existe una alta probabilidad de que los trabajadores de Airflow no tengan suficientes recursos de CPU y memoria para ejecutar todas las tareas programadas al mismo tiempo.
Para evitar problemas con la programación de DAG programáticos, haz lo siguiente:
- Aumenta la simultaneidad de los trabajadores y escala verticalmente el entorno para que pueda ejecutar más tareas de forma simultánea.
- Genera DAG de una manera que distribuya sus programas de manera uniforme a lo largo del tiempo para evitar programar cientos de tareas al mismo tiempo y que los trabajadores de Airflow tengan tiempo de ejecutar todas las tareas programadas.
Error 504 cuando se accede al servidor web de Airflow
Consulta el error 504 cuando se accede a la IU de Airflow.
Se genera la excepción Lost connection to Postgres / MySQL server during query
durante la ejecución de la tarea o justo después de ella
Las excepciones Lost connection to Postgres / MySQL server during query
suelen ocurrir cuando se cumplen las siguientes condiciones:
- El DAG usa
PythonOperator
o un operador personalizado. - El DAG realiza consultas a la base de datos de Airflow.
Si se realizan varias consultas desde una función que admite llamadas, los objetos tracebacks pueden apuntar de forma incorrecta a la línea self.refresh_from_db(lock_for_update=True)
en el código de Airflow. Es la primera consulta de la base de datos después de la ejecución de la tarea. La causa real de la excepción ocurre antes de esto, cuando una sesión de SQLAlchemy no se cierra de forma correcta.
Las sesiones de SQLAlchemy se limitan a un subproceso y se crean en una sesión de función que admite llamadas que pueden continuar más adelante dentro del código de Airflow. Si hay retrasos significativos entre las consultas dentro de una sesión, es posible que el servidor de MySQL o Postgres ya haya cerrado la conexión. El tiempo de espera de conexión en los entornos de Cloud Composer se establece en alrededor de 10 minutos.
Solución:
- Usa el decorador
airflow.utils.db.provide_session
. Este decorador proporciona una sesión válida a la base de datos de Airflow en el parámetrosession
y cierra la sesión de forma correcta al final de la función. - No uses una sola función de larga duración. En cambio, mueve todas las consultas de base de datos a funciones separadas, de modo que haya varias funciones con el decorador
airflow.utils.db.provide_session
. En este caso, las sesiones se cierran de forma automática después de recuperar los resultados de la consulta.
Control del tiempo de ejecución de DAG, tareas y ejecuciones paralelas del mismo DAG
Si deseas controlar cuánto dura una sola ejecución de DAG para un DAG en particular, puedes usar el parámetro dagrun_timeout
de DAG para hacerlo. Por ejemplo, si esperas que una sola ejecución de DAG (independientemente, ya sea que la ejecución finalice con éxito o con errores) no dure más de 1 hora, configura este parámetro en 3,600 segundos.
También puedes controlar cuánto tiempo permites que dure una sola tarea de Airflow. Para hacerlo, puedes usar execution_timeout
.
Si deseas controlar cuántas ejecuciones activas de DAG deseas tener para un DAG en particular, puedes usar la opción de configuración de Airflow [core]max-active-runs-per-dag
para hacerlo.
Si quieres que se ejecute una sola instancia de un DAG en un momento dado, establece el parámetro max-active-runs-per-dag
en 1
.
Problemas que afectan la sincronización de DAG y complementos con programadores, trabajadores y servidores web
Cloud Composer sincroniza el contenido de las carpetas /dags
y /plugins
con los programadores y trabajadores. Ciertos objetos en las carpetas /dags
y /plugins
pueden impedir que esta sincronización funcione correctamente o, al menos, ralentizarla.
La carpeta
/dags
está sincronizada con programadores y trabajadores. Esta carpeta no se sincroniza con los servidores web en Cloud Composer 2 o si activasDAG Serialization
en Cloud Composer 1.La carpeta
/plugins
está sincronizada con programadores, trabajadores y servidores web.
Es posible que encuentres los siguientes problemas:
Subiste archivos comprimidos en gzip que usan transcodificación de compresión a las carpetas
/dags
y/plugins
. Por lo general, esto sucede si usas el comandogsutil cp -Z
para subir datos al bucket.Solución: Borra el objeto que usó la transcodificación de compresión y vuelve a subirlo al bucket.
Uno de los objetos se llama “.”; este objeto no está sincronizado con programadores y trabajadores, y podría dejar de sincronizarse.
Solución: Cambia el nombre del objeto problemático.
Una carpeta y un archivo DAG de Python tienen los mismos nombres, por ejemplo,
a.py
. En este caso, el archivo DAG no está correctamente sincronizado con los componentes de Airflow.Solución: Quitar la carpeta que tiene el mismo nombre que un archivo DAG de Python.
Uno de los objetos de las carpetas
/dags
o/plugins
contiene un símbolo/
al final del nombre del objeto. Estos objetos pueden confundir el proceso de sincronización porque el símbolo/
significa que un objeto es una carpeta, no un archivo.Solución: Quita el símbolo
/
del nombre del objeto problemático.No almacenes archivos innecesarios en carpetas
/dags
y/plugins
.A veces, los DAG y los complementos que implementas van acompañados de archivos adicionales, como archivos que almacenan pruebas de estos componentes. Estos archivos se sincronizan con trabajadores y programadores, y afectan el tiempo necesario para copiar los archivos en los programadores, trabajadores y servidores web.
Solución: No almacenes ningún archivo adicional innecesario en las carpetas
/dags
y/plugins
.
Los programadores y trabajadores generan Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'
error
Este problema ocurre porque los objetos pueden tener un espacio de nombres superpuesto en Cloud Storage, mientras que, al mismo tiempo, los programadores y los trabajadores usan sistemas de archivos tradicionales. Por ejemplo, es posible agregar una carpeta y un objeto con el mismo nombre a un bucket de entorno. Cuando el bucket se sincroniza con los programadores y trabajadores del entorno, se genera este error, lo que puede provocar fallas en las tareas.
Para solucionar este problema, asegúrate de que no haya espacios de nombres superpuestos en el bucket del entorno. Por ejemplo, si tanto /dags/misc
(un archivo) como /dags/misc/example_file.txt
(otro archivo) están en un bucket, el programador genera un error.
Interrupciones transitorias durante la conexión a la base de datos de metadatos de Airflow
Cloud Composer se ejecuta sobre la infraestructura de nube distribuida. Significa que, de vez en cuando, pueden aparecer algunos problemas transitorios que pueden interrumpir la ejecución de tus tareas de Airflow.
En esas situaciones, es posible que veas los siguientes mensajes de error en los registros de los trabajadores de Airflow:
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
o
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Estos problemas intermitentes también pueden deberse a las operaciones de mantenimiento realizadas para los entornos de Cloud Composer.
Por lo general, estos errores son intermitentes y, si tus tareas de Airflow son idempotentes y tienes reintentos configurados, no debes ser inmune a ellos. También puedes considerar la definición de períodos de mantenimiento.
Una razón adicional de estos errores podría ser la falta de recursos en el clúster de tu entorno. En esos casos, puedes optimizar o escalar tu entorno como se describe en las instrucciones para escalar entornos o Optimiza tu entorno.
Una ejecución de DAG se marca como exitosa, pero no tiene tareas ejecutadas
Si una ejecución de DAG execution_date
es anterior a la start_date
del DAG, es posible que veas ejecuciones de DAG que no tienen ejecuciones de tareas, pero que siguen marcadas como exitosas.
Causa
Esta situación puede ocurrir en uno de los siguientes casos:
Una discrepancia se debe a la diferencia de zona horaria entre
execution_date
ystart_date
del DAG. Esto puede ocurrir, por ejemplo, cuando se usapendulum.parse(...)
para configurarstart_date
.El
start_date
del DAG está configurado en un valor dinámico, por ejemplo,airflow.utils.dates.days_ago(1)
.
Solución
Asegúrate de que
execution_date
ystart_date
usen la misma zona horaria.Especifica un
start_date
estático y combínalo concatchup=False
para evitar ejecutar DAG con fechas de inicio anteriores.
Un DAG no es visible en la IU de Airflow ni en la IU del DAG, y el programador no lo programa
El procesador de DAG analiza cada DAG antes de que el programador pueda programarlos y antes de que un DAG se vuelva visible en la IU de Airflow o la IU de DAG.
Las siguientes opciones de configuración de Airflow definen los tiempos de espera para analizar los DAG:
[core]dagrun_import_timeout
define cuánto tiempo tiene el procesador de DAG para analizar un solo DAG.[core]dag_file_processor_timeout
define la cantidad total de tiempo que el procesador de DAG puede dedicarle al análisis de todos los DAG.
Si un DAG no es visible en la IU de Airflow o en la IU del DAG, haz lo siguiente:
- Verifica los registros del procesador de DAG si este puede procesar tu DAG de forma correcta. En caso de problemas, es posible que veas las siguientes entradas de registro en los registros del procesador o del programador de DAG:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
- Revisa los registros del programador para ver si funciona correctamente. En caso de problemas, es posible que veas las siguientes entradas de registro en los registros del programador:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496
Soluciones:
Se corrigieron todos los errores de análisis del DAG. El procesador de DAG analiza varios DAG y, en casos excepcionales, los errores de análisis de un DAG pueden tener un impacto negativo en el análisis de otros DAG.
Si el análisis de tu DAG tarda más que la cantidad de segundos definidos en
[core]dagrun_import_timeout
, aumenta este tiempo de espera.Si el análisis de todos tus DAG tarda más que la cantidad de segundos definidos en
[core]dag_file_processor_timeout
, aumenta este tiempo de espera.Si tu DAG tarda mucho tiempo en analizarse, también puede significar que no se implementó de manera óptima. Por ejemplo, si lee muchas variables de entorno o realiza llamadas a servicios externos o bases de datos de Airflow. En la medida de lo posible, evita realizar esas operaciones en secciones globales de los DAG.
Aumentar los recursos de CPU y memoria para Scheduler para que pueda funcionar más rápido
Aumenta la cantidad de procesos del procesador de DAG para que el análisis se pueda realizar más rápido. Para hacerlo, aumenta el valor de
[scheduler]parsing_process
.
Síntomas de la base de datos de Airflow con una carga pesada
Para obtener más información, consulta Síntomas de la base de datos de Airflow bajo presión de carga.
¿Qué sigue?
- Soluciona problemas de instalación del paquete de PyPI
- Solución de problemas de actualizaciones y mejoras de entornos