Solucionar problemas de DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se proporcionan información y pasos para solucionar problemas comunes en el flujo de trabajo.

Muchos problemas de ejecución de DAG se deben a un rendimiento del entorno no óptimo. Para optimizar tu entorno, sigue la guía Cómo optimizar el rendimiento y los costos del entorno.

Algunos problemas de ejecución de DAG pueden deberse a que el programador de Airflow no funciona correctamente o de manera óptima. Sigue las instrucciones para solucionar problemas del programador para resolver estos problemas.

Flujo de trabajo para la solución de problemas

Para comenzar con la solución de problemas, siga estos pasos:

  1. Consulta los registros de Airflow.

    Para aumentar el nivel de registro de Airflow, anula la siguiente opción de configuración de Airflow.

    Sección Clave Valor
    logging logging_level El valor predeterminado es INFO. Establece en DEBUG para obtener más verbosidad en los mensajes de registro.
  2. Consulta el panel de supervisión.

  3. Revisa Cloud Monitoring.

  4. En la consola de Google Cloud, busca errores en las páginas de los componentes de tu entorno.

  5. En la interfaz web de Airflow, consulta la Vista de gráfico del DAG para ver las instancias de tareas fallidas.

    Sección Clave Valor
    webserver dag_orientation LR, TB, RL, oBT

Depura fallas del operador

Para depurar una falla del operador, sigue estos pasos:

  1. Verifica si hay errores específicos de la tarea.
  2. Consulta los registros de Airflow.
  3. Revisa Cloud Monitoring.
  4. Verifica los registros específicos del operador.
  5. Corrige los errores.
  6. Sube el DAG a la carpeta /dags.
  7. En la interfaz web de Airflow, borra los estados anteriores del DAG.
  8. Reanuda o ejecuta el DAG.

Soluciona problemas de ejecución de tareas

Airflow es un sistema distribuido con muchas entidades, como el programador, el ejecutor y los trabajadores, que se comunican entre sí a través de una cola de tareas y la base de datos de Airflow, y envían indicadores (como SIGTERM). En el siguiente diagrama, se muestra una descripción general de las interconexiones entre los componentes de Airflow.

Interacción entre componentes de Airflow
Figura 1. Interacción entre componentes de Airflow (haz clic para ampliar)

En un sistema distribuido como Airflow, es posible que haya algunos problemas de conectividad de red, o bien que la infraestructura subyacente experimente problemas intermitentes. Esto puede generar situaciones en las que las tareas pueden fallar y reprogramarse para su ejecución, o bien que las tareas no se completen correctamente (por ejemplo, tareas zombi o tareas que se quedaron atascadas en la ejecución). Airflow tiene mecanismos para abordar estas situaciones y reanudar automáticamente el funcionamiento normal. En las siguientes secciones, se explican los problemas comunes que se producen durante la ejecución de tareas por parte de Airflow: tareas zombi, instancia de finalización y señales SIGTERM.

Soluciona problemas relacionados con las tareas zombi

Airflow detecta dos tipos de discrepancias entre una tarea y un proceso que la ejecuta:

  • Las tareas zombi son tareas que se supone que se están ejecutando, pero no es así. Esto puede ocurrir si se finalizó el proceso de la tarea o no responde, si el trabajador de Airflow no informó el estado de la tarea a tiempo porque está sobrecargado o si se cerró la VM en la que se ejecuta la tarea. Airflow encuentra esas tareas de forma periódica y falla o vuelve a intentarlas, según su configuración.

    Descubre tareas zombi

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Las tareas inactivas son tareas que no deberían estar en ejecución. Airflow encuentra tales tareas de forma periódica y las finaliza.

En las siguientes secciones, se describen los motivos y las soluciones más comunes de las tareas fantasma.

El trabajador de Airflow se quedó sin memoria

Cada trabajador de Airflow puede ejecutar hasta [celery]worker_concurrency instancias de tareas simultáneamente. Si el consumo de memoria acumulativo de esas instancias de tareas supera el límite de memoria de un trabajador de Airflow, se finaliza un proceso aleatorio para liberar recursos.

Descubre los eventos de memoria insuficiente de los trabajadores de Airflow

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

A veces, la escasez de memoria en un trabajador de Airflow puede provocar que se envíen paquetes con el formato incorrecto a la base de datos, a un servidor DNS o a cualquier otro servicio al que llame un DAG durante una sesión de SQL Alchemy. En este caso, el otro extremo de la conexión podría rechazar o descartar conexiones del trabajador de Airflow. Por ejemplo:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Soluciones:

