Résoudre les problèmes avec le programmeur Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Cette page fournit des étapes de dépannage et des informations sur les problèmes courants liés aux planificateurs et aux processeurs de DAG Airflow.

Identifier la source du problème

Pour commencer le dépannage, déterminez si le problème se produit :

  • Au moment de l'analyse du DAG, lorsque le DAG est analysé par un processeur DAG Airflow
  • Au moment de l'exécution, lorsque le DAG est traité par un programmeur Airflow

Pour en savoir plus sur le temps d'analyse et la durée d'exécution du DAG, consultez la section Différence entre le temps d'analyse du DAG et la durée d'exécution du DAG.

Inspecter les problèmes de traitement des DAG

  1. Inspectez les journaux du processeur de DAG.
  2. Vérifiez les durées d'analyse des DAG.

Surveiller les tâches en cours et en file d'attente

Pour vérifier si des tâches sont bloquées dans une file d'attente, procédez comme suit :

  1. Dans la console Google Cloud , accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Surveillance.

  4. Dans l'onglet Surveillance, consultez le graphique Tâches Airflow dans la section Exécutions DAG et identifiez les problèmes potentiels. Les tâches Airflow sont des tâches en file d'attente dans Airflow. Elles peuvent être envoyées à la file d'attente de l'agent Celery ou de l'exécuteur Kubernetes. Les tâches en file d'attente Celery sont des instances de tâches placées dans la file d'attente de l'agent Celery.

Résoudre les problèmes rencontrés au moment de l'analyse du DAG

Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'analyse du DAG.

Analyse et planification des DAG dans Cloud Composer 1 et Airflow 1

L'efficacité de l'analyse des DAG a été considérablement améliorée dans Airflow 2. Si vous rencontrez des problèmes de performances liés à l'analyse et à la planification des DAG, envisagez de migrer vers Airflow 2.

Dans Cloud Composer 1, le planificateur s'exécute sur les nœuds de cluster avec d'autres composants Cloud Composer. Par conséquent, la charge des nœuds de cluster individuels peut être supérieure ou inférieure à celle des autres nœuds. Les performances du planificateur (analyse et planification des DAG) peuvent varier en fonction du nœud sur lequel il s'exécute. De plus, un nœud individuel sur lequel le planificateur s'exécute peut changer à la suite d'opérations de mise à niveau ou de maintenance. Cette limite a été résolue dans Cloud Composer 2, où vous pouvez allouer des ressources de processeur et de mémoire au planificateur. Les performances du planificateur ne dépendent pas de la charge des nœuds de cluster.

Répartition du nombre et de la durée des tâches

Airflow peut rencontrer des problèmes lors de la planification simultanée d'un grand nombre de DAG ou de tâches. Pour éviter les problèmes de programmation, vous pouvez :

  • Ajustez vos DAG pour utiliser un nombre plus faible de tâches mieux consolidées.
  • Ajustez les intervalles de planification de vos DAG pour répartir plus uniformément les exécutions de DAG au fil du temps.

Scaling de la configuration Airflow

