Résoudre les problèmes liés à Airflow Scheduler

Cette page fournit des étapes de dépannage et des informations sur les problèmes courants liés au planificateur Airflow.

Identifier la source du problème

Pour commencer le dépannage, déterminez si le problème se produit au moment de l'analyse du DAG ou pendant le traitement des tâches au moment de l'exécution. Pour en savoir plus sur le temps d'analyse et la durée d'exécution du DAG, consultez la section Différence entre le temps d'analyse du DAG et la durée d'exécution du DAG.

Inspecter les durées d'analyse des DAG

Pour vérifier si le problème se produit au moment de l'analyse du DAG, procédez comme suit :

Console

  1. Dans Cloud Console, accédez à la page Environnements.

    Ouvrir la page "Environnements"

  2. Sélectionnez votre environnement dans la liste.
  3. Dans l'onglet Surveillance, consultez le graphique Temps total d'analyse pour tous les fichiers DAG de la section Exécutions DAG et identifiez les problèmes potentiels.

    La section "Exécutions DAG" de l'onglet "Surveillance" de Composer affiche les métriques d'état des DAG dans votre environnement.

gcloud

Exécutez la commande list_dags avec l'option -r pour afficher le temps d'analyse de tous vos DAG.

gcloud composer environments run ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  list_dags -- -r

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • ENVIRONMENT_LOCATION : région de votre environnement

Le résultat de la commande ressemble à ceci :

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

Recherchez la valeur Temps d'analyse DagBag. Une valeur élevée peut indiquer que l'un de vos DAG n'est pas mis en œuvre de manière optimale. La table de sortie vous permet d'identifier les DAG dont la durée d'analyse est longue.

Surveiller les tâches en cours et en file d'attente

Pour vérifier si des tâches sont bloquées dans une file d'attente, procédez comme suit :

  1. Dans Cloud Console, accédez à la page Environnements.

    Ouvrir la page "Environnements"

  2. Sélectionnez le projet dans la liste.
  3. Dans l'onglet Surveillance, consultez le graphique Tâches en cours et en file d'attente dans la section Exécutions DAG et identifiez les problèmes potentiels.

Résoudre les problèmes rencontrés au moment de l'analyse du DAG

Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'analyse du DAG.

Nombre limité de threads

Le fait d'autoriser le gestionnaire de processeurs DAG (la partie du programmeur qui traite les fichiers DAG) à utiliser un nombre limité de threads peut avoir une incidence sur le temps d'analyse des DAG. Pour résoudre le problème, appliquez les modifications suivantes au fichier de configuration airflow.cfg :

  • Pour Airflow 1.10.12 et versions antérieures, utilisez le paramètre max_threads :

    [scheduler]
    max_threads = <NUMBER_OF_CORES_IN_MACHINE - 1>
    
  • Pour Airflow 1.10.14 et versions ultérieures, utilisez le paramètre parsing_processes :

    [scheduler]
    parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>
    

Remplacez NUMBER_OF_CORES_IN_MACHINE par le nombre de cœurs des machines de nœuds de calcul.

Répartition du nombre et de la durée des tâches

Airflow est connu pour avoir des difficultés avec la planification d'un grand nombre de petites tâches. Dans de telles situations, il est préférable d'utiliser un nombre plus faible de tâches mieux consolidées.

La planification simultanée d'un grand nombre de DAG ou de tâches peut également être source de problèmes. Pour éviter ce problème, répartissez vos tâches de manière plus uniforme au fil du temps.

Résoudre les problèmes liés aux tâches en cours d'exécution et en file d'attente

Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'exécution ou de la mise en file d'attente des tâches.

Les files d'attente de tâches sont trop longues

Dans certains cas, une file d'attente de tâches peut être trop longue pour le programmeur. Pour en savoir plus sur l'optimisation des paramètres de nœud de calcul et de Celery, consultez la page Faire évoluer votre environnement Cloud Composer avec votre entreprise.

Ressources de cluster limitées

Vous risquez de rencontrer des problèmes de performances si le cluster GKE de votre environnement est trop petit pour gérer tous vos DAG et tâches. Dans ce cas, essayez l'une des solutions suivantes :

  • Créez un environnement avec un type de machine offrant plus de performances et migrez vos DAG vers cet environnement.
  • Créez d'autres environnements Cloud Composer et répartissez les DAG sur ces environnements.
  • Modifiez le type de machine pour les nœuds GKE, comme décrit dans la section Mettre à niveau le type de machine pour les nœuds GKE. Cette procédure étant sujette aux erreurs, il s'agit de l'option la moins recommandée.
  • Mettez à niveau le type de machine de l'instance Cloud SQL qui exécute la base de données Airflow dans votre environnement, par exemple à l'aide des commandes gcloud composer environments update. Une base de données Airflow peu performante peut ralentir le programmeur.