Se expulsó el trabajador de Airflow

Las expulsiones de Pods son una parte normal de la ejecución de cargas de trabajo en Kubernetes. GKE expulsa pods si se quedaron sin almacenamiento o para liberar recursos para cargas de trabajo con una prioridad más alta.

Descubre las expulsiones de trabajadores de Airflow

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Soluciones:

Se cerró el trabajador de Airflow

Es posible que los trabajadores de Airflow se quiten de forma externa. Si las tareas que se están ejecutando no finalizan durante un período de finalización de gracia, se interrumpen y es posible que se detecten como zombis.

Descubre las terminaciones de pods de trabajadores de Airflow

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Posibles situaciones y soluciones:

  • Los trabajadores de Airflow se reinician durante las modificaciones del entorno, como las actualizaciones o la instalación de paquetes:

    Descubre las modificaciones del entorno de Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Puedes realizar esas operaciones cuando no se estén ejecutando tareas críticas o habilitar los reintentos de tareas.

  • Es posible que varios componentes no estén disponibles temporalmente durante las operaciones de mantenimiento.

    Descubre las operaciones de mantenimiento de GKE

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    Puedes especificar períodos de mantenimiento para minimizar

    se superpone con la ejecución de tareas críticas.

  • En las versiones de Cloud Composer 2 anteriores a la 2.4.5, es posible que un trabajador de Airflow que se está cerrando omita la señal SIGTERM y siga ejecutando tareas:

    Descubre cómo reducir la escala verticalmente con el escalamiento automático de Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-worker-set")
    textPayload:"Workers deleted"

    Puedes actualizar a una versión posterior de Cloud Composer en la que se corrigió este problema.

El trabajador de Airflow estaba bajo una carga pesada

La cantidad de recursos de CPU y memoria disponibles para un trabajador de Airflow está limitada por la configuración del entorno. Si el uso de recursos se acerca a los límites, es posible que se produzca una contención de recursos y demoras innecesarias durante la ejecución de la tarea. En situaciones extremas, cuando faltan recursos durante períodos más largos, esto puede causar tareas zombi.

Soluciones:

La base de datos de Airflow estaba bajo una carga pesada

Varios componentes de Airflow usan una base de datos para comunicarse entre sí y, en particular, para almacenar los indicadores de actividad de las instancias de tareas. La escasez de recursos en la base de datos genera tiempos de consulta más largos y puede afectar la ejecución de tareas.

A veces, los siguientes errores están presentes en los registros de un trabajador de Airflow:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Soluciones:

La base de datos de Airflow no estuvo disponible temporalmente

Un trabajador de Airflow puede tardar un tiempo en detectar y controlar de forma fluida los errores intermitentes, como los problemas de conectividad temporales. Es posible que supere el umbral de detección de zombis predeterminado.

Descubre los tiempos de espera de la señal de monitoreo de funcionamiento de Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Soluciones:

  • Aumenta el tiempo de espera de las tareas zombi y anula el valor de la opción de configuración [scheduler]scheduler_zombie_task_threshold de Airflow:

    Sección Clave Valor Notas
    scheduler scheduler_zombie_task_threshold Nuevo tiempo de espera (en segundos) El valor predeterminado es 300.

Soluciona problemas relacionados con la finalización de instancias

Airflow usa el mecanismo de instancia de finalización para cerrar las tareas de Airflow. Este mecanismo se usa en las siguientes situaciones:

  • Cuando un programador finaliza una tarea que no se completó a tiempo.
  • Cuando una tarea se agota o se ejecuta durante demasiado tiempo.

Cuando Airflow finaliza las instancias de tareas, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ejecutó la tarea:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Terminating instance.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Soluciones posibles:

  • Verifica el código de la tarea en busca de errores que puedan hacer que se ejecute durante demasiado tiempo.

  • Aumentar la CPU y la memoria de los trabajadores de Airflow para que las tareas se ejecuten más rápido

  • Aumenta el valor de la opción de configuración de Airflow [celery_broker_transport_options]visibility-timeout.

    Como resultado, el programador espera más tiempo a que se complete una tarea antes de considerarla una tarea zombi. Esta opción es útil en especial para tareas que requieren mucho tiempo y duran varias horas. Si el valor es demasiado bajo (por ejemplo, 3 horas), el programador considera que las tareas que se ejecutan durante 5 o 6 horas están "suspendidas" (tareas zombi).

  • Aumenta el valor de la opción de configuración de Airflow [core]killed_task_cleanup_time.

    Un valor más largo les brinda más tiempo a los trabajadores de Airflow para terminar sus tareas con facilidad. Si el valor es demasiado bajo, es posible que las tareas de Airflow se interrumpan abruptamente, sin tiempo suficiente para terminar su trabajo de forma fluida.