Airflow fournit des options de configuration Airflow qui contrôlent le nombre de tâches et de DAG Airflow pouvant être exécutés simultanément. Pour définir ces options de configuration, remplacez leurs valeurs pour votre environnement. Vous pouvez également définir certaines de ces valeurs au niveau du DAG ou de la tâche.

  • Simultanéité des nœuds de calcul

    Le paramètre [celery]worker_concurrency contrôle le nombre maximal de tâches qu'un nœud de calcul Airflow peut exécuter en même temps. Si vous multipliez la valeur de ce paramètre par le nombre de nœuds de calcul Airflow dans votre environnement Cloud Composer, vous obtenez le nombre maximal de tâches pouvant être exécutées simultanément dans votre environnement. Ce nombre est limité par l'option de configuration Airflow [core]parallelism, décrite en détail plus bas.

  • Nombre maximal d'exécutions de DAG actives

    L'option de configuration Airflow [core]max_active_runs_per_dag contrôle le nombre maximal d'exécutions de DAG actives par DAG. Si cette limite est atteinte, le programmeur ne crée pas d'autres exécutions de DAG.

    Une définition incorrecte de ce paramètre peut créer un problème où le programmeur limite l'exécution du DAG, car il ne peut pas créer d'autres instances d'exécution de DAG simultanées.

    Vous pouvez également définir cette valeur au niveau du DAG avec le paramètre max_active_runs.

  • Nombre maximal de tâches actives par DAG

    L'option de configuration Airflow [core]max_active_tasks_per_dag contrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG.

    Si ce paramètre est défini de manière incorrecte, vous risquez de rencontrer un problème dans lequel l'exécution d'une instance de DAG est ralentie car le nombre de tâches de DAG pouvant être exécutées simultanément est limité. Dans ce cas, vous pouvez augmenter la valeur de cette option de configuration.

    Vous pouvez également définir cette valeur au niveau du DAG avec le paramètre max_active_tasks.

    Vous pouvez utiliser les paramètres max_active_tis_per_dag et max_active_tis_per_dagrun au niveau de la tâche pour contrôler le nombre d'instances avec un ID de tâche spécifique autorisées à s'exécuter par DAG et par exécution de DAG.

  • Parallélisme et taille du pool

    L'option de configuration Airflow [core]parallelism contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.

    Il s'agit d'un paramètre global pour l'ensemble de la configuration Airflow.

    Les tâches sont mises en file d'attente et exécutées dans un pool. Les environnements Cloud Composer n'utilisent qu'un seul pool. La taille de ce pool contrôle le nombre de tâches pouvant être mises en file d'attente simultanément par le programmeur. Si la taille du pool est trop faible, le programmeur ne pourra plus mettre de tâches en file d'attente avant même que les seuils (définis par l'option de configuration [core]parallelism et par l'option de configuration [celery]worker_concurrency multipliée par le nombre de nœuds de calcul Airflow) ne soient dépassés.

    Vous pouvez configurer la taille du pool dans l'interface utilisateur Airflow (Menu > Admin > Pools). Ajustez la taille du pool en fonction du niveau de parallélisme attendu dans votre environnement.

    En général, [core]parallelism est défini comme le produit du nombre maximal de nœuds de calcul et de [celery]worker_concurrency.

Résoudre les problèmes liés aux tâches en cours d'exécution et en file d'attente

Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'exécution ou de la mise en file d'attente des tâches.

Les exécutions de DAG ne sont pas exécutées

Symptôme :

Lorsque la date de planification d'un DAG est définie de manière dynamique, cela peut entraîner divers effets secondaires inattendus. Exemple :

  • Une exécution de DAG est toujours dans le futur et le DAG n'est jamais exécuté.

  • Les exécutions DAG passées sont marquées comme exécutées et réussies, même si elles ne l'ont pas été.

Pour en savoir plus, consultez la documentation Apache Airflow.

Solutions possibles :

  • Suivez les recommandations de la documentation Apache Airflow.

  • Définissez des start_date statiques pour les DAG. Vous pouvez également utiliser catchup=False pour désactiver l'exécution du DAG pour les dates passées.

  • Évitez d'utiliser datetime.now() ou days_ago(<number of days>), sauf si vous êtes conscient des effets secondaires de cette approche.

Utiliser la fonctionnalité TimeTable du planificateur Airflow

Les tableaux de planification sont disponibles à partir d'Airflow 2.2.

Vous pouvez définir un tableau de planification pour un DAG à l'aide de l'une des méthodes suivantes :

Vous pouvez également utiliser les plannings intégrés.

Ressources de cluster limitées

Vous risquez de rencontrer des problèmes de performances si le cluster GKE de votre environnement est trop petit pour gérer tous vos DAG et tâches. Dans ce cas, essayez l'une des solutions suivantes :

  • Créez un environnement avec un type de machine offrant plus de performances et migrez vos DAG vers cet environnement.
  • Créez d'autres environnements Cloud Composer et répartissez les DAG sur ces environnements.
  • Modifiez le type de machine pour les nœuds GKE, comme décrit dans la section Mettre à niveau le type de machine pour les nœuds GKE. Cette procédure étant sujette aux erreurs, il s'agit de l'option la moins recommandée.
  • Mettez à niveau le type de machine de l'instance Cloud SQL qui exécute la base de données Airflow dans votre environnement, par exemple à l'aide des commandes gcloud composer environments update. Une base de données Airflow peu performante peut ralentir le planificateur.

Éviter la planification des tâches pendant les intervalles de maintenance

Vous pouvez définir des intervalles de maintenance pour votre environnement afin que la maintenance de cet environnement ait lieu en dehors des périodes où vous exécutez vos DAG. Vous pouvez toujours exécuter vos DAG pendant les intervalles de maintenance, à condition que l'interruption et la nouvelle tentative de certaines tâches soient acceptables. Pour en savoir plus sur l'impact des intervalles de maintenance sur votre environnement, consultez Spécifier des intervalles de maintenance.

Utilisation de "wait_for_downstream" dans vos DAG

Si vous définissez le paramètre wait_for_downstream sur True dans vos DAG, pour qu'une tâche réussisse, toutes les tâches immédiatement en aval de cette tâche doivent également réussir. Cela signifie que l'exécution des tâches d'un DAG donné peut être ralentie par l'exécution des tâches du DAG précédent. Pour en savoir plus, consultez la documentation Airflow.

Les tâches mises en file d'attente depuis trop longtemps seront annulées et reprogrammées.

Si une tâche Airflow est conservée trop longtemps dans la file d'attente, le planificateur la reprogrammera pour exécution une fois que le délai défini dans l'option de configuration Airflow [scheduler]task_queued_timeout sera écoulé. La valeur par défaut est 2400. Dans les versions d'Airflow antérieures à la version 2.3.1, la tâche est également marquée comme ayant échoué et fait l'objet d'une nouvelle tentative si elle y est éligible.

Pour observer les symptômes de cette situation, vous pouvez consulter le graphique indiquant le nombre de tâches en file d'attente (onglet "Surveillance" de l'interface utilisateur Cloud Composer). Si les pics de ce graphique ne diminuent pas au bout de deux heures environ, les tâches seront très probablement reprogrammées (sans journaux), suivies d'entrées de journal "Les tâches adoptées étaient toujours en attente..." dans les journaux du planificateur. Dans ce cas, le message "Fichier journal introuvable..." peut s'afficher dans les journaux des tâches Airflow, car la tâche n'a pas été exécutée.

En général, ce comportement est attendu et la prochaine instance de la tâche planifiée est censée être exécutée selon la planification. Si vous constatez de nombreux cas de ce type dans vos environnements Cloud Composer, cela peut signifier qu'il n'y a pas assez de nœuds de calcul Airflow dans votre environnement pour traiter toutes les tâches planifiées.

Solution : Pour résoudre ce problème, vous devez vous assurer que les nœuds de calcul Airflow disposent toujours de la capacité nécessaire pour exécuter les tâches en file d'attente. Par exemple, vous pouvez augmenter le nombre de nœuds de calcul ou la simultanéité des nœuds de calcul. Vous pouvez également ajuster le parallélisme ou les pools pour éviter de mettre en file d'attente des tâches au-delà de votre capacité.

Approche de Cloud Composer concernant le paramètre min_file_process_interval

Cloud Composer modifie la façon dont [scheduler]min_file_process_interval est utilisé par le planificateur Airflow.

Airflow 1

Si Cloud Composer utilise Airflow 1, les utilisateurs peuvent définir la valeur de [scheduler]min_file_process_interval entre 0 et 600 secondes. Les valeurs supérieures à 600 secondes donnent les mêmes résultats que si [scheduler]min_file_process_interval est défini sur 600 secondes.

Airflow 2

Dans les versions de Cloud Composer antérieures à la version 1.19.9, [scheduler]min_file_process_interval est ignoré.

Versions de Cloud Composer ultérieures à la version 1.19.9 :

Le planificateur Airflow est redémarré après un certain nombre de fois où tous les DAG sont planifiés. Le paramètre [scheduler]num_runs contrôle le nombre de fois où cela est effectué par le planificateur. Lorsque le planificateur atteint [scheduler]num_runs boucles de planification, il est redémarré. Le planificateur est un composant sans état. Un tel redémarrage est un mécanisme d'autoréparation pour tout problème que le planificateur pourrait rencontrer. La valeur par défaut de [scheduler]num_runs est 5 000.

[scheduler]min_file_process_interval peut être utilisé pour configurer la fréquence d'analyse des DAG, mais ce paramètre ne peut pas être plus long que le temps nécessaire à un planificateur pour effectuer des boucles [scheduler]num_runs lors de la planification de vos DAG.

Marquer les tâches comme ayant échoué après avoir atteint dagrun_timeout

Le planificateur marque les tâches non terminées (en cours d'exécution, planifiées et en file d'attente) comme ayant échoué si une exécution de DAG ne se termine pas dans dagrun_timeout (paramètre de DAG).

Solution :

Symptômes d'une charge importante sur la base de données Airflow

Il arrive que l'entrée de journal d'avertissement suivante s'affiche dans les journaux du planificateur Airflow :

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Des symptômes similaires peuvent également être observés dans les journaux des nœuds de calcul Airflow :

Pour MySQL :

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Pour PostgreSQL :

psycopg2.OperationalError: connection to server at ... failed

Ces erreurs ou avertissements peuvent être le signe d'une surcharge de la base de données Airflow en raison du nombre de connexions ouvertes ou du nombre de requêtes exécutées en même temps, que ce soit par les planificateurs ou par d'autres composants Airflow tels que les nœuds de calcul, les déclencheurs et les serveurs Web.

Solutions possibles :

Le serveur Web affiche l'avertissement "Le planificateur ne semble pas être en cours d'exécution"

Le planificateur signale régulièrement sa pulsation à la base de données Airflow. En fonction de ces informations, le serveur Web Airflow détermine si le planificateur est actif.

Parfois, si le planificateur est fortement sollicité, il peut ne pas être en mesure de signaler son signal de vie toutes les [scheduler]scheduler_heartbeat_sec secondes.

Dans ce cas, le serveur Web Airflow peut afficher l'avertissement suivant :

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Solutions possibles :

  • Augmentez les ressources de processeur et de mémoire pour le programmateur.

  • Optimisez vos DAG afin que leur analyse et leur planification soient plus rapides et ne consomment pas trop de ressources du planificateur.

  • Évitez d'utiliser des variables globales dans les DAG Airflow. Utilisez plutôt les variables d'environnement et les variables Airflow.

  • Augmentez la valeur de l'option de configuration Airflow [scheduler]scheduler_health_check_threshold afin que le serveur Web attende plus longtemps avant de signaler l'indisponibilité du planificateur.

Solutions de contournement pour les problèmes rencontrés lors du remplissage des DAG

Il peut arriver que vous souhaitiez réexécuter des DAG qui ont déjà été exécutés. Pour ce faire, utilisez une commande de CLI Airflow comme suit :

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Pour relancer uniquement les tâches ayant échoué pour un DAG spécifique, utilisez également l'argument --rerun-failed-tasks.

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

Pour relancer uniquement les tâches ayant échoué pour un DAG spécifique, utilisez également l'argument --rerun_failed_tasks.

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • START_DATE avec une valeur pour le paramètre DAG start_date, au format YYYY-MM-DD.
  • END_DATE avec une valeur pour le paramètre DAG end_date, au format YYYY-MM-DD.
  • DAG_NAME par le nom du DAG.

Il arrive que l'opération de remplissage génère une situation de blocage dans laquelle le remplissage n'est pas possible, car une tâche est verrouillée. Exemple :

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

Dans certains cas, vous pouvez utiliser les solutions de contournement suivantes pour surmonter les blocages :

  • Désactivez le mini-planificateur en remplaçant [core]schedule_after_task_execution par False.

  • Exécutez des remplissages pour des plages de dates plus courtes. Par exemple, définissez START_DATE et END_DATE pour spécifier une période d'un jour seulement.

Étapes suivantes