Résoudre les problèmes liés au processeur DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Cette page ne traite que des problèmes liés au traitement des fichiers DAG. Pour les problèmes de planification des tâches, consultez Résoudre les problèmes liés au programmateur Airflow.

Résoudre les problèmes liés aux workflows

Inspecter les journaux du processeur DAG

Si vos DAG sont complexes, il est possible que les processeurs DAG Airflow ne les analysent pas tous. Cela peut entraîner de nombreux problèmes dont les symptômes sont les suivants.

Symptômes :

  • Si un processeur DAG rencontre des problèmes lors de l'analyse de vos DAG, cela peut entraîner une combinaison des problèmes listés. Si les DAG sont générés de manière dynamique, ces problèmes peuvent avoir un impact plus important que les DAG statiques.

    • Les DAG ne sont pas visibles dans l'UI Airflow ni dans l'UI DAG.
    • Les DAG ne sont pas planifiés pour l'exécution.
    • Des erreurs sont présentes dans les journaux du processeur DAG, par exemple :
    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    ou

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Les tâches Airflow planifiées pour exécution sont annulées, et les exécutions de DAG pour les DAG qui n'ont pas pu être analysés peuvent être marquées comme failed. Exemple :

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Solution :

  • Augmentez les paramètres liés à l'analyse des DAG :

  • Corrigez ou supprimez les DAG qui posent problème aux processeurs DAG.

Inspecter les durées d'analyse des DAG

Pour vérifier si le problème se produit au moment de l'analyse du DAG, procédez comme suit :

Console

Dans la console Google Cloud , vous pouvez utiliser la page Surveillance et l'onglet Journaux pour inspecter les durées d'analyse des DAG.

Inspectez les durées d'analyse des DAG sur la page de surveillance Cloud Composer :

  1. Dans la console Google Cloud , accédez à la page Environnements.

    [Accéder à la page "Environnements"][console-list-env]

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Surveillance s'affiche.

  3. Dans l'onglet Surveillance, sélectionnez Statistiques DAG et consultez le graphique Temps total d'analyse pour tous les fichiers DAG afin d'identifier les problèmes potentiels. Nous vous recommandons de surveiller ce graphique pendant un certain temps pour identifier les problèmes d'analyse des DAG sur plusieurs cycles d'analyse des DAG.

    Le temps d&#39;analyse total pour tous les fichiers DAG s&#39;affiche dans l&#39;onglet &quot;Surveillance&quot;.

Inspectez les durées d'analyse des DAG dans l'onglet Journaux Cloud Composer :

  1. Dans la console Google Cloud , accédez à la page Environnements.

    [Accéder à la page "Environnements"][console-list-env]

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Surveillance s'affiche.

  3. Accédez à l'onglet Journaux, puis sélectionnez la section Gestionnaire de traitement des DAG dans l'arborescence de navigation Tous les journaux.

  4. Consultez les journaux dag-processor-manager et identifiez les problèmes potentiels.

    Les journaux du processeur DAG indiquent les durées d&#39;analyse des DAG.

gcloud

Exécutez la commande dags report pour afficher le temps d'analyse de tous vos DAG.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.

Le résultat de la commande ressemble à ceci :

file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Recherchez la valeur duration pour chacun des DAG listés dans le tableau. Une valeur élevée peut indiquer que l'un de vos DAG n'est pas mis en œuvre de manière optimale. La table de sortie vous permet d'identifier les DAG dont la durée d'analyse est longue.

Résoudre les problèmes rencontrés au moment de l'analyse du DAG

Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'analyse du DAG.

Nombre limité de threads

Le fait d'autoriser les processeurs DAG à utiliser un nombre limité de threads peut avoir une incidence sur le temps d'analyse des DAG.

Pour résoudre le problème, remplacez l'option de configuration Airflow [scheduler]parsing_processes. Définissez la valeur initiale sur le nombre de processeurs virtuels utilisés par un seul processeur de DAG. Ajustez ensuite l'allocation des ressources pour les processeurs DAG afin qu'ils fonctionnent à environ 70 % de la capacité de processeur virtuel ou de mémoire.