Cómo solucionar problemas relacionados con los indicadores SIGTERM

Linux, Kubernetes, el programador de Airflow y Celery usan los indicadores SIGTERM para finalizar los procesos responsables de ejecutar tareas o trabajadores de Airflow.

Puede haber varios motivos por los que se envían indicadores SIGTERM en un entorno:

  • Una tarea se convirtió en una tarea zombi y debe detenerse.

  • El programador descubrió un duplicado de una tarea y envía señales de instancia de finalización y SIGTERM a la tarea para detenerla.

  • En el ajuste de escala automático horizontal de Pods, el plano de control de GKE envía indicadores SIGTERM para quitar los Pods que ya no se necesitan.

  • El programador puede enviar señales SIGTERM al proceso DagFileProcessorManager. El programador usa esos indicadores SIGTERM para administrar el ciclo de vida del proceso de DagFileProcessorManager y se pueden ignorar de forma segura.

    Ejemplo:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Condición de carrera entre la devolución de llamada de la señal de actividad y las devoluciones de llamada de salida en local_task_job, que supervisa la ejecución de la tarea. Si el heartbeat detecta que una tarea se marcó como correcta, no puede distinguir si la tarea en sí se realizó correctamente o si se le indicó a Airflow que la considerara correcta. Sin embargo, finalizará un ejecutor de tareas sin esperar a que se cierre.

    Se pueden ignorar de forma segura esos indicadores SIGTERM. La tarea ya está en el estado de correcto y la ejecución de la ejecución de DAG en su totalidad no se verá afectada.

    La entrada de registro Received SIGTERM. es la única diferencia entre la salida normal y la finalización de la tarea en el estado correcto.

    Condición de carrera entre la señal de monitoreo de funcionamiento y las devoluciones de llamada de salida
    Figura 2. Condición de carrera entre el latido y las devoluciones de llamada de salida (haz clic para ampliar)
  • Un componente de Airflow usa más recursos (CPU, memoria) de los que permite el nodo del clúster.

  • El servicio de GKE realiza operaciones de mantenimiento y envía indicadores SIGTERM a los pods que se ejecutan en un nodo que está a punto de actualizarse.

    Cuando se finaliza una instancia de tarea con SIGTERM, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ejecutó la tarea:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

Soluciones posibles:

Este problema ocurre cuando una VM que ejecuta la tarea se queda sin memoria. Esto no se relaciona con las configuraciones de Airflow, sino con la cantidad de memoria disponible para la VM.

  • En Cloud Composer 2, puedes asignar más recursos de CPU y memoria a los trabajadores de Airflow.

  • Puedes disminuir el valor de la opción de configuración de Airflow de simultaneidad [celery]worker_concurrency. Esta opción determina cuántas tareas ejecuta un trabajador de Airflow determinado de forma simultánea.

Para obtener más información sobre cómo optimizar tu entorno, consulta Optimiza el rendimiento y los costos del entorno.

Consultas de Cloud Logging para descubrir los motivos de los reinicios o las expulsiones de Pods

Los entornos de Cloud Composer usan clústeres de GKE como capa de infraestructura de procesamiento. En esta sección, puedes encontrar consultas útiles que pueden ayudarte a encontrar los motivos de los reinicios o las expulsiones del trabajador o el programador de Airflow.

Las consultas que se presentan a continuación se pueden ajustar de la siguiente manera:

  • Puedes especificar el período requerido en Cloud Logging. Por ejemplo, las últimas 6 horas, 3 días o puedes definir un intervalo de tiempo personalizado.

  • Debes especificar el nombre del clúster de tu entorno en CLUSTER_NAME.

  • Puedes limitar la búsqueda a un Pod específico si agregas POD_NAME.

