Résoudre les problèmes avec le programmeur Airflow

Cloud Composer 1 | Cloud Composer 2

Cette page fournit des étapes de dépannage et des informations sur les problèmes courants liés aux programmeurs Airflow.

Identifier la source du problème

Pour commencer le dépannage, déterminez si le problème se produit au moment de l'analyse du DAG ou pendant le traitement des tâches au moment de l'exécution. Pour en savoir plus sur le temps d'analyse et la durée d'exécution du DAG, consultez la section Différence entre le temps d'analyse du DAG et la durée d'exécution du DAG.

Inspecter les journaux du processeur DAG

Si vous avez des DAG complexes, il est possible que le processeur DAG, exécuté par le programmeur, n'analyse pas tous vos DAG. Cela peut entraîner de nombreux problèmes présentant les symptômes suivants.

Symptômes :

  • Si le processeur DAG rencontre des problèmes lors de l'analyse de vos DAG, cela peut entraîner une combinaison des problèmes listés ci-dessous. Si les DAG sont générés dynamiquement, ces problèmes peuvent avoir plus d'impact que les DAG statiques.

  • Les DAG ne sont pas visibles dans l'interface utilisateur d'Airflow ni dans celle des DAG.

  • L'exécution des DAG n'est pas planifiée.

  • Les journaux du processeur DAG contiennent des erreurs, par exemple:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    ou

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Les problèmes liés aux programmeurs Airflow entraînent des redémarrages.

  • Les tâches Airflow dont l'exécution est planifiée sont annulées et les exécutions DAG des DAG qui n'ont pas pu être analysés peuvent être marquées comme failed. Exemple :

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Solution :

  • Augmentez les paramètres liés à l'analyse du DAG:

    • Définissez dagbag-import-timeout sur au moins 120 secondes (ou plus, si nécessaire).

    • Définissez dag-file-processor-timeout sur au moins 180 secondes (ou plus, si nécessaire). Cette valeur doit être supérieure à dagbag-import-timeout.

  • Corrigez ou supprimez les DAG qui provoquent des problèmes pour le processeur DAG.

Inspecter les durées d'analyse des DAG

Pour vérifier si le problème se produit au moment de l'analyse du DAG, procédez comme suit :

Console

Dans la console Google Cloud, vous pouvez utiliser la page Surveillance et l'onglet Journaux pour inspecter les temps d'analyse des DAG.

Inspectez les temps d'analyse du DAG à l'aide de la page de surveillance de Cloud Composer:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Surveillance s'ouvre.

  3. Dans l'onglet Surveillance, consultez le graphique Temps total d'analyse pour tous les fichiers DAG de la section Exécutions DAG et identifiez les problèmes potentiels.

    La section "Exécutions du DAG" de l'onglet Surveillance de Composer affiche les métriques d'état des DAG de votre environnement

Inspectez les temps d'analyse du DAG à l'aide de l'onglet Journaux Cloud Composer:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Surveillance s'ouvre.

  3. Accédez à l'onglet Journaux et, dans l'arborescence de navigation Tous les journaux, sélectionnez la section Gestionnaire de processeur DAG.

  4. Examinez les journaux dag-processor-manager et identifiez les problèmes potentiels.

    Les journaux du processeur DAG affichent les temps d'analyse du DAG

gcloud

Utilisez la commande dags report pour afficher la durée d'analyse de tous vos DAG.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.

Le résultat de la commande ressemble à ceci :

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Recherchez la valeur duration de chacun des DAG répertoriés dans le tableau. Une valeur élevée peut indiquer que l'un de vos DAG n'est pas implémenté de manière optimale. Dans la table de sortie, vous pouvez identifier les DAG dont le temps d'analyse est long.

Surveiller les tâches en cours et en file d'attente

Pour vérifier si des tâches sont bloquées dans une file d'attente, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Surveillance.

  4. Dans l'onglet Surveillance, consultez le graphique Tâches Airflow dans la section Exécutions DAG et identifiez les problèmes potentiels. Les tâches Airflow sont des tâches en file d'attente dans Airflow. Elles peuvent être placées dans la file d'attente de l'agent Celery ou Kubernetes Executor. Les tâches en file d'attente Celery sont des instances de tâches placées dans la file d'attente de l'agent Celery.