Forcer le processeur DAG à ignorer les fichiers inutiles

Vous pouvez améliorer les performances du processeur DAG Airflow en ignorant les fichiers inutiles dans le dossier des DAG. Le processeur DAG Airflow ignore tous les fichiers et dossiers spécifiés dans le fichier .airflowignore.

Pour que le processeur DAG Airflow ignore les fichiers inutiles, procédez comme suit :

  1. Créez un fichier .airflowignore.
  2. Dans ce fichier, répertoriez les fichiers et dossiers à ignorer.
  3. Importez ce fichier dans le dossier /dags du bucket de votre environnement.

Pour en savoir plus sur le format de fichier .airflowignore, consultez la documentation Airflow.

Airflow traite les DAG mis en veille

Vous pouvez suspendre les DAG pour les empêcher de s'exécuter. Cela permet d'économiser les ressources des nœuds de calcul Airflow.

Cependant, les processeurs DAG Airflow continuent d'analyser les DAG interrompus. Si vous souhaitez améliorer les performances des processeurs DAG, utilisez .airflowignore ou supprimez les DAG mis en veille du dossier des DAG.

Problèmes courants

Les sections suivantes décrivent les symptômes et les correctifs potentiels de certains problèmes d'analyse courants.

Délai avant expiration de l'importation du chargement DAG

Symptôme :

  • Dans l'interface Web Airflow, en haut de la page de la liste des DAG, une zone d'alerte rouge indique Broken DAG: [/path/to/dagfile] Timeout.
  • Dans Cloud Monitoring : les journaux airflow-scheduler contiennent des entrées semblables aux suivantes :

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Correctif :

Remplacez l'option de configuration Airflow dag_file_processor_timeout et allouez plus de temps à l'analyse du DAG :

Section Clé Valeur
core dag_file_processor_timeout Nouvelle valeur du délai

Un DAG n'est pas visible dans l'UI Airflow ni dans l'UI DAG, et le planificateur ne le planifie pas.

Le processeur DAG analyse chaque DAG avant que celui-ci puisse être planifié par le programmeur et avant qu'un DAG ne devienne visible dans l'UI Airflow ou l'UI DAG.

Les options de configuration Airflow suivantes définissent les délais d'expiration pour l'analyse des DAG :

Si un DAG n'est pas visible dans l'UI Airflow ou l'UI DAG :

  • Consultez les journaux du processeur DAG pour vérifier s'il est en mesure de traiter correctement votre DAG. En cas de problème, les entrées de journal suivantes peuvent s'afficher dans les journaux du processeur de DAG ou du planificateur :

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
    
  • Vérifiez les journaux du planificateur pour voir s'il fonctionne correctement. En cas de problème, les entrées de journal suivantes peuvent s'afficher dans les journaux du planificateur :

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496
    

Solutions :

  • Corrigez toutes les erreurs d'analyse de DAG. Le processeur DAG analyse plusieurs DAG. Dans de rares cas, les erreurs d'analyse d'un DAG peuvent avoir un impact négatif sur l'analyse d'autres DAG.

  • Si l'analyse de votre DAG prend plus de temps que le nombre de secondes défini dans [core]dagbag_import_timeout, augmentez ce délai.

  • Si l'analyse de tous vos DAG prend plus de temps que le nombre de secondes défini dans [core]dag_file_processor_timeout, augmentez ce délai d'expiration.

  • Si votre DAG met beaucoup de temps à être analysé, cela peut également signifier qu'il n'est pas mis en œuvre de manière optimale. Par exemple, s'il lit de nombreuses variables d'environnement ou effectue des appels à des services externes ou à la base de données Airflow. Dans la mesure du possible, évitez d'effectuer de telles opérations dans les sections globales des DAG.

  • Augmentez les ressources de processeur et de mémoire pour le processeur de DAG afin qu'il puisse fonctionner plus rapidement.

  • Réduisez la fréquence d'analyse des DAG.

  • Réduisez la charge sur la base de données Airflow.

Étapes suivantes