Descubre los contenedores reiniciados

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Consulta alternativa para limitar los resultados a un pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Descubre el cierre de contenedores como resultado de un evento de memoria insuficiente

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar los resultados a un pod específico:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Descubre los contenedores que dejaron de ejecutarse

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar los resultados a un pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Impacto de las operaciones de actualización en las ejecuciones de tareas de Airflow

Las operaciones de actualización interrumpen las tareas de Airflow que se están ejecutando, a menos que se ejecuten en el modo diferido.

Te recomendamos que realices estas operaciones cuando esperes un impacto mínimo en las ejecuciones de tareas de Airflow y configures los mecanismos de reintento adecuados en tus DAG y tareas.

Problemas comunes

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes del DAG.

Negsignal.SIGKILL interrumpió la tarea de Airflow

A veces, tu tarea puede estar usando más memoria de la que se asignó al trabajador de Airflow. En tal situación, Negsignal.SIGKILL podría interrumpirlo. El sistema envía esta señal para evitar un mayor consumo de memoria que podría afectar la ejecución de otras tareas de Airflow. En el registro del trabajador de Airflow, es posible que veas la siguiente entrada de registro:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL también puede aparecer como código -9.

Soluciones posibles:

  • Disminuye el worker_concurrency de los trabajadores de Airflow.

  • Aumenta la cantidad de memoria disponible para los trabajadores de Airflow.

  • Administra tareas intensivas en recursos en Cloud Composer con KubernetesPodOperator o GKEStartPodOperator para el aislamiento de tareas y la asignación de recursos personalizados.

  • Optimiza tus tareas para usar menos memoria.

La tarea falla sin emitir registros debido a errores de análisis de DAG

A veces, puede haber errores DAG sutiles que generen una situación en la que el programador de Airflow pueda programar tareas para su ejecución, el procesador de DAG pueda analizar el archivo DAG, pero el trabajador de Airflow no pueda ejecutar tareas del DAG porque hay errores de programación en el archivo DAG. Esto podría llevar a una situación en la que una tarea de Airflow se marque como Failed y no haya un registro de su ejecución.

Soluciones:

  • Verifica en los registros del trabajador de Airflow que no haya errores que el trabajador de Airflow haya generado y que estén relacionados con un DAG faltante o errores de análisis de DAG.

  • Aumenta los parámetros relacionados con el análisis de DAG:

  • Consulta también Cómo inspeccionar los registros del procesador de DAG.

La tarea falla sin emitir registros debido a la presión de recursos

Síntoma: Durante la ejecución de una tarea, el subproceso del trabajador de Airflow responsable de la ejecución de la tarea de Airflow se interrumpe abruptamente. El error que se ve en el registro del trabajador de Airflow puede ser similar al siguiente:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solución:

La tarea falla sin emitir registros debido a la expulsión del Pod

Los pods de Google Kubernetes Engine están sujetos al ciclo de vida de los pods de Kubernetes y a la expulsión de pods. Los aumentos repentinos de tareas son la causa más común de expulsión de pods en Cloud Composer.

La expulsión de pods puede ocurrir cuando un pod en particular usa recursos de un nodo, en relación con las expectativas de consumo de recursos configuradas para el nodo. Por ejemplo, la expulsión puede ocurrir cuando varias tareas con alto contenido de memoria se ejecutan en un pod y su carga combinada hace que el nodo en el que se ejecuta este pod supere el límite de consumo de memoria.

Si se expulsa un pod de trabajador de Airflow, todas las instancias de tareas que se ejecutan en ese pod se interrumpen y, luego, se marcan como con errores en Airflow.

Los registros están almacenados en búfer. Si se expulsa un pod trabajador antes de que se vacíe el búfer, no se emiten registros. La falla de la tarea sin registros indica que los trabajadores de Airflow se reiniciaron debido a la falta de memoria (OOM). Algunos registros pueden estar presentes en Cloud Logging, aunque los registros de Airflow no se hayan emitido.

Para ver los registros, haz lo siguiente:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Registros.

  4. Consulta los registros de los trabajadores individuales de Airflow en Todos los registros > Registros de Airflow > Trabajadores.

Síntoma:

  1. En la consola de Google Cloud, ve a la página Cargas de trabajo.

    Ir a Cargas de trabajo

  2. Si hay pods airflow-worker que muestran Evicted, haz clic en cada pod expulsado y busca el mensaje The node was low on resource: memory en la parte superior de la ventana.