Résoudre les problèmes rencontrés au moment de l'analyse du DAG

Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'analyse du DAG.

Nombre limité de threads

Autoriser le gestionnaire de processeur DAG (la partie du programmeur qui traite les fichiers DAG) à n'utiliser qu'un nombre limité de threads peut avoir une incidence sur la durée d'analyse du DAG.

Pour résoudre le problème, remplacez les options de configuration Airflow suivantes:

  • Pour Airflow 1.10.12 et les versions antérieures, remplacez le paramètre max_threads:

    Section Clé Valeur Notes
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 Remplacez NUMBER_OF_CORES_IN_MACHINE par le nombre de cœurs
    dans les machines des nœuds de calcul.
  • Pour Airflow 1.10.14 et versions ultérieures, remplacez le paramètre parsing_processes:

    Section Clé Valeur Notes
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 Remplacez NUMBER_OF_CORES_IN_MACHINE par le nombre de cœurs
    dans les machines des nœuds de calcul.

Répartition du nombre et de la durée des tâches

Airflow est connu pour avoir des difficultés avec la planification d'un grand nombre de petites tâches. Dans de telles situations, il est préférable d'utiliser un nombre plus faible de tâches mieux consolidées.

La planification simultanée d'un grand nombre de DAG ou de tâches peut également être source de problèmes. Pour éviter ce problème, répartissez vos tâches de manière plus uniforme au fil du temps.

Résoudre les problèmes liés aux tâches en cours d'exécution et en file d'attente

Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'exécution ou de la mise en file d'attente des tâches.

Les files d'attente de tâches sont trop longues

Dans certains cas, une file d'attente de tâches peut être trop longue pour le planificateur. Pour en savoir plus sur l'optimisation des paramètres de nœud de calcul et de Celery, consultez la page Faire évoluer votre environnement Cloud Composer avec votre entreprise.

Utiliser la fonctionnalité TimeTable du programmeur Airflow

À partir d'Airflow 2.2, vous pouvez définir une table temporelle pour un DAG à l'aide d'une nouvelle fonctionnalité appelée "TimeTable".

Vous pouvez définir un créneau horaire à l'aide de l'une des méthodes suivantes:

Ressources de cluster limitées

Cette section ne s'applique qu'à Cloud Composer 1.

Vous risquez de rencontrer des problèmes de performances si le cluster GKE de votre environnement est trop petit pour gérer tous vos DAG et tâches. Dans ce cas, essayez l'une des solutions suivantes :

  • Créez un environnement avec un type de machine offrant plus de performances et migrez vos DAG vers cet environnement.
  • Créez d'autres environnements Cloud Composer et répartissez les DAG sur ces environnements.
  • Modifiez le type de machine pour les nœuds GKE, comme décrit dans la section Mettre à niveau le type de machine pour les nœuds GKE. Cette procédure étant sujette aux erreurs, il s'agit de l'option la moins recommandée.
  • Mettez à niveau le type de machine de l'instance Cloud SQL qui exécute la base de données Airflow dans votre environnement, par exemple à l'aide des commandes gcloud composer environments update. Les faibles performances de la base de données Airflow peuvent être la cause de la lenteur du programmeur.

Éviter la planification des tâches pendant les intervalles de maintenance

Vous pouvez définir des intervalles de maintenance spécifiques pour votre environnement. Au cours de ces périodes, des événements de maintenance pour Cloud SQL et GKE ont lieu.

Forcer le programmeur Airflow à ignorer les fichiers inutiles

Vous pouvez améliorer les performances du programmeur Airflow en ignorant les fichiers inutiles dans le dossier des DAG. Le programmeur Airflow ignore tous les fichiers et dossiers spécifiés dans le fichier .airflowignore.

Pour que le programmeur Airflow ignore les fichiers inutiles, procédez comme suit :

  1. Créez un fichier .airflowignore.
  2. Dans ce fichier, répertoriez les fichiers et dossiers à ignorer.
  3. Importez ce fichier dans le dossier /dags du bucket de votre environnement.

Pour en savoir plus sur le format de fichier .airflowignore, consultez la documentation Airflow.

Le programmeur Airflow traite des DAG interrompus

Les utilisateurs Airflow peuvent interrompre un DAG pour en éviter l'exécution. Cela permet d'économiser les cycles de traitement des nœuds de calcul Airflow.

Cependant, le programmeur Airflow continue d'analyser les DAG interrompus. Si vous souhaitez vraiment améliorer les performances du programmeur Airflow, utilisez .airflowignore ou supprimez les DAG mis en pause du dossier des DAG.

Utilisation de "wait_for_downstream" dans vos DAG

Si vous définissez le paramètre wait_for_downstream sur True dans vos DAG, pour qu'une tâche réussisse, toutes les tâches immédiatement en aval de cette tâche doivent également réussir. Cela signifie que l'exécution des tâches d'un DAG donné peut être ralentie par l'exécution des tâches du DAG précédent. Pour en savoir plus, consultez la documentation Airflow.

Les tâches mises en file d'attente depuis trop longtemps seront annulées et reprogrammées

Si une tâche Airflow est conservée dans la file d'attente trop longtemps, le programmeur la replanifie pour son exécution (dans les versions Airflow antérieures à la version 2.3.1, la tâche est également marquée comme ayant échoué et relancée si elle est éligible).

Pour observer les symptômes de cette situation, vous pouvez consulter le graphique indiquant le nombre de tâches en file d'attente (onglet "Surveillance" dans l'interface utilisateur Cloud Composer). Si les pics de ce graphique ne diminuent pas au bout de deux heures environ, les tâches seront probablement reprogrammées (sans journaux), suivies des entrées de journal "Les tâches adoptées étaient toujours en attente..." dans les journaux du planificateur. Dans ce cas, le message "Log file is not found..." (Fichier journal introuvable) peut s'afficher dans les journaux de tâches Airflow, car la tâche n'a pas été exécutée.

En général, ce comportement est normal et l'instance suivante de la tâche planifiée doit être exécutée conformément à la planification. Si vous observez de nombreux cas de ce type dans les environnements Cloud Composer, cela peut signifier qu'il n'y a pas assez de nœuds de calcul Airflow dans votre environnement pour traiter toutes les tâches planifiées.

Solution: pour résoudre ce problème, vous devez vous assurer que les nœuds de calcul Airflow ont toujours la capacité d'exécuter des tâches en file d'attente. Par exemple, vous pouvez augmenter le nombre de nœuds de calcul ou worker_concurrency. Vous pouvez également régler le parallélisme ou les pools pour éviter que la mise en file d'attente de tâches dépasse la capacité dont vous disposez.

Il se peut que des tâches obsolètes bloquent l'exécution d'un DAG spécifique

En règle générale, le programmeur Airflow doit être capable de gérer les situations dans lesquelles la file d'attente contient des tâches obsolètes et, pour une raison quelconque, il est impossible de les exécuter correctement (par exemple, un DAG auquel appartiennent les tâches obsolètes a été supprimé).

Si ces tâches obsolètes ne sont pas supprimées définitivement par le planificateur, vous devrez peut-être les supprimer manuellement. Vous pouvez le faire, par exemple, dans l'interface utilisateur d'Airflow (Menu > Navigateur > Instances de tâches), rechercher les tâches en file d'attente appartenant à un DAG obsolète, puis les supprimer.

Pour résoudre ce problème, mettez à niveau votre environnement vers Cloud Composer 2.1.12 ou version ultérieure.

Approche Cloud Composer du paramètre [scheduler]min_file_process_interval

Cloud Composer modifie la façon dont [scheduler]min_file_process_interval est utilisé par le programmeur Airflow.

Airflow 1

Si Cloud Composer utilise Airflow 1, les utilisateurs peuvent définir la valeur de [scheduler]min_file_process_interval entre 0 et 600 secondes. Les valeurs supérieures à 600 secondes fournissent les mêmes résultats que si [scheduler]min_file_process_interval était défini sur 600 secondes.