Éviter la planification des tâches pendant les intervalles de maintenance

Vous pouvez définir des intervalles de maintenance spécifiques pour votre environnement. Au cours de ces périodes, des événements de maintenance pour Cloud SQL et GKE ont lieu.

Évitez de programmer des exécutions DAG pendant les intervalles de maintenance, car cela peut entraîner des problèmes de planification ou d'exécution.

Forcer le programmeur Airflow à ignorer les fichiers inutiles

Vous pouvez améliorer les performances du programmeur Airflow en ignorant les fichiers inutiles dans le dossier des DAG. Le programmeur Airflow ignore tous les fichiers et dossiers spécifiés dans le fichier .airflowignore.

Pour que le programmeur Airflow ignore les fichiers inutiles, procédez comme suit :

  1. Créez un fichier .airflowignore.
  2. Dans ce fichier, répertoriez les fichiers et dossiers à ignorer.
  3. Importez ce fichier dans le dossier /dags du bucket de votre environnement.

Pour en savoir plus sur le format de fichier .airflowignore, consultez la documentation Airflow.

Le programmeur Airflow traite des DAG interrompus

Les utilisateurs Airflow peuvent interrompre un DAG pour en éviter l'exécution. Cela permet d'économiser les cycles de traitement des nœuds de calcul Airflow.

Cependant, le programmeur Airflow continue d'analyser les DAG interrompus. Si vous souhaitez vraiment améliorer les performances du programmeur Airflow, utilisez .airflowignore ou supprimez les DAG interrompus dans le dossier des DAG.

Utilisation de "wait_for_downstream" dans vos DAG

Si vous définissez le paramètre wait_for_downstream sur True dans vos DAG, pour qu'une tâche réussisse, toutes les tâches immédiatement en aval de cette tâche doivent également réussir. Cela signifie que l'exécution des tâches d'un DAG donné peut être ralentie par l'exécution des tâches du DAG précédent. Pour en savoir plus, consultez la documentation Airflow.

Scaling de la configuration Airflow

Airflow fournit des options de configuration Airflow qui contrôlent le nombre de tâches et de DAG Airflow pouvant être exécutés simultanément. Pour définir ces options de configuration, remplacez leurs valeurs pour votre environnement.

  • Simultanéité des nœuds de calcul

    Le paramètre [core]worker_concurrency contrôle le nombre maximal de tâches qu'un nœud de calcul Airflow peut exécuter en même temps. Si vous multipliez la valeur de ce paramètre par le nombre de nœuds de calcul Airflow dans votre environnement Cloud Composer, vous obtenez le nombre maximal de tâches pouvant être exécutées simultanément dans votre environnement. Ce nombre est limité par l'option de configuration Airflow [core]parallelism, décrite en détail plus bas.

  • Nombre maximal d'exécutions de DAG actives

    L'option de configuration Airflow [core]max_active_runs_per_dag contrôle le nombre maximal d'exécutions de DAG actives par DAG. Si cette limite est atteinte, le programmeur ne crée pas d'autres exécutions de DAG.

    Une définition incorrecte de ce paramètre peut créer un problème où le programmeur limite l'exécution du DAG, car il ne peut pas créer d'autres instances d'exécution de DAG simultanées.

  • Simultanéité des DAG

    L'option de configuration Airflow [core]dag_concurrency contrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG. Il s'agit d'un paramètre au niveau du DAG.

    Si ce paramètre est défini de manière incorrecte, vous risquez de rencontrer un problème dans lequel l'exécution d'une instance de DAG est ralentie car le nombre de tâches de DAG pouvant être exécutées simultanément est limité.

  • Parallélisme et taille du pool

    L'option de configuration Airflow [core]parallelism contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.

    Il s'agit d'un paramètre global pour l'ensemble de la configuration Airflow.

    Les tâches sont mises en file d'attente et exécutées dans un pool. Les environnements Cloud Composer n'utilisent qu'un seul pool. La taille de ce pool contrôle le nombre de tâches pouvant être mises en file d'attente simultanément par le programmeur. Si la taille du pool est trop faible, le programmeur ne pourra plus mettre de tâches en file d'attente avant même que les seuils (définis par l'option de configuration [core]parallelism et par l'option de configuration [core]worker_concurrency multipliée par le nombre de nœuds de calcul Airflow) ne soient dépassés.

    Vous pouvez configurer la taille du pool dans l'interface utilisateur Airflow (Menu > Admin > Pools). Ajustez la taille du pool en fonction du niveau de parallélisme attendu dans votre environnement.