Solución:

  • Aumenta los límites de memoria para los trabajadores de Airflow.

  • Verifica los registros de los pods de airflow-worker para encontrar posibles causas de expulsión. Para obtener más información sobre cómo recuperar registros de Pods individuales, consulta Soluciona problemas con cargas de trabajo implementadas.

  • Asegúrate de que las tareas del DAG sean idempotentes y se puedan reintentar.

  • Evita descargar archivos innecesarios en el sistema de archivos local de los trabajadores de Airflow.

    Los trabajadores de Airflow tienen una capacidad limitada del sistema de archivos local. Un trabajador de Airflow puede tener entre 1 GB y 10 GB de almacenamiento. Cuando se acaba el espacio de almacenamiento, el plano de control de GKE expulsa el Pod de trabajador de Airflow. Esto falla todas las tareas que ejecutaba el trabajador expulsado.

    Ejemplos de operaciones problemáticas:

    • Descargar archivos o objetos y almacenarlos de forma local en un trabajador de Airflow En su lugar, almacena estos objetos directamente en un servicio adecuado, como un bucket de Cloud Storage.
    • Acceso a objetos grandes en la carpeta /data desde un trabajador de Airflow El trabajador de Airflow descarga el objeto en su sistema de archivos local. En su lugar, implementa tus DAGs para que los archivos grandes se procesen fuera del pod de trabajador de Airflow.

Tiempo de espera de importación de carga de DAG

Síntoma:

  • En la interfaz web de Airflow, en la parte superior de la página de la lista de DAGs, aparece un cuadro de alerta rojo que muestra Broken DAG: [/path/to/dagfile] Timeout.
  • En Cloud Monitoring: Los registros de airflow-scheduler contienen entradas similares a las siguientes:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Solución:

Anula la opción de configuración de Airflow dag_file_processor_timeout y permite más tiempo para el análisis del DAG:

Sección Clave Valor
core dag_file_processor_timeout Nuevo valor del tiempo de espera

La ejecución de DAG no finaliza dentro del tiempo esperado

Síntoma:

A veces, una ejecución de DAG no finaliza porque las tareas de Airflow se bloquean y la ejecución de DAG dura más de lo esperado. En condiciones normales, las tareas de Airflow no permanecen de forma indefinida en el estado en cola o en ejecución, ya que Airflow tiene procedimientos de tiempo de espera y limpieza que ayudan a evitar esta situación.

Solución:

  • Usa el parámetro dagrun_timeout para los DAG. Por ejemplo: dagrun_timeout=timedelta(minutes=120). Como resultado, cada ejecución de DAG debe finalizar dentro del tiempo de espera de la ejecución de DAG. Las tareas que no están terminadas se marcan como Failed o Upstream Failed. Para obtener más información sobre los estados de las tareas de Airflow, consulta la documentación de Apache Airflow.

  • Usa el parámetro tiempo de espera de ejecución de tareas para definir un tiempo de espera predeterminado para las tareas que se ejecutan en función de los operadores de Apache Airflow.

No se ejecutan las ejecuciones de DAG

Síntoma:

Cuando se establece de forma dinámica una fecha de programación para un DAG, esto puede generar varios efectos secundarios inesperados. Por ejemplo:

  • Una ejecución de DAG siempre está en el futuro y el DAG nunca se ejecuta.

  • Las ejecuciones de DAG anteriores se marcan como ejecutadas y correctas a pesar de que no se ejecutaron.

Hay más información disponible en la documentación de Apache Airflow.

Soluciones posibles:

  • Sigue las recomendaciones de la documentación de Apache Airflow.

  • Establece start_date estático para los DAG. Como opción, puedes usar catchup=False para inhabilitar la ejecución del DAG para fechas anteriores.

  • Evita usar datetime.now() o days_ago(<number of days>), a menos que conozcas los efectos secundarios de este enfoque.

Mayor tráfico de red desde y hacia la base de datos de Airflow