Airflow 2

Dans Airflow 2, [scheduler]min_file_process_interval ne peut être utilisé qu'avec les versions 1.19.9 et 2.0.26 ou ultérieures.

  • Versions de Cloud Composer antérieures à 1.19.9 et 2.0.26

    Dans ces versions, [scheduler]min_file_process_interval est ignoré.

  • Cloud Composer versions 1.19.9 ou 2.0.26, ou versions plus récentes

    Le programmeur Airflow est redémarré après un certain nombre de fois que tous les DAG sont planifiés et que le paramètre [scheduler]num_runs contrôle le nombre de fois qu'il est effectué par le programmeur. Lorsque le programmeur atteint les boucles de planification [scheduler]num_runs, il est redémarré. Le programmeur est un composant sans état, et un tel redémarrage est un mécanisme de réparation automatique en cas de problème rencontré par le programmeur. Si aucune valeur n'est spécifiée, la valeur par défaut de [scheduler]num_runs est appliquée, à savoir 5 000.

    [scheduler]min_file_process_interval permet de configurer la fréquence d'analyse des DAG, mais ce paramètre ne peut pas dépasser le temps nécessaire à un programmeur pour effectuer des boucles [scheduler]num_runs lors de la planification de vos DAG.

Scaling de la configuration Airflow

Airflow fournit des options de configuration Airflow qui contrôlent le nombre de tâches et de DAG Airflow pouvant être exécutés simultanément. Pour définir ces options de configuration, remplacez leurs valeurs pour votre environnement.

  • Simultanéité des nœuds de calcul

    Le paramètre [celery]worker_concurrency contrôle le nombre maximal de tâches qu'un nœud de calcul Airflow peut exécuter en même temps. Si vous multipliez la valeur de ce paramètre par le nombre de nœuds de calcul Airflow dans votre environnement Cloud Composer, vous obtenez le nombre maximal de tâches pouvant être exécutées simultanément dans votre environnement. Ce nombre est limité par l'option de configuration Airflow [core]parallelism, décrite plus en détail.

    Dans les environnements Cloud Composer 2, la valeur par défaut de [celery]worker_concurrency est calculée automatiquement.

    • Pour les versions d'Airflow 2.3.3 et ultérieures, [celery]worker_concurrency est défini sur une valeur minimale de 32, 12 * worker_CPU et 8 * worker_memory.

    • Pour les versions Airflow 2.2.5 ou antérieures, [celery]worker_concurrency est défini sur 12 * nombre de processeurs des nœuds de calcul.

  • Nombre maximal d'exécutions de DAG actives

    L'option de configuration Airflow [core]max_active_runs_per_dag contrôle le nombre maximal d'exécutions de DAG actives par DAG. Si cette limite est atteinte, le programmeur ne crée pas d'autres exécutions de DAG.

    Une définition incorrecte de ce paramètre peut créer un problème où le programmeur limite l'exécution du DAG, car il ne peut pas créer d'autres instances d'exécution de DAG simultanées.

  • Nombre maximal de tâches actives par DAG

    L'option de configuration Airflow [core]max_active_tasks_per_dag contrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG. Il s'agit d'un paramètre au niveau du DAG.

    Si ce paramètre est défini de manière incorrecte, vous risquez de rencontrer un problème dans lequel l'exécution d'une instance de DAG est ralentie car le nombre de tâches de DAG pouvant être exécutées simultanément est limité.

    Solution: augmentez [core]max_active_tasks_per_dag.

  • Parallélisme et taille du pool

    L'option de configuration Airflow [core]parallelism contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.

    Il s'agit d'un paramètre global pour l'ensemble de la configuration Airflow.

    Les tâches sont mises en file d'attente et exécutées dans un pool. Les environnements Cloud Composer n'utilisent qu'un seul pool. La taille de ce pool contrôle le nombre de tâches pouvant être mises en file d'attente simultanément par le programmeur. Si la taille du pool est trop faible, le programmeur ne pourra plus mettre de tâches en file d'attente avant même que les seuils (définis par l'option de configuration [core]parallelism et par l'option de configuration [celery]worker_concurrency multipliée par le nombre de nœuds de calcul Airflow) ne soient dépassés.

    Vous pouvez configurer la taille du pool dans l'interface utilisateur Airflow (Menu > Admin > Pools). Ajustez la taille du pool en fonction du niveau de parallélisme attendu dans votre environnement.

    Généralement, [core]parallelism est défini comme produit du nombre maximal de nœuds de calcul et de [celery]worker_concurrency.

Les DAG ne sont pas planifiés par le programmeur en raison des délais avant expiration du processeur DAG

Pour en savoir plus sur ce problème, consultez la section Dépanner les DAG.

Marquer des tâches comme ayant échoué après avoir atteint dagrun_timeout

Le programmeur marque les tâches qui ne sont pas terminées (en cours d'exécution, planifiées et en file d'attente) comme ayant échoué si une exécution du DAG ne se termine pas dans dagrun_timeout (un paramètre DAG).

Solution :

Symptômes de la charge de travail de la base de données Airflow

L'entrée de journal d'avertissement suivante peut parfois s'afficher dans les journaux du programmeur Airflow:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Des symptômes similaires peuvent également apparaître dans les journaux des nœuds de calcul Airflow:

Pour MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Pour PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Ces erreurs ou avertissements peuvent être le symptôme de la base de données Airflow encombrée par le nombre de connexions ouvertes ou le nombre de requêtes exécutées en même temps, que ce soit par des programmeurs ou d'autres composants Airflow tels que des nœuds de calcul, des déclencheurs ou des serveurs Web.

Solutions possibles:

Le serveur Web affiche l'avertissement "Le planificateur ne semble pas être en cours d'exécution"

Le programmeur signale régulièrement sa pulsation à la base de données Airflow. À partir de ces informations, le serveur Web Airflow détermine si le programmeur est actif.

Parfois, si le planificateur est soumis à une charge importante, il peut ne pas être en mesure de signaler sa pulsation à chaque [scheduler]scheduler-heartbeat-sec.

Dans ce cas, le serveur Web Airflow peut afficher l'avertissement suivant:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Solutions possibles:

  • Augmentez les ressources de processeur et de mémoire pour le programmeur.

  • Optimisez vos DAG afin que leur analyse et leur planification soient plus rapides et qu'elles ne consomment pas trop de ressources du programmeur.

  • Évitez d'utiliser des variables globales dans les DAG Airflow : variables d'environnement Cloud Composer et variables Airflow.

  • Augmentez la valeur du paramètre [scheduler]scheduler-health-check-threshold afin que le serveur Web attende plus longtemps avant de signaler l'indisponibilité du programmeur.

Solutions de contournement pour les problèmes rencontrés lors du remplissage des DAG

Il se peut que vous souhaitiez parfois réexécuter des DAG déjà exécutés. Vous pouvez le faire avec l'outil de ligne de commande Airflow comme suit:

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

Pour ne réexécuter que les tâches ayant échoué pour un DAG spécifique, utilisez également l'argument --rerun_failed_tasks.

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Pour ne réexécuter que les tâches ayant échoué pour un DAG spécifique, utilisez également l'argument --rerun-failed-tasks.

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • START_DATE par une valeur pour le paramètre DAG start_date, au format YYYY-MM-DD.
  • END_DATE par une valeur pour le paramètre DAG end_date, au format YYYY-MM-DD.
  • DAG_NAME par le nom du DAG.

L'opération de remplissage peut parfois générer une situation d'interblocage où le remplissage n'est pas possible en raison du verrouillage d'une tâche. Exemple :

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

Dans certains cas, vous pouvez contourner les interblocages à l'aide des solutions suivantes:

  • Désactivez le mini-Scheduler en remplaçant [core]schedule-after-task-execution par False.

  • Exécutez des remplissages pour des plages de dates plus restreintes. Par exemple, définissez START_DATE et END_DATE pour spécifier une période d'un seul jour.

Étapes suivantes