Solucionar problemas de DAGs

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se ofrecen pasos para solucionar problemas habituales de los flujos de trabajo e información sobre ellos.

Algunos problemas de ejecución de DAG pueden deberse a que el programador de Airflow no funciona correctamente o de forma óptima. Sigue las instrucciones para solucionar problemas del programador para resolverlos.

Solucionar problemas de flujo de trabajo

Para empezar a solucionar el problema, haz lo siguiente:

  1. Consulta los registros de Airflow.

    Puedes aumentar el nivel de registro de Airflow anulando la siguiente opción de configuración de Airflow.

    Sección Clave Valor
    logging (core en Airflow 1) logging_level El valor predeterminado es INFO. Define el valor DEBUG para obtener más detalles en los mensajes de registro.
  2. Consulta el panel de control de monitorización.

  3. Consulta Cloud Monitoring.

  4. En la Google Cloud consola, comprueba si hay errores en las páginas de los componentes de tu entorno.

  5. En la interfaz web de Airflow, consulta la vista de gráfico del DAG para ver las instancias de tareas fallidas.

    Sección Clave Valor
    webserver dag_orientation LR, TB, RL o BT

Depurar errores del operador

Para depurar un error de operador, sigue estos pasos:

  1. Comprueba si hay errores específicos de la tarea.
  2. Consulta los registros de Airflow.
  3. Consulta Cloud Monitoring.
  4. Consulta los registros específicos del operador.
  5. Corrige los errores.
  6. Sube el DAG a la carpeta /dags.
  7. En la interfaz web de Airflow, borra los estados anteriores del DAG.
  8. Reanuda o ejecuta el DAG.

Solucionar problemas de ejecución de tareas

Airflow es un sistema distribuido con muchas entidades, como el programador, el ejecutor y los trabajadores, que se comunican entre sí a través de una cola de tareas y la base de datos de Airflow, y envían señales (como SIGTERM). En el siguiente diagrama se muestra un resumen de las interconexiones entre los componentes de Airflow.

Interacción entre los componentes de Airflow
Figura 1. Interacción entre los componentes de Airflow (haz clic para ampliar)

En un sistema distribuido como Airflow, puede haber problemas de conectividad de red o la infraestructura subyacente puede experimentar problemas intermitentes. Esto puede provocar que las tareas fallen y se reprogramen para su ejecución, o que no se completen correctamente (por ejemplo, tareas zombi o tareas que se han quedado bloqueadas durante la ejecución). Airflow tiene mecanismos para hacer frente a estas situaciones y reanudar automáticamente el funcionamiento normal. En las siguientes secciones se explican los problemas habituales que se producen durante la ejecución de tareas de Airflow.

Las tareas fallan sin emitir ningún registro

La tarea falla sin emitir registros debido a errores de análisis de DAG

A veces, puede haber errores sutiles en el DAG que provoquen que el programador de Airflow pueda programar tareas para su ejecución y que el procesador de DAG pueda analizar el archivo de DAG, pero que el trabajador de Airflow no pueda ejecutar las tareas del DAG porque hay errores de programación en el archivo de DAG. Esto puede provocar que una tarea de Airflow se marque como Failed y no haya ningún registro de su ejecución.

Soluciones:

  • Verifica en los registros de los trabajadores de Airflow que no haya errores relacionados con la falta de un DAG o con errores de análisis de DAG.

  • Aumenta los parámetros relacionados con el análisis de DAG:

    • Aumenta [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] a al menos 120 segundos (o más, si es necesario).

    • Aumenta el valor de dag-file-processor-timeout a 180 segundos como mínimo (o más, si es necesario). Este valor debe ser superior a dagbag-import-timeout.

  • Consulta también Solucionar problemas del procesador DAG.

Las tareas se interrumpen de forma brusca

Durante la ejecución de las tareas, los workers de Airflow pueden finalizar de forma abrupta debido a problemas que no están relacionados específicamente con la tarea en sí. Consulta las causas raíz habituales para ver una lista de estos casos y las posibles soluciones. En las siguientes secciones se describen algunos síntomas adicionales que podrían derivarse de esas causas principales:

Tareas zombi