La cantidad de tráfico de red entre el clúster de GKE de tu entorno y la base de datos de Airflow depende de la cantidad de DAG, de tareas en DAG, y de cómo los DAG acceden a los datos en la base de datos de Airflow. Los siguientes factores pueden influir en el uso de la red:

  • Consultas a la base de datos de Airflow Si tus DAG realizan muchas consultas, generan grandes cantidades de tráfico. Por ejemplo, verificar el estado de las tareas antes de continuar con otras tareas, consultar la tabla XCom y volcar el contenido de la base de datos de Airflow.

  • Gran cantidad de tareas. Cuantas más tareas haya para programar, más tráfico de red se generará. Esta consideración se aplica a la cantidad total de tareas en tus DAG y a la frecuencia de programación. Cuando el programador de Airflow programa las ejecuciones de DAG, realiza consultas a la base de datos de Airflow y genera tráfico.

  • La interfaz web de Airflow genera tráfico de red, ya que realiza consultas a la base de datos de Airflow. El uso intensivo de páginas con grafos, tareas y diagramas puede generar grandes volúmenes de tráfico de red.

El DAG bloquea el servidor web de Airflow o hace que muestre un error de tiempo de espera de la puerta de enlace 502.

Las fallas del servidor web pueden ocurrir por varias razones diferentes. Verifica los registros de airflow-webserver en Cloud Logging para determinar la causa del error 502 gateway timeout.

Cómo controlar una gran cantidad de DAG y complementos en las carpetas de DAG y complementos

El contenido de las carpetas /dags y /plugins se sincroniza desde el bucket de tu entorno a los sistemas de archivos locales de los trabajadores y programadores de Airflow.

Cuantos más datos se almacenen en estas carpetas, más tiempo tardará la sincronización. Para abordar estas situaciones, haz lo siguiente:

  • Limita la cantidad de archivos en las carpetas /dags y /plugins. Almacena solo la cantidad mínima de archivos necesarios.

  • Aumenta el espacio en disco disponible para los programadores y trabajadores de Airflow.

  • Aumenta la CPU y la memoria de los programadores y trabajadores de Airflow para que la operación de sincronización se realice más rápido.

  • En el caso de una gran cantidad de DAG, divídelos en lotes, comprimilos en archivos ZIP y, luego, implementa estos archivos en la carpeta /dags. Este enfoque acelera el proceso de sincronización de DAG. Los componentes de Airflow descomprimen los archivos ZIP antes de procesar los DAG.

  • Generar DAG de forma programática también puede ser un método para limitar la cantidad de archivos DAG almacenados en la carpeta /dags. Consulta la sección sobre DAG programáticos para evitar problemas con la programación y ejecución de DAG generados de manera programática.

No programes DAG generados de forma programática al mismo tiempo.

Generar objetos DAG de forma programática a partir de un archivo DAG es un método eficiente para crear muchos DAG similares que solo tienen pequeñas diferencias.

Es importante no programar todos esos DAG para su ejecución de inmediato. Es probable que los trabajadores de Airflow no tengan suficientes recursos de CPU y memoria para ejecutar todas las tareas programadas al mismo tiempo.

Para evitar problemas con la programación de DAGs programáticos, haz lo siguiente:

  • Aumenta la simultaneidad de los trabajadores y escala tu entorno para que pueda ejecutar más tareas de forma simultánea.
  • Genera DAG de manera tal que se distribuyan sus programas de manera uniforme a lo largo del tiempo para evitar programar cientos de tareas al mismo tiempo, de modo que los trabajadores de Airflow tengan tiempo para ejecutar todas las tareas programadas.

Error 504 cuando se accede al servidor web de Airflow

Consulta Error 504 cuando se accede a la IU de Airflow.

Se pierde la conexión con el servidor de Postgres durante la excepción de la consulta durante la ejecución de la tarea o justo después de esta

Las excepciones de Lost connection to Postgres server during query suelen ocurrir cuando se cumplen las siguientes condiciones:

  • El DAG usa PythonOperator o un operador personalizado.
  • El DAG realiza consultas a la base de datos de Airflow.

Si se realizan varias consultas desde una función que admite llamadas, los objetos tracebacks pueden apuntar de forma incorrecta a la línea self.refresh_from_db(lock_for_update=True) en el código de Airflow. Es la primera consulta de la base de datos después de la ejecución de la tarea. La causa real de la excepción ocurre antes de esto, cuando una sesión de SQLAlchemy no se cierra de forma correcta.

Las sesiones de SQLAlchemy se limitan a un subproceso y se crean en una sesión de función que admite llamadas que pueden continuar más adelante dentro del código de Airflow. Si hay retrasos significativos entre las consultas dentro de una sesión, es posible que el servidor de Postgres ya cierre la conexión. El tiempo de espera de conexión en los entornos de Cloud Composer se establece en alrededor de 10 minutos.

Solución:

  • Usa el decorador airflow.utils.db.provide_session. Este decorador proporciona una sesión válida a la base de datos de Airflow en el parámetro session y cierra la sesión de forma correcta al final de la función.
  • No uses una sola función de larga duración. En cambio, mueve todas las consultas de base de datos a funciones separadas, de modo que haya varias funciones con el decorador airflow.utils.db.provide_session. En este caso, las sesiones se cierran de forma automática después de recuperar los resultados de la consulta.

Controla el tiempo de ejecución de los DAG, las tareas y las ejecuciones en paralelo del mismo DAG.

Si deseas controlar la duración de una sola ejecución de DAG para un DAG en particular, puedes usar el parámetro DAG dagrun_timeout para hacerlo. Por ejemplo, si esperas que una sola ejecución de DAG (independientemente de si la ejecución finaliza con éxito o falla) no debe durar más de 1 hora, establece este parámetro en 3600 segundos.

También puedes controlar durante cuánto tiempo permites que dure una sola tarea de Airflow. Para hacerlo, puedes usar execution_timeout.

Si deseas controlar cuántas ejecuciones activas de DAG deseas tener para un DAG en particular, puedes usar la opción de configuración de Airflow [core]max-active-runs-per-dag para hacerlo.

Si deseas tener solo una instancia de un DAG que se ejecute en un momento determinado, establece el parámetro max-active-runs-per-dag en 1.

Problemas que afectan la sincronización de DAG y complementos con programadores, trabajadores y servidores web

Cloud Composer sincroniza el contenido de las carpetas /dags y /plugins con los programadores y los trabajadores. Es posible que ciertos objetos en las carpetas /dags y /plugins impidan que esta sincronización funcione correctamente o la ralenticen.

  • La carpeta /dags se sincroniza con los programadores y los trabajadores.

    Esta carpeta no está sincronizada con el servidor web.

  • La carpeta /plugins se sincroniza con los programadores, los trabajadores y los servidores web.

Es posible que encuentres los siguientes problemas:

  • Subiste archivos comprimidos en gzip que usan transcodificación de compresión a las carpetas /dags y /plugins. Por lo general, ocurre si usas la marca --gzip-local-all en un comando gcloud storage cp para subir datos al bucket.

    Solución: Borra el objeto que usó la transcodificación de compresión y vuelve a subirlo al bucket.

  • Uno de los objetos se llama "."; este tipo de objeto no está sincronizado con los programadores ni los trabajadores, y es posible que deje de sincronizarse por completo.

    Solución: Cambia el nombre del objeto.

  • Una carpeta y un archivo de Python de DAG tienen los mismos nombres, por ejemplo, a.py. En este caso, el archivo DAG no se sincroniza correctamente con los componentes de Airflow.

    Solución: Quita la carpeta que tiene el mismo nombre que el archivo DAG de Python.

  • Uno de los objetos de las carpetas /dags o /plugins contiene un símbolo / al final del nombre del objeto. Estos objetos pueden interferir en el proceso de sincronización, ya que el símbolo / significa que un objeto es una carpeta, no un archivo.

    Solución: Quita el símbolo / del nombre del objeto problemático.

  • No almacenes archivos innecesarios en las carpetas /dags y /plugins.

    A veces, los DAG y los complementos que implementas vienen con archivos adicionales, como archivos que almacenan pruebas para estos componentes. Estos archivos se sincronizan con los trabajadores y programadores, y afectan el tiempo necesario para copiarlos en los programadores, trabajadores y servidores web.

    Solución: No almacenes archivos adicionales ni innecesarios en las carpetas /dags y /plugins.

Los programadores y los trabajadores generan el error "Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'".

Este problema ocurre porque los objetos pueden tener un espacio de nombres superpuesto en Cloud Storage y, al mismo tiempo, los programadores y los trabajadores usan sistemas de archivos tradicionales. Por ejemplo, es posible agregar una carpeta y un objeto con el mismo nombre al bucket de un entorno. Cuando el bucket se sincroniza con los programadores y trabajadores del entorno, se genera este error, lo que puede provocar fallas en las tareas.

Para solucionar este problema, asegúrate de que no haya espacios de nombres superpuestos en el bucket del entorno. Por ejemplo, si /dags/misc (un archivo) y /dags/misc/example_file.txt (otro archivo) están en un bucket, el programador genera un error.

Interrupciones transitorias cuando se conecta a la base de datos de metadatos de Airflow

Cloud Composer se ejecuta en una infraestructura distribuida. Esto significa que, de vez en cuando, pueden aparecer algunos problemas transitorios que podrían interrumpir la ejecución de tus tareas de Airflow.

En esas situaciones, es posible que veas los siguientes mensajes de error en los registros de los trabajadores de Airflow:

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

o

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Estos problemas intermitentes también pueden deberse a operaciones de mantenimiento que se realizan en tus entornos de Cloud Composer.

Por lo general, estos errores son intermitentes y, si tus tareas de Airflow son idempotentes y tienes configurados reintentos, no te afectan. También puedes definir períodos de mantenimiento.

Un motivo adicional para estos errores podría ser la falta de recursos en el clúster de tu ambiente. En esos casos, puedes escalar o optimizar tu entorno como se describe en las instrucciones para agregar escalamiento a los entornos o optimizar tu entorno.

Una ejecución de DAG se marca como correcta, pero no tiene tareas ejecutadas

Si una execution_date de ejecución de DAG es anterior a la start_date del DAG, es posible que veas ejecuciones de DAG que no tienen ninguna ejecución de tarea, pero que aún se marcan como correctas.

Ejecución correcta de un DAG sin tareas ejecutadas
Figura 3. Ejecución correcta de un DAG sin tareas ejecutadas (haz clic para ampliar)

Causa

Esta situación puede ocurrir en uno de los siguientes casos:

  • La diferencia de zona horaria entre execution_date y start_date del DAG provoca una discrepancia. Esto puede suceder, por ejemplo, cuando se usa pendulum.parse(...) para configurar start_date.

  • El start_date del DAG se establece en un valor dinámico, por ejemplo, airflow.utils.dates.days_ago(1).

Solución

  • Asegúrate de que execution_date y start_date usen la misma zona horaria.

  • Especifica un start_date estático y combínalo con catchup=False para evitar ejecutar DAG con fechas de inicio anteriores.

Un DAG no es visible en la IU de Airflow ni en la IU de DAG, y el programador no lo programa.

El encargado del tratamiento de datos analiza cada DAG antes de que el programador pueda programarlo y antes de que un DAG sea visible en la IU de Airflow o la IU de DAG.

Las siguientes opciones de configuración de Airflow definen los tiempos de espera para el análisis de DAG:

Si un DAG no se ve en la IU de Airflow o en la IU de DAG, haz lo siguiente:

  • Revisa los registros del procesador de DAG para ver si puede procesar correctamente tu DAG. En caso de problemas, es posible que veas las siguientes entradas de registro en los registros del procesador o programador de DAG:

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
    
  • Verifica los registros del programador para ver si funciona correctamente. En caso de problemas, es posible que veas las siguientes entradas de registro en los registros del programador:

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496
    

Soluciones:

  • Corrige todos los errores de análisis de DAG. El procesador de DAG analiza varios DAG, y en casos excepcionales, los errores de análisis de un DAG pueden afectar negativamente el análisis de otros DAG.

  • Si el análisis de tu DAG tarda más que la cantidad de segundos definida en [core]dagrun_import_timeout, aumenta este tiempo de espera.

  • Si el análisis de todos tus DAG demora más que la cantidad de segundos definida en [core]dag_file_processor_timeout, aumenta este tiempo de espera.

  • Si tu DAG tarda mucho tiempo en analizarse, también puede significar que no se implementa de manera óptima. Por ejemplo, si lee muchas variables de entorno o realiza llamadas a servicios externos o a la base de datos de Airflow. En la medida de lo posible, evita realizar esas operaciones en las secciones globales de los DAG.

  • Aumenta los recursos de CPU y memoria del programador para que funcione más rápido.

  • Ajusta la cantidad de programadores.

  • Aumenta la cantidad de procesos del procesador de DAG para que el análisis se realice más rápido. Para ello, aumenta el valor de [scheduler]parsing_process.

  • Baja la frecuencia del análisis de DAG.

  • Disminuye la carga en la base de datos de Airflow.

Síntomas de que la base de datos de Airflow está bajo una carga pesada

Para obtener más información, consulta Síntomas de que la base de datos de Airflow está bajo presión de carga.

¿Qué sigue?