Airflow detecta dos tipos de discrepancias entre una tarea y un proceso que ejecuta la tarea:

  • Las tareas fallidas son tareas que deberían estar en ejecución, pero no lo están. Esto puede ocurrir si el proceso de la tarea se ha terminado o no responde, si el trabajador de Airflow no ha informado del estado de la tarea a tiempo porque está sobrecargado o si se ha apagado la VM en la que se ejecuta la tarea. Airflow busca estas tareas periódicamente y las rechaza o las vuelve a intentar, según la configuración de la tarea.

    Descubrir tareas zombi

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Las tareas zombi son tareas que no deberían estar en ejecución. Airflow busca estas tareas periódicamente y las finaliza.

Consulta las causas habituales para obtener más información sobre cómo solucionar problemas con tareas zombi.

Señales SIGTERM

Linux, Kubernetes, el programador de Airflow y Celery usan señales SIGTERM para finalizar los procesos responsables de ejecutar los trabajadores o las tareas de Airflow.

Puede haber varios motivos por los que se envían señales SIGTERM en un entorno:

  • Una tarea se ha convertido en una tarea fallida y debe detenerse.

  • El programador ha detectado un duplicado de una tarea y envía señales de instancia de finalización y SIGTERM a la tarea para detenerla.

  • En el autoescalado horizontal de pods, el plano de control de GKE envía señales SIGTERM para eliminar los pods que ya no son necesarios.

  • El programador puede enviar señales SIGTERM al proceso DagFileProcessorManager. El programador usa estas señales SIGTERM para gestionar el ciclo de vida del proceso DagFileProcessorManager y se pueden ignorar sin problemas.

    Ejemplo:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Condición de carrera entre la retrollamada de latido y las retrollamadas de salida en local_task_job, que monitoriza la ejecución de la tarea. Si el latido detecta que una tarea se ha marcado como correcta, no puede distinguir si la tarea se ha completado correctamente o si se le ha indicado a Airflow que la considere correcta. Sin embargo, finalizará un ejecutor de tareas sin esperar a que se cierre.

    Estas señales SIGTERM se pueden ignorar sin problemas. La tarea ya está en estado correcto y la ejecución del DAG no se verá afectada.

    La entrada de registro Received SIGTERM. es la única diferencia entre la salida normal y la finalización de la tarea en el estado correcto.

    Condición de carrera entre las devoluciones de llamada de latido y de salida
    Imagen 2. Condición de carrera entre las devoluciones de llamada de la señal de latido y de salida (haz clic para ampliar)
  • Un componente de Airflow usa más recursos (CPU, memoria) de los permitidos por el nodo del clúster.

  • El servicio de GKE realiza operaciones de mantenimiento y envía señales SIGTERM a los pods que se ejecutan en un nodo que está a punto de actualizarse.

    Cuando se termina una instancia de tarea con SIGTERM, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ha ejecutado la tarea:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

Posibles soluciones:

Este problema se produce cuando una máquina virtual que ejecuta la tarea se queda sin memoria. Esto no está relacionado con las configuraciones de Airflow, sino con la cantidad de memoria disponible para la VM.

  • En Cloud Composer 1, puedes volver a crear tu entorno con un tipo de máquina que tenga un rendimiento mayor.

  • Puedes reducir el valor de la opción de configuración de [celery]worker_concurrencyconcurrencia de Airflow. Esta opción determina cuántas tareas ejecuta simultáneamente un trabajador de Airflow.

Negsignal.SIGKILL ha interrumpido la tarea de Airflow

A veces, es posible que tu tarea use más memoria de la que se ha asignado al trabajador de Airflow. En ese caso, podría interrumpirse a las Negsignal.SIGKILL. El sistema envía esta señal para evitar un mayor consumo de memoria que podría afectar a la ejecución de otras tareas de Airflow. En el registro del trabajador de Airflow, puede que veas la siguiente entrada:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL también puede aparecer como código -9.

Posibles soluciones:

  • Menor worker_concurrency de trabajadores de Airflow.

  • Cambia a un tipo de máquina más grande que se use en el clúster de Cloud Composer.

  • Optimiza tus tareas para que usen menos memoria.

La tarea falla debido a la presión de los recursos

Síntoma: durante la ejecución de una tarea, el subproceso del trabajador de Airflow responsable de la ejecución de la tarea de Airflow se interrumpe de forma abrupta. El error que se muestra en el registro del trabajador de Airflow puede ser similar al siguiente:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solución:

La tarea falla debido a la expulsión de un pod

Los pods de Google Kubernetes Engine están sujetos al ciclo de vida de los pods de Kubernetes y al desalojo de pods. Los picos de tareas y la coscheduling de trabajadores son dos de las causas más habituales de la expulsión de pods en Cloud Composer.

La expulsión de pods puede producirse cuando un pod concreto usa en exceso los recursos de un nodo en relación con las expectativas de consumo de recursos configuradas para el nodo. Por ejemplo, el desalojo puede producirse cuando se ejecutan varias tareas que consumen mucha memoria en un pod y su carga combinada provoca que el nodo en el que se ejecuta este pod supere el límite de consumo de memoria.

Si se expulsa un pod de trabajador de Airflow, se interrumpirán todas las instancias de tareas que se estén ejecutando en ese pod y, más adelante, Airflow las marcará como fallidas.

Los registros se almacenan en búfer. Si se expulsa un pod de trabajador antes de que se vacíe el búfer, no se emitirán registros. Si una tarea falla y no hay registros, significa que los trabajadores de Airflow se han reiniciado debido a un error de falta de memoria (OOM). Es posible que algunos registros estén presentes en Cloud Logging aunque no se hayan emitido los registros de Airflow.

Para ver los registros, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Registros.

  4. Para ver los registros de los distintos workers de Airflow, vaya a Todos los registros > Registros de Airflow > Workers.

Síntoma:

  1. En la Google Cloud consola, ve a la página Cargas de trabajo.

    Ve a Cargas de trabajo.

  2. Si hay airflow-worker pods que muestran Evicted, haz clic en cada pod expulsado y busca el mensaje The node was low on resource: memory en la parte superior de la ventana.

Solución:

  • Crea un entorno de Cloud Composer 1 con un tipo de máquina más grande que el actual.

  • Consulta los registros de los pods de airflow-worker para ver los posibles motivos de desalojo. Para obtener más información sobre cómo obtener registros de pods concretos, consulta el artículo Solucionar problemas con cargas de trabajo implementadas.

  • Asegúrate de que las tareas del DAG sean idempotentes y se puedan volver a intentar.

  • Evita descargar archivos innecesarios en el sistema de archivos local de los trabajadores de Airflow.

    Los workers de Airflow tienen una capacidad limitada del sistema de archivos local. Cuando se agota el espacio de almacenamiento, el plano de control de GKE expulsa el pod de trabajador de Airflow. Se produce un error en todas las tareas que estaba ejecutando el trabajador expulsado.

    Ejemplos de operaciones problemáticas:

    • Descargar archivos u objetos y almacenarlos localmente en un trabajador de Airflow. En su lugar, almacena estos objetos directamente en un servicio adecuado, como un segmento de Cloud Storage.
    • Acceder a objetos grandes en la carpeta /data desde un trabajador de Airflow. El trabajador de Airflow descarga el objeto en su sistema de archivos local. En su lugar, implementa tus DAGs de forma que los archivos de gran tamaño se procesen fuera del pod de trabajador de Airflow.

Causas habituales

El trabajador de Airflow se ha quedado sin memoria

Cada trabajador de Airflow puede ejecutar hasta [celery]worker_concurrency instancias de tareas simultáneamente. Si el consumo acumulativo de memoria de esas instancias de tareas supera el límite de memoria de un trabajador de Airflow, se termina un proceso aleatorio para liberar recursos.

Descubrir eventos de falta de memoria de los trabajadores de Airflow

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

A veces, la falta de memoria en un trabajador de Airflow puede provocar que se envíen paquetes mal formados durante una sesión de SQL Alchemy a la base de datos, a un servidor DNS o a cualquier otro servicio al que llame un DAG. En este caso, el otro extremo de la conexión podría rechazar o interrumpir las conexiones del trabajador de Airflow. Por ejemplo:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Soluciones:

Se ha expulsado un trabajador de Airflow

El desalojo de pods es una parte normal de la ejecución de cargas de trabajo en Kubernetes. GKE expulsa pods si se quedan sin almacenamiento o para liberar recursos para cargas de trabajo con una prioridad más alta.

Descubrir expulsiones de trabajadores de Airflow

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Soluciones:

Se ha terminado el trabajador de Airflow

Es posible que los workers de Airflow se eliminen externamente. Si las tareas que se están ejecutando no finalizan durante el periodo de finalización correcta, se interrumpirán y es posible que se detecten como zombis.

Descubrir las finalizaciones de pods de trabajadores de Airflow

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Posibles situaciones y soluciones:

  • Los trabajadores de Airflow se reinician durante las modificaciones del entorno, como las actualizaciones o la instalación de paquetes:

    Descubrir modificaciones del entorno de Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Puedes realizar estas operaciones cuando no haya tareas críticas en curso o habilitar los reintentos de tareas.

  • Es posible que varios componentes no estén disponibles temporalmente durante las operaciones de mantenimiento.

    Descubrir las operaciones de mantenimiento de GKE

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    Puedes especificar ventanas de mantenimiento para minimizar

    se solapa con la ejecución de las tareas críticas.

El trabajador de Airflow estaba sometido a una carga pesada

La cantidad de recursos de CPU y memoria disponibles para un trabajador de Airflow está limitada por la configuración del entorno. Si el uso de recursos se acerca a los límites, puede provocar una contención de recursos y retrasos innecesarios durante la ejecución de la tarea. En situaciones extremas, cuando faltan recursos durante periodos prolongados, esto puede provocar tareas zombi.

Soluciones:

Consultas de Cloud Logging para descubrir los motivos de los reinicios o las expulsiones de pods

Los entornos de Cloud Composer usan clústeres de GKE como capa de infraestructura de computación. En esta sección, encontrarás consultas útiles que pueden ayudarte a descubrir los motivos por los que se reinician o se expulsan los workers o los programadores de Airflow.

Las consultas que se presentan más adelante se pueden ajustar de la siguiente manera:

  • Puedes especificar la cronología necesaria en Cloud Logging. Por ejemplo, las últimas 6 horas o los últimos 3 días, o bien puedes definir un periodo personalizado.

  • Debes especificar el nombre del clúster de tu entorno en CLUSTER_NAME.

  • Puedes limitar la búsqueda a un pod específico añadiendo el POD_NAME.

Descubrir contenedores reiniciados

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Consulta alternativa para limitar los resultados a un pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Descubrir contenedores que se han apagado como resultado de un evento de falta de memoria

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar los resultados a un pod específico:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Descubrir contenedores que han dejado de ejecutarse

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar los resultados a un pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

La base de datos de Airflow estaba sometida a una carga elevada

Varios componentes de Airflow utilizan una base de datos para comunicarse entre sí y, en concreto, para almacenar los latidos de las instancias de tareas. La escasez de recursos en la base de datos provoca que las consultas tarden más y puede afectar a la ejecución de las tareas.

A veces, se producen los siguientes errores en los registros de un trabajador de Airflow:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Soluciones:

La base de datos de Airflow no estaba disponible temporalmente

Un trabajador de Airflow puede tardar en detectar y gestionar correctamente los errores intermitentes, como los problemas de conectividad temporales. Puede superar el umbral de detección de zombis predeterminado.

Descubrir los tiempos de espera de la señal de latido de Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Soluciones:

  • Aumenta el tiempo de espera de las tareas inertes y anula el valor de la [scheduler]scheduler_zombie_task_threshold opción de configuración de Airflow:

    Sección Clave Valor Notas
    scheduler scheduler_zombie_task_threshold Nuevo tiempo de espera (en segundos) El valor predeterminado es 300.

Las tareas fallan porque se ha producido un error durante la ejecución

Terminating instance

Airflow usa el mecanismo de finalización de instancias para cerrar las tareas de Airflow. Este mecanismo se utiliza en las siguientes situaciones:

  • Cuando un programador finaliza una tarea que no se ha completado a tiempo.
  • Cuando una tarea agota el tiempo de espera o se ejecuta durante demasiado tiempo.

Cuando Airflow finaliza las instancias de tareas, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que haya ejecutado la tarea:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Terminating instance.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Posibles soluciones:

  • Comprueba el código de la tarea para detectar errores que puedan provocar que se ejecute durante demasiado tiempo.

  • Aumenta el valor de la opción de configuración de Airflow [celery_broker_transport_options]visibility_timeout.

    Por lo tanto, el programador espera más tiempo a que se complete una tarea antes de considerarla una tarea zombi. Esta opción es especialmente útil para tareas que requieren mucho tiempo y duran muchas horas. Si el valor es demasiado bajo (por ejemplo, 3 horas), el programador considera que las tareas que se ejecutan durante 5 o 6 horas están colgadas (tareas zombi).

  • Aumenta el valor de la opción de configuración [core]killed_task_cleanup_time Airflow.

    Un valor más largo proporciona más tiempo a los trabajadores de Airflow para que terminen sus tareas correctamente. Si el valor es demasiado bajo, las tareas de Airflow podrían interrumpirse de forma abrupta, sin tiempo suficiente para finalizar su trabajo correctamente.

La ejecución del DAG no finaliza en el tiempo previsto

Síntoma:

A veces, una ejecución de DAG no finaliza porque las tareas de Airflow se bloquean y la ejecución de DAG dura más de lo esperado. En condiciones normales, las tareas de Airflow no permanecen indefinidamente en el estado de cola o en ejecución, ya que Airflow tiene procedimientos de tiempo de espera y de limpieza que ayudan a evitar esta situación.

Solución:

  • Usa el parámetro dagrun_timeout para los DAGs. Por ejemplo: dagrun_timeout=timedelta(minutes=120). Por lo tanto, cada ejecución de DAG debe finalizar dentro del tiempo de espera de la ejecución de DAG. Para obtener más información sobre los estados de las tareas de Airflow, consulta la documentación de Apache Airflow.

  • Usa el parámetro Tiempo de espera de ejecución de tareas para definir un tiempo de espera predeterminado para las tareas que se ejecutan en función de los operadores de Apache Airflow.

Se pierde la conexión con el servidor de PostgreSQL o MySQL durante la ejecución de la tarea o justo después de ella.

Las Lost connection to Postgres / MySQL server during query excepciones suelen producirse cuando se cumplen las siguientes condiciones:

  • Tu DAG usa PythonOperator o un operador personalizado.
  • Tu DAG hace consultas a la base de datos de Airflow.

Si se hacen varias consultas desde una función invocable, es posible que los rastreos de pila apunten incorrectamente a la línea self.refresh_from_db(lock_for_update=True) del código de Airflow. Se trata de la primera consulta de la base de datos después de la ejecución de la tarea. La causa real de la excepción se produce antes, cuando no se cierra correctamente una sesión de SQLAlchemy.

Las sesiones de SQLAlchemy se limitan a un hilo y se crean en una función invocable. La sesión se puede continuar más adelante en el código de Airflow. Si hay retrasos significativos entre las consultas de una sesión, es posible que el servidor de PostgreSQL o MySQL ya haya cerrado la conexión. El tiempo de espera de la conexión en los entornos de Cloud Composer es de aproximadamente 10 minutos.

Solución:

  • Usa el decorador airflow.utils.db.provide_session. Este decorador proporciona una sesión válida a la base de datos de Airflow en el parámetro session y cierra correctamente la sesión al final de la función.
  • No uses una sola función de larga duración. En su lugar, mueve todas las consultas de la base de datos a funciones independientes para que haya varias funciones con el decorador airflow.utils.db.provide_session. En este caso, las sesiones se cierran automáticamente después de recuperar los resultados de la consulta.

Interrupciones transitorias al conectarse a la base de datos de metadatos de Airflow

Cloud Composer se ejecuta sobre una infraestructura distribuida. Esto significa que, de vez en cuando, pueden aparecer algunos problemas transitorios que interrumpan la ejecución de tus tareas de Airflow.

En estas situaciones, es posible que veas los siguientes mensajes de error en los registros de los trabajadores de Airflow:

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

o

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Estos problemas intermitentes también pueden deberse a las operaciones de mantenimiento que se realizan en tus entornos de Cloud Composer.

Normalmente, estos errores son intermitentes y, si tus tareas de Airflow son idempotentes y tienes configurados reintentos, no te afectarán. También puedes definir ventanas de mantenimiento.

Otro motivo de estos errores podría ser la falta de recursos en el clúster de tu entorno. En estos casos, puede aumentar la escala u optimizar su entorno tal como se describe en las instrucciones para aumentar la escala de los entornos o optimizar su entorno.

Una ejecución de DAG se marca como correcta, pero no tiene tareas ejecutadas

Si la ejecución de un DAG execution_date es anterior a la start_date del DAG, es posible que veas ejecuciones de DAGs que no tienen ninguna ejecución de tarea, pero que siguen marcadas como correctas.

Una ejecución de DAG correcta sin tareas ejecutadas
Imagen 3. Ejecución de un DAG correcta sin tareas ejecutadas (haz clic para ampliar)

Causa

Esta situación puede producirse en uno de los siguientes casos:

  • La diferencia horaria entre execution_date y start_date del DAG provoca un error de coincidencia. Por ejemplo, puede ocurrir cuando se usa pendulum.parse(...) para definir start_date.

  • El start_date del DAG se ha definido con un valor dinámico, por ejemplo, airflow.utils.dates.days_ago(1).

Solución

  • Asegúrate de que execution_date y start_date usen la misma zona horaria.

  • Especifica un start_date estático y combínalo con catchup=False para evitar ejecutar DAGs con fechas de inicio anteriores.

Prácticas recomendadas

Impacto de las operaciones de actualización en las ejecuciones de tareas de Airflow

Las operaciones de actualización o de mejora interrumpen las tareas de Airflow que se estén ejecutando, a menos que se ejecuten en el modo aplazable.

Te recomendamos que realices estas operaciones cuando preveas que tendrán un impacto mínimo en las ejecuciones de tareas de Airflow y que configures los mecanismos de reintento adecuados en tus DAGs y tareas.

No programes DAGs generados mediante programación a la misma hora

Generar objetos DAG de forma programática a partir de un archivo DAG es un método eficaz para crear muchos DAG similares que solo tienen pequeñas diferencias.

Es importante no programar la ejecución de todos estos DAGs inmediatamente. Es muy probable que los trabajadores de Airflow no tengan suficientes recursos de CPU y memoria para ejecutar todas las tareas programadas al mismo tiempo.

Para evitar problemas al programar DAGs programáticos, sigue estas recomendaciones:

  • Aumenta la simultaneidad de los trabajadores y amplía tu entorno para que pueda ejecutar más tareas simultáneamente.
  • Genera DAGs de forma que sus programaciones se distribuyan de manera uniforme a lo largo del tiempo para evitar programar cientos de tareas al mismo tiempo, de modo que los trabajadores de Airflow tengan tiempo para ejecutar todas las tareas programadas.

Controlar el tiempo de ejecución de los DAGs, las tareas y las ejecuciones paralelas del mismo DAG

Si quieres controlar la duración de una sola ejecución de un DAG concreto, puedes usar el parámetro dagrun_timeout del DAG. Por ejemplo, si crees que una sola ejecución de un DAG (independientemente de si se completa correctamente o no) no debe durar más de una hora, asigna a este parámetro el valor 3600 segundos.

También puedes controlar la duración de una tarea de Airflow. Para ello, puedes usar execution_timeout.

Si quieres controlar el número de ejecuciones de DAG activas que quieres tener para un DAG concreto, puedes usar la [core]max-active-runs-per-dag opción de configuración de Airflow.

Si quieres que solo se ejecute una instancia de un DAG en un momento dado, asigna el valor 1 al parámetro max-active-runs-per-dag.

Evitar el aumento del tráfico de red hacia y desde la base de datos de Airflow

La cantidad de tráfico de red entre el clúster de GKE de tu entorno y la base de datos de Airflow depende del número de DAGs, del número de tareas de los DAGs y de la forma en que los DAGs acceden a los datos de la base de datos de Airflow. Los siguientes factores pueden influir en el uso de la red:

  • Consultas a la base de datos de Airflow. Si tus DAGs hacen muchas consultas, generarán grandes cantidades de tráfico. Por ejemplo, comprobar el estado de las tareas antes de continuar con otras, consultar la tabla XCom o volcar el contenido de la base de datos de Airflow.

  • Gran número de tareas. Cuantas más tareas haya que programar, más tráfico de red se generará. Esta consideración se aplica tanto al número total de tareas de tus DAGs como a la frecuencia de programación. Cuando el programador de Airflow programa ejecuciones de DAGs, hace consultas a la base de datos de Airflow y genera tráfico.

  • La interfaz web de Airflow genera tráfico de red porque hace consultas a la base de datos de Airflow. Usar de forma intensiva páginas con gráficos, tareas y diagramas puede generar grandes volúmenes de tráfico de red.

Siguientes pasos