Dépanner les DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page fournit les étapes de dépannage et des informations sur les workflows courants les problèmes de sécurité.

De nombreux problèmes d'exécution des DAG sont causés par des performances d'environnement non optimales. Vous pouvez optimiser votre environnement Cloud Composer 2 en suivant les instructions du guide Optimize sur les performances et les coûts de l'environnement.

Certains problèmes d'exécution de DAG peuvent être dus au fait que le planificateur Airflow ne fonctionne pas correctement ou de manière optimale. Veuillez suivre Instructions pour résoudre les problèmes liés au programmeur pour résoudre ces problèmes.

Résoudre un problème lié aux workflows

Pour commencer à résoudre les problèmes, procédez comme suit :

  1. Vérifiez les journaux Airflow.

    Vous pouvez augmenter le niveau de journalisation d'Airflow en remplaçant l'option de configuration Airflow suivante.

    Airflow 2

    Section Clé Valeur
    logging logging_level La valeur par défaut est INFO. Définissez la valeur sur DEBUG pour obtenir plus de verbosité dans les messages de journal.

    Airflow 1

    Section Clé Valeur
    core logging_level La valeur par défaut est INFO. Définissez la valeur sur DEBUG pour obtenir plus de verbosité dans les messages de journal.
  2. Consultez le tableau de bord Monitoring.

  3. Consultez Cloud Monitoring.

  4. Dans la console Google Cloud, recherchez les erreurs sur les pages de les composants de votre environnement ;

  5. Dans l'interface Web Airflow, recherchez les instances de tâche ayant échoué dans la vue graphique du DAG.

    Section Clé Valeur
    webserver dag_orientation LR, TB, RL ou BT

Déboguer des échecs de l'opérateur

Pour déboguer un échec de l'opérateur, procédez comme suit :

  1. Recherchez les erreurs spécifiques à la tâche.
  2. Vérifiez les journaux Airflow.
  3. Consultez Cloud Monitoring.
  4. Vérifiez les journaux spécifiques à l'opérateur.
  5. Corrigez les erreurs.
  6. Importez le DAG dans le dossier dags/.
  7. Dans l'interface Web Airflow, effacez les états antérieurs du DAG.
  8. Relancez ou exécutez le DAG.

Résoudre les problèmes d'exécution des tâches

Airflow est un système distribué avec de nombreuses entités telles que le planificateur, l'exécuteur et les nœuds de calcul qui communiquent entre eux via une file d'attente de tâches et la base de données Airflow, et envoient des signaux (comme SIGTERM). Le diagramme suivant présente une vue d'ensemble des interconnexions entre les composants Airflow.

Interaction entre les composants Airflow
Figure 1. Interaction entre les composants Airflow (cliquez pour agrandir)

Dans un système distribué comme Airflow, il peut y avoir des problèmes de connectivité réseau ou l'infrastructure sous-jacente peut rencontrer des problèmes intermittents. Cela peut entraîner des situations où des tâches peuvent échouer et être reprogrammées pour l'exécution, ou où des tâches peuvent ne pas être correctement exécutées (par exemple, des tâches zombies ou des tâches bloquées lors de l'exécution). Airflow dispose de mécanismes dans de telles situations et reprendre automatiquement le fonctionnement normal. Les sections suivantes expliquent les problèmes courants qui se produisent lors de l'exécution des tâches par Airflow : tâches zombies, pilules empoisonnées et signaux SIGTERM.

Résoudre les problèmes liés aux tâches zombie

Airflow détecte deux types de non-correspondance entre une tâche et un processus qui l'exécute :

  • Les tâches zombies sont des tâches censées s'exécuter, mais qui ne sont pas en cours d'exécution. Cela peut se produire si le processus de la tâche a été arrêté ou n'est pas répondant si le nœud de calcul Airflow n'a pas signalé l'état d'une tâche à temps ; parce qu'elle est surchargée, ou si la VM où la tâche est exécutée a été arrêtée. Airflow détecte ces tâches régulièrement et échoue ou relance la tâche. en fonction des paramètres de la tâche.

    Découvrez des missions zombies

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Les tâches de morts-vivants ne sont pas censées être en cours d'exécution. Airflow trouve ces tâches périodiquement et les y met fin.

Vous trouverez ci-dessous les raisons et solutions les plus courantes pour les tâches zombies.

Le nœud de calcul Airflow est à court de mémoire

Chaque nœud de calcul Airflow peut exécuter jusqu'à [celery]worker_concurrency instances de tâche simultanément. Si la consommation cumulative de mémoire de ces instances de tâche dépasse la limite de mémoire d'un nœud de calcul Airflow, un processus aléatoire sur celui-ci sera arrêté pour libérer des ressources.

Découvrir les événements de saturation de la mémoire des nœuds de calcul Airflow

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

Solutions :

Le nœud de calcul Airflow a été évincé

Les évictions de pods font partie intégrante de l'exécution des charges de travail sur Kubernetes. GKE évince les pods en cas d'espace de stockage insuffisant ou les ressources pour les charges de travail ayant une priorité plus élevée.

Découvrir les évictions de nœuds de calcul Airflow

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Solutions :

Le nœud de calcul Airflow a été arrêté

Les nœuds de calcul Airflow peuvent être supprimés en externe. Si les tâches en cours d'exécution ne se terminent pas pendant une période d'arrêt progressif, elles seront interrompues et pourraient être détectées comme des zombies.

Découvrir les arrêts de pods de nœuds de calcul Airflow

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Scénarios et solutions possibles:

  • Les nœuds de calcul Airflow sont redémarrés lors des modifications de l'environnement, telles que les mises à niveau ou l'installation de packages :

    Découvrir les modifications de l'environnement Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Vous pouvez effectuer ces opérations lorsqu'aucune tâche critique n'est en cours d'exécution ou activer les nouvelles tentatives de tâches.

  • Divers composants peuvent être temporairement indisponibles pendant les opérations de maintenance :

    Découvrir les opérations de maintenance de GKE

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    Vous pouvez spécifier des intervalles de maintenance pour réduire chevauche l’exécution des tâches critiques.

  • Dans Cloud Composer 2 des versions antérieures à la version 2.4.5, un nœud Airflow de fin Le nœud de calcul peut ignorer le signal SIGTERM et continuer à exécuter les tâches:

    Découvrir le scaling à la baisse par l'autoscaling de Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-worker-set")
    textPayload:"Workers deleted"

    Vous pouvez effectuer une mise à niveau vers une version ultérieure de Cloud Composer, où cette est résolu.

Le nœud de calcul Airflow était soumis à une charge importante

La quantité de ressources de processeur et de mémoire disponibles pour un nœud de calcul Airflow est limitée par la configuration de l'environnement. Si une utilisation se rapproche des limites, cela entraînerait un conflit de ressources et des retards inutiles pendant la tâche l'exécution. Dans les situations extrêmes, lorsque les ressources sont insuffisantes pendant de longues périodes cela pourrait causer des tâches zombies.

Solutions :

La base de données Airflow était soumise à une charge importante

Une base de données est utilisée par divers composants Airflow pour communiquer entre eux et, en particulier pour stocker les pulsations des instances de tâches. Pénurie de ressources sur le peut entraîner des temps de requête plus longs et peut affecter l'exécution d'une tâche.

Solutions :

La base de données Airflow était temporairement indisponible

Un nœud de calcul Airflow peut mettre du temps à détecter et à gérer correctement les erreurs intermittentes, telles que les problèmes de connectivité temporaires. Elle peut dépasser la valeur par défaut de détection des zombies.

Découvrir les délais avant expiration des pulsations Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Solutions :

  • Augmentez le délai avant expiration des tâches zombies et remplacez la valeur de l'option de configuration Airflow [scheduler]scheduler_zombie_task_threshold :

    Section Clé Valeur Remarques
    scheduler scheduler_zombie_task_threshold Nouveau délai avant expiration (en secondes) La valeur par défaut est 300.

Résoudre les problèmes de pilule empoisonnée

Poison Pill est un mécanisme utilisé par Airflow pour arrêter les tâches Airflow.

Airflow utilise la pilule de poison dans les situations suivantes :

  • Lorsqu'un planificateur met fin à une tâche qui ne s'est pas terminée à temps.
  • Lorsqu'une tâche expire ou est exécutée pendant trop longtemps.

Lorsqu'Airflow utilise une pilule empoisonnée, les entrées de journal suivantes s'affichent dans les journaux d'un nœud de calcul Airflow qui a exécuté la tâche:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Solutions possibles:

  • Vérifiez le code de la tâche pour détecter les erreurs susceptibles de l'exécuter trop longtemps.
  • (Cloud Composer 2) Augmenter le processeur et la mémoire pour Airflow pour accélérer l'exécution des tâches.
  • Augmentez la valeur de l'option de configuration Airflow [celery_broker_transport_options]visibility-timeout.

    Par conséquent, le planificateur attend plus longtemps qu'une tâche soit terminée, avant de la considérer comme une tâche zombie. Cette option est particulièrement utile pour les tâches longues qui durent plusieurs heures. Si la valeur est trop faible (par exemple, trois heures), le planificateur considère les tâches exécutées pendant cinq ou six heures comme "bloquées" (tâches zombies).

  • Augmenter la valeur d'Airflow [core]killed_task_cleanup_time .

    Une valeur plus élevée permet aux nœuds de calcul Airflow de terminer leurs tâches de manière élégante. Si la valeur est trop faible, les tâches Airflow peuvent être interrompues brusquement, sans avoir le temps de terminer leur travail correctement.

Résoudre les problèmes liés aux signaux SIGTERM

Les signaux SIGTERM sont utilisés par Linux, Kubernetes, le planificateur Airflow et Celery pour arrêter les processus chargés d'exécuter des nœuds de calcul Airflow ou des tâches Airflow.

Plusieurs raisons peuvent expliquer l'envoi de signaux SIGTERM dans un environnement :

  • Une tâche est devenue zombie et doit être arrêtée.

  • L’ordonnanceur a découvert un doublon d’une tâche et envoie une pilule poison et SIGTERM indique à la tâche de l'arrêter.

  • Dans Autoscaling horizontal des pods, le cluster GKE Le plan de contrôle envoie des signaux SIGTERM pour supprimer les pods qui ne sont plus nécessaires.

  • Le programmeur peut envoyer des signaux SIGTERM au processus DagFileProcessorManager. Ces signaux SIGTERM sont utilisés par le planificateur pour gérer le cycle de vie du processus DagFileProcessorManager et peuvent être ignorés en toute sécurité.

    Exemple :

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Condition de course entre le rappel de battement de cœur et les rappels de sortie dans local_task_job, qui surveille l'exécution de la tâche. Si les battements du cœur détecte qu'une tâche a été marquée comme réussie, il ne peut pas distinguer si La tâche a réussi ou qu'Airflow a été invité à prendre en compte la tâche réussi. Toutefois, il mettra fin à un exécuteur de tâches, sans attendre qu'il se ferme.

    Ces signaux SIGTERM peuvent être ignorés sans risque. La tâche se trouve déjà dans le et que l'exécution de l'exécution du DAG dans son ensemble ne sera pas concernés.

    L'entrée de journal Received SIGTERM. est la seule différence entre l'entrée standard et la fin de la tâche à l’état de réussite.

    Condition de concurrence entre les rappels de battement de cœur et de sortie
    Figure 2. Condition de course entre les rappels de battement de cœur et de sortie (cliquez pour agrandir)
  • Un composant Airflow utilise plus de ressources (processeur, mémoire) que le nœud du cluster ne le permet.

  • Le service GKE effectue des opérations de maintenance et envoie des signaux SIGTERM aux pods exécutés sur un nœud sur le point d'être mis à niveau. Lorsqu'une instance de tâche est arrêtée avec SIGTERM, vous pouvez voir le journal suivant Entrées des journaux d'un nœud de calcul Airflow qui a exécuté la tâche:

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

Solutions possibles:

Ce problème se produit lorsqu'une VM qui exécute la tâche manque de mémoire. Ceci n'est pas aux configurations Airflow, mais aussi à la quantité de mémoire disponible VM.

L'augmentation de la mémoire dépend de la version de Cloud Composer que vous utilisez. Exemple :

  • Dans Cloud Composer 2, vous pouvez attribuer plus de ressources de processeur et de mémoire aux nœuds de calcul Airflow.

  • Dans le cas de Cloud Composer 1, vous pouvez recréer votre environnement à l'aide d'un type de machine plus performant.

  • Dans les deux versions de Cloud Composer, vous pouvez réduire la valeur de l'option de configuration Airflow de la concurrence [celery]worker_concurrency. Cette option détermine le nombre de tâches exécutées simultanément par un nœud de calcul Airflow donné.

Pour en savoir plus sur l'optimisation de votre environnement Cloud Composer 2, consultez Optimiser les performances et les coûts de l'environnement

Requêtes Cloud Logging pour identifier les motifs de redémarrage ou d'éviction des pods

Les environnements Cloud Composer utilisent des clusters GKE en tant qu'infrastructure de calcul couche de données. Dans cette section, vous trouverez des requêtes utiles qui vous aideront trouver les motifs de redémarrages ou d'évictions des nœuds de calcul et du programmeur Airflow.

Les requêtes présentées ci-dessous peuvent être affinées de la manière suivante :

  • Vous pouvez spécifier la période qui vous intéresse dans Cloud Logging (par exemple, les six dernières heures, les trois derniers jours ou une période personnalisée).

  • Vous devez spécifier le CLUSTER_NAME de Cloud Composer.

  • Vous pouvez également limiter la recherche à un pod spécifique en ajoutant POD_NAME.

Découvrir les conteneurs redémarrés

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Autre requête pour limiter les résultats à un pod spécifique :

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Détecter l'arrêt des conteneurs en raison d'un événement de saturation de la mémoire

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Autre requête permettant de limiter les résultats à un pod spécifique:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Identifier les conteneurs qui ne s'exécutent plus

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Autre requête permettant de limiter les résultats à un pod spécifique:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Impact des opérations de mise à jour ou de mise à niveau sur les exécutions de tâches Airflow

Les opérations de mise à jour ou de mise à niveau interrompent les tâches Airflow en cours d'exécution, sauf si une tâche est exécutée en mode différable.

Nous vous recommandons d'effectuer ces opérations lorsque vous prévoyez un impact minimal sur les exécutions de tâches Airflow et de configurer des mécanismes de nouvelle tentative appropriés dans vos DAG et tâches.

Résoudre les problèmes liés aux tâches KubernetesExecutor

CeleryKubernetesExecutor est un type d'exécuteur dans Cloud Composer 3 qui peut utiliser CeleryExecutor et KubernetesExecutor en même temps.

Pour en savoir plus sur le dépannage des tâches exécutées avec KubernetesExecutor, consultez la page Utiliser CeleryKubernetesExecutor.

Problèmes courants

Les sections suivantes décrivent les symptômes et les correctifs potentiels de certains problèmes courants liés aux DAG.

La tâche Airflow a été interrompue par Negsignal.SIGKILL

Il peut arriver que votre tâche utilise plus de mémoire que ce qu'alloue le nœud de calcul Airflow. Dans ce cas, il peut être interrompu par Negsignal.SIGKILL. Le système envoie ce signal pour éviter une consommation de mémoire supplémentaire qui pourrait affecter l'exécution d'autres tâches Airflow. Dans le journal du nœud de calcul Airflow, vous pouvez voir l'entrée de journal suivante:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL peut également s'afficher sous la forme de code -9.

Solutions possibles :

  • Réduire le worker_concurrency des nœuds de calcul Airflow.

  • Dans le cas de Cloud Composer 2, augmentez la mémoire des nœuds de calcul Airflow.

  • Dans le cas de Cloud Composer 1, passez à un type de machine plus volumineux utilisé dans le cluster Cloud Composer.

  • Optimisez vos tâches pour utiliser moins de mémoire.

  • Gérez les tâches gourmandes en ressources dans Cloud Composer à l'aide de KubernetesPodOperator ou GKEStartPodOperator pour l'isolation des tâches et l'allocation personnalisée des ressources.

La tâche échoue sans émettre de journaux en raison d'erreurs d'analyse DAG

Parfois, il peut y avoir des erreurs de DAG subtiles qui conduisent à une situation où un programmeur Airflow et un processeur DAG peuvent planifier l'exécution de tâches et pour analyser un fichier DAG (respectivement), mais le nœud de calcul Airflow n'exécute pas les tâches. à partir d'un DAG, car il y a des erreurs de programmation dans le fichier DAG Python. Cela peut entraîner une situation où une tâche Airflow est marquée comme Failed et qu'aucun journal de son exécution n'est disponible.

Solutions :

  • Dans les journaux des nœuds de calcul Airflow, vérifiez qu'aucune erreur n'est générée par le nœud de calcul Airflow en raison d'un DAG manquant ou d'erreurs d'analyse des DAG.

  • Augmentez le nombre de paramètres liés à l'analyse du DAG:

    • Augmentez dagbag-import-timeout à au moins 120 secondes (ou plus, si nécessaire).

    • Augmenter dag-file-processor-timeout d'au moins 180 secondes (ou plus, si nécessaire). Cette valeur doit être supérieure à dagbag-import-timeout.

  • Voir aussi Inspectez les journaux du processeur DAG.

La tâche échoue sans émettre de journaux en raison de la pression des ressources

Symptôme : lors de l'exécution d'une tâche, le sous-processus du nœud de calcul Airflow responsable de l'exécution de la tâche Airflow est interrompu brusquement. L'erreur visible dans le journal du nœud de calcul Airflow peut ressembler à l'exemple ci-dessous:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solution :

La tâche échoue sans émettre de journaux en raison de l'éviction du pod

Les pods Google Kubernetes Engine sont soumis au Cycle de vie des pods Kubernetes et éviction des pods Les pics de tâches et la coplanification des nœuds de calcul sont deux des causes les plus courantes d'éviction de pods dans Cloud Composer.

L'éviction des pods peut se produire lorsqu'un pod particulier utilise trop de ressources sur un nœud, par rapport aux attentes de consommation de ressources configurées pour le nœud. Par exemple, l'éviction peut se produire lorsque plusieurs tâches gourmandes en mémoire sont exécutées dans un pod et que leur charge combinée entraîne le dépassement de la limite de consommation de mémoire pour le pod.

Si un pod de nœud de calcul Airflow est évincé, toutes les instances de tâche qui y sont exécutées sont interrompues, puis marquées comme ayant échoué par Airflow.

Les journaux sont mis en mémoire tampon. Si un pod de nœuds de calcul est évincé avant la purge du tampon, les journaux ne sont pas envoyés. L'échec de la tâche sans journaux indique que les nœuds de calcul Airflow sont redémarrés en raison d'une mémoire saturée (OOM, Out Of Memory). Certains journaux peuvent être présents dans Cloud Logging, même si les journaux Airflow n'ont pas été envoyés.

Pour afficher les journaux :

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Journaux.

  4. Consultez les journaux des nœuds de calcul individuels sous Tous les journaux -> Journaux Airflow -> Nœuds de calcul -> (nœud de calcul individuel).

L'exécution du DAG est limitée par la mémoire. L'exécution de chaque tâche commence par deux processus Airflow : l'exécution et la surveillance de la tâche. Chaque nœud peut accepter jusqu'à six tâches simultanées (environ 12 processus chargés avec des modules Airflow). Vous pouvez utiliser plus de mémoire, en fonction de la nature du DAG.

Symptôme :

  1. Dans la console Google Cloud, accédez à la page Charges de travail.

    Accéder à la page Charges de travail

  2. Si des pods airflow-worker affichent Evicted, cliquez sur chaque pod évincé et recherchez le message The node was low on resource: memory en haut de la fenêtre.

Correctif :

  • Dans Cloud Composer 1, créez un environnement Cloud Composer avec un type de machine plus volumineux que le type de machine actuel.
  • Dans Cloud Composer 2, augmentez les limites de mémoire pour les nœuds de calcul Airflow.
  • Consultez les journaux des pods airflow-worker pour identifier d'éventuelles causes d'éviction. Pour plus sur la récupération des journaux à partir de pods individuels, consultez Résoudre les problèmes liés aux charges de travail déployées
  • Assurez-vous que les tâches du DAG sont idempotentes et récupérables.
  • Évitez de télécharger des fichiers inutiles dans le système de fichiers local des nœuds de calcul Airflow.

    La capacité du système de fichiers local des nœuds de calcul Airflow est limitée. Par exemple, dans Cloud Composer 2, un nœud de calcul peut disposer de 1 Go à 10 Go d'espace de stockage. Lorsque l'espace de stockage est épuisé, le pod de nœud de calcul Airflow est expulsé par le plan de contrôle GKE. Toutes les tâches exécutées par le worker éjecté échouent.

    Exemples d'opérations problématiques:

    • télécharger des fichiers ou des objets et les stocker localement dans une instance Airflow ; ou un nœud de calcul. Stockez plutôt ces objets directement dans un service approprié tel qu'un bucket Cloud Storage.
    • Accéder à de grands objets dans le dossier /data à partir d'un nœud de calcul Airflow. Le nœud de calcul Airflow télécharge l'objet dans son système de fichiers local. À la place, implémentez vos DAG de sorte que les fichiers volumineux soient traités en dehors du pod de nœuds de calcul Airflow.

Délai avant expiration de l'importation du chargement DAG

Symptôme :

  • Dans l'interface Web Airflow, en haut de la page de la liste des DAG, une zone d'alerte rouge indique Broken DAG: [/path/to/dagfile] Timeout.
  • Dans Cloud Monitoring: les journaux airflow-scheduler contiennent des entrées semblable à:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Correctif :

Remplacer Airflow dag_file_processor_timeout et laissez plus de temps à l'analyse du DAG:

Section Clé Valeur
core dag_file_processor_timeout Nouvelle valeur du délai

L'exécution du DAG ne se termine pas dans le délai prévu

Symptôme :

Parfois, l'exécution d'un DAG ne s'arrête pas, car des tâches Airflow sont bloquées et le DAG s'exécute dure plus longtemps que prévu. Dans des conditions normales, les tâches Airflow ne restent pas indéfiniment en file d'attente ou en cours d'exécution, car Airflow dispose d'un délai avant expiration et de nettoyage qui aident à éviter cette situation.

Correctif :

  • Utilisez les dagrun_timeout pour les DAG. Exemple : dagrun_timeout=timedelta(minutes=120). Par conséquent, chaque exécution de DAG doit être terminée avant le délai avant expiration de l'exécution du DAG, et les tâches inachevées sont marquées comme Failed ou Upstream Failed. Pour en savoir plus sur les états des tâches Airflow, consultez la page Documentation Apache Airflow

  • Utilisez le paramètre timeout d'exécution de la tâche pour définir un délai par défaut pour les tâches exécutées en fonction des opérateurs Apache Airflow.

Exécutions DAG non exécutées

Symptôme :

Quand la date de planification d'un DAG est définie de manière dynamique, cela peut entraîner diverses des effets secondaires inattendus. Exemple :

  • Une exécution du DAG se situe toujours dans le futur, et le DAG n'est jamais exécuté.

  • Les exécutions de DAG précédentes sont marquées comme exécutées et réussies alors qu'elles n'ont pas été exécutées exécuté.

Pour en savoir plus, consultez la documentation Apache Airflow.

Correctif :

  • Suivez les recommandations de la documentation Apache Airflow.

  • Définissez un start_date statique pour les DAG. Vous pouvez également utiliser catchup=False pour désactiver l'exécution du DAG pour les dates passées.

  • Évitez d'utiliser datetime.now() ou days_ago(<number of days>), sauf si conscient des effets secondaires de cette approche.

Augmentation du trafic réseau vers et depuis la base de données Airflow

La quantité de trafic réseau entre le cluster GKE de votre environnement et la base de données Airflow dépend du nombre de DAG, du nombre de tâches dans les DAG et de la manière dont les DAG accèdent aux données de la base de données Airflow. Les facteurs suivants peuvent influencer l'utilisation du réseau :

  • Requêtes envoyées à la base de données Airflow. Si vos DAG effectuent de nombreuses requêtes, ils génèrent une grande quantité de trafic. Exemples : vérifier l'état des tâches avant de poursuivre avec d'autres tâches, interroger la table XCom, créer un vidage du contenu de la base de données Airflow.

  • Grand nombre de tâches. Plus le nombre de tâches à planifier est élevé, plus du trafic réseau est généré. Ces considérations s'appliquent au nombre total de tâches dans vos DAG et à la fréquence de planification. Lorsque le programmeur Airflow planifie l'exécution du DAG, il envoie des requêtes à la base de données Airflow et génère du trafic.

  • L'interface Web Airflow génère du trafic réseau, car elle envoie des requêtes à la base de données Airflow. L'utilisation intensive de pages avec des graphiques, des tâches et des schémas peut générer de gros volumes de trafic réseau.

Le DAG entraîne le plantage du serveur Web Airflow ou lui fait renvoyer une erreur 502 gateway timeout

Des défaillances du serveur Web peuvent survenir pour plusieurs raisons. Consultez les journaux airflow-webserver dans Cloud Logging pour déterminer la cause de l'erreur 502 gateway timeout.

Calculs lourds

Cette section ne s'applique qu'à Cloud Composer 1.

Évitez d'exécuter des calculs lourds au moment de l'analyse du DAG.

Contrairement aux nœuds de calcul et de planificateur, dont les types de machine peuvent être personnalisés pour augmenter la capacité du processeur et de la mémoire, le serveur Web utilise un type de machine fixe, ce qui peut entraîner des échecs d'analyse du DAG si les calculs réalisés au moment de l'analyse sont trop lourds.

Veuillez prendre en compte que le serveur Web dispose de deux processeurs virtuels et de 2 Go de mémoire. La valeur par défaut pour core-dagbag_import_timeout est de 30 secondes. La valeur du délai avant expiration définit la limite supérieure de la durée pendant laquelle Airflow charge un module Python dans le dossier dags/.

Autorisations incorrectes

Cette section ne s'applique qu'à Cloud Composer 1.

Le serveur Web ne s'exécute pas sous le même compte de service que les nœuds de calcul et le planificateur. En tant que tels, les nœuds de calcul et le planificateur peuvent être en mesure d'accéder à des ressources gérées par l'utilisateur auxquelles le serveur Web n'a pas accès.

Nous vous recommandons d'éviter l'accès à des ressources non publiques lors de l'analyse du DAG. Parfois, c'est inévitable et vous devrez accorder des autorisations au compte de service du serveur Web. Le nom du compte de service est dérivé du domaine de serveur Web. Par exemple, si le domaine est example-tp.appspot.com, le compte de service example-tp@appspot.gserviceaccount.com

Erreurs du DAG

Cette section ne s'applique qu'à Cloud Composer 1.

Le serveur Web s'exécute sur App Engine et est distinct du cluster GKE de votre environnement. Le serveur Web analyse les fichiers de définition du DAG, et une erreur 502 gateway timeout peut se produire en cas d'erreurs dans le DAG. Airflow fonctionne normalement sans serveur Web fonctionnel si le DAG problématique n'interrompt aucun processus en cours d'exécution dans GKE. Dans ce cas, vous pouvez utiliser gcloud composer environments run pour récupérer des détails de votre environnement ou comme solution de contournement en cas d'indisponibilité du serveur Web.

Dans d'autres cas, vous pouvez exécuter l'analyse du DAG dans GKE et rechercher les DAG générant des exceptions fatales Python ou ce délai d'expiration (30 secondes par défaut). Pour résoudre ce problème, connectez-vous à une interface système distante dans un conteneur de nœud de calcul Airflow et testez les erreurs de syntaxe. Pour en savoir plus, reportez-vous à la section Tester les DAG.

Gérer un grand nombre de DAG et de plug-ins dans des dossiers DAG et plug-ins

Le contenu des dossiers /dags et /plugins est synchronisé à partir de le bucket de votre environnement aux systèmes de fichiers locaux des nœuds de calcul Airflow et les planificateurs.

Plus il y a de données stockées dans ces dossiers, plus il faut de temps pour effectuer la synchronisation. Pour résoudre ce problème :

  • Limitez le nombre de fichiers dans les dossiers /dags et /plugins. Ne stockez que le nombre minimal de fichiers requis.

  • Si possible, augmentez l'espace disque disponible pour les programmeurs Airflow et les nœuds de calcul.

  • Si possible, augmentez la quantité de processeur et de mémoire des programmeurs et des nœuds de calcul Airflow afin que l'opération de synchronisation soit effectuée plus rapidement.

  • En cas de très grand nombre de DAG, divisez-les en lots, compressez-les dans des archives ZIP et déployez ces archives dans le dossier /dags. Cette approche accélère le processus de synchronisation des DAG. Les composants Airflow décompressent les archives ZIP avant de traiter les DAG.

  • La génération de DAG dans un programme peut aussi être une méthode pour limiter le nombre de fichiers DAG stockés dans le dossier /dags. Consultez la section sur les DAG programmatiques afin d'éviter les problèmes liés à la planification et à l'exécution des DAG générés de façon automatisée.

Ne planifiez pas les DAG générés de manière programmatique en même temps.

La génération automatisée d'objets DAG à partir d'un fichier DAG est une méthode efficace pour créer de nombreux DAG similaires qui ne présentent que de légères différences.

Il est important de ne pas planifier l'exécution de tous ces DAG immédiatement. Il est fort probable que les nœuds de calcul Airflow ne disposent pas de suffisamment de ressources de processeur et de mémoire pour exécuter toutes les tâches planifiées en même temps.

Pour éviter les problèmes de planification des DAG programmatiques:

  • Augmentez la simultanéité des nœuds de calcul et faites évoluer votre environnement afin qu'il puisse exécuter plus de tâches simultanément.
  • Générez des DAG de manière à répartir leurs planifications de manière uniforme au fil du temps, afin d'éviter de planifier des centaines de tâches en même temps, ce qui permet aux nœuds de calcul Airflow d'avoir le temps d'exécuter toutes les tâches planifiées.

Erreur 504 lors de l'accès au serveur Web Airflow

Consultez la section Erreur 504 lors de l'accès à l'interface utilisateur d'Airflow.

L'exception Lost connection to Postgres server during query est générée pendant ou juste après l'exécution de la tâche.

Lost connection to Postgres server during query exception se produisent souvent lorsque les conditions suivantes sont remplies:

  • Votre DAG utilise PythonOperator ou un opérateur personnalisé.
  • Votre DAG envoie des requêtes à la base de données Airflow.

Si plusieurs requêtes sont effectuées à partir d'une fonction appelable, les traces peuvent pointer vers la ligne self.refresh_from_db(lock_for_update=True) dans le code Airflow de manière incorrecte car il s'agit de la première requête de base de données après l'exécution de la tâche. La cause réelle de l'exception se produit avant, lorsqu'une session SQLAlchemy n'est pas correctement fermée.

Les sessions SQLAlchemy s'appliquent à un thread et sont créées dans une session de fonction appelable qui peut ensuite être prolongée dans le code Airflow. S'il existe des entre les requêtes au cours d'une même session, il est possible que la connexion soit déjà fermé par le serveur Postgres. Le délai avant expiration de la connexion dans les environnements Cloud Composer est d'environ 10 minutes.

Correctif :

  • Utilisez le décorateur airflow.utils.db.provide_session. Ce décorateur fournit une session valide à la base de données Airflow dans le paramètre session et ferme correctement la session à la fin de la fonction.
  • N'utilisez pas une seule fonction de longue durée. Déplacez plutôt toutes les requêtes de base de données vers des fonctions distinctes, afin qu'il existe plusieurs fonctions avec le décorateur airflow.utils.db.provide_session. Dans ce cas, les sessions sont automatiquement fermées après la récupération des résultats de la requête.

Contrôler le temps d'exécution des DAG, des tâches et des exécutions parallèles d'un même DAG

Si vous souhaitez contrôler la durée d'exécution d'un DAG spécifique dure, vous pouvez alors utiliser le paramètre DAG dagrun_timeout pour effectuer donc. Par exemple, si vous vous attendez à ce qu'un seul DAG s'exécute (quel que soit le l'exécution se termine par une réussite ou un échec) ne doit pas durer plus d'une heure, puis définissez ce paramètre sur 3 600 secondes.

Vous pouvez également contrôler la durée d'une tâche Airflow. Pour ce faire, vous pouvez utiliser execution_timeout.

Si vous souhaitez contrôler le nombre d'exécutions de DAG actives DAG spécifique, vous pouvez utiliser [core]max-active-runs-per-dag l'option de configuration Airflow correspondante.

Si vous souhaitez qu'une seule instance d'un DAG s'exécute à un moment donné, définissez Définissez le paramètre max-active-runs-per-dag sur 1.

Problèmes affectant la synchronisation des DAG et des plug-ins avec les planificateurs, les nœuds de calcul et les serveurs Web

Cloud Composer synchronise le contenu des dossiers /dags et /plugins aux planificateurs et aux nœuds de calcul. Certains objets dans les dossiers /dags et /plugins peut empêcher cette synchronisation de fonctionner correctement ou du moins la ralentir.

  • Le dossier /dags est synchronisé avec les planificateurs et les nœuds de calcul. Ce dossier n'est pas synchronisé vers des serveurs Web dans Cloud Composer 2 ou si vous activez DAG Serialization dans Cloud Composer 1.

  • Le dossier /plugins est synchronisé avec les planificateurs, les nœuds de calcul et les serveurs Web.

Vous pouvez rencontrer les problèmes suivants :

  • Vous avez importé des fichiers compressés au format gzip qui utilisent le transcodage de compression dans les dossiers /dags et /plugins. Cela se produit généralement si vous utilisez l'indicateur --gzip-local-all dans un gcloud storage cp pour importer des données dans le bucket.

    Solution : Supprimez l'objet qui utilisait le transcodage de compression, puis réimportez-le dans le bucket.

  • L'un des objets s'appelle ".". Il n'est pas synchronisé avec des planificateurs et des nœuds de calcul, et il pourrait arrêter de se synchroniser.

    Solution: renommez l'objet problématique.

  • Un dossier et un fichier Python DAG portent les mêmes noms, par exemple a.py. Dans ce cas, le fichier DAG n'est pas correctement synchronisé avec les composants Airflow.

    Solution : Supprimez le dossier portant le même nom qu'un fichier Python DAG.

  • L'un des objets des dossiers /dags ou /plugins contient un symbole / à la fin du nom de l'objet. De tels objets peuvent induire en erreur le processus de synchronisation car le symbole / signifie qu'un objet est un dossier, et non un fichier.

    Solution: supprimez le symbole / du nom de l'objet problématique.

  • Ne stockez pas de fichiers inutiles dans les dossiers /dags et /plugins.

    Parfois, les DAG et les plug-ins que vous implémentez sont accompagnés de fichiers supplémentaires, tels que des fichiers stockant des tests pour ces composants. Ces fichiers sont synchronisés avec les nœuds de calcul et les planificateurs, et ont un impact sur le temps nécessaire pour les copier sur les planificateurs, les nœuds de calcul et les serveurs Web.

    Solution: ne stockez pas de fichiers supplémentaires et inutiles dans /dags et /plugins dossiers.

L'erreur Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' est générée par les programmeurs et les nœuds de calcul

Ce problème se produit, car les objets peuvent avoir un espace de noms qui se chevauche dans Cloud Storage, tandis que les planificateurs et les travailleurs utilisent des systèmes de fichiers traditionnels. Par exemple, vous pouvez ajouter à la fois un dossier et un objet portant le même nom au bucket d'un environnement. Lorsque le bucket est synchronisé avec les planificateurs et les nœuds de calcul de l'environnement, cette erreur est générée, ce qui peut entraîner l'échec des tâches.

Pour résoudre ce problème, assurez-vous que les espaces de noms ne se chevauchent pas dans bucket de l'environnement. Par exemple, si /dags/misc (un fichier) et /dags/misc/example_file.txt (un autre fichier) se trouvent dans un bucket, une erreur est générée par le planificateur.

Interruptions temporaires lors de la connexion à la base de données de métadonnées Airflow

Cloud Composer s'exécute sur une infrastructure cloud distribuée. Cela signifie que de temps en temps, certains problèmes temporaires peuvent apparaître et interrompre l'exécution de vos tâches Airflow.

Dans ce cas, les messages d'erreur suivants peuvent s'afficher dans le flux de travail des nœuds de calcul Airflow journaux:

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

ou

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Ces problèmes intermittents peuvent également être causés par des opérations de maintenance effectuées pour vos environnements Cloud Composer.

Généralement, ces erreurs sont intermittentes et si vos tâches Airflow sont idempotentes et que vous avez configuré les tentatives, vous ne devriez pas les voir. Vous pouvez également définir des intervalles de maintenance.

Un autre facteur pouvant entraîner ces erreurs est le manque de ressources dans le cluster de votre environnement. Dans ce cas, vous pouvez augmenter ou optimiser comme décrit dans la section Effectuer le scaling d'environnements Instructions pour optimiser votre environnement.

Une exécution DAG est marquée comme réussie, mais aucune tâche n'est exécutée

Si l'exécution execution_date d'un DAG est antérieure à la valeur start_date du DAG, vous pouvez voir des exécutions de DAG n'ayant aucune exécution de tâche, mais qui sont tout de même marquées comme réussies.

Exécution DAG réussie sans tâches exécutées
Figure 3 : Exécution réussie d'un DAG sans tâches exécutées (cliquez pour agrandir)

Cause

Cette situation peut se produire dans l'un des cas suivants :

  • Une non-concordance est causée par la différence de fuseau horaire entre les execution_date et start_date. Cela peut se produire, par exemple, lorsque en utilisant pendulum.parse(...) pour définir start_date.

  • Le start_date du DAG est défini sur une valeur dynamique, par exemple airflow.utils.dates.days_ago(1).

Solution

  • Assurez-vous que execution_date et start_date utilisent le même fuseau horaire.

  • Spécifiez un start_date statique et combinez-le avec catchup=False pour éviter d'exécuter des DAG avec des dates de début passées.

Un DAG n'est pas visible dans l'interface utilisateur d'Airflow ou de DAG, et le programmeur ne le planifie pas

Le processeur DAG analyse chaque DAG avant qu'il puisse être planifié par le programmeur et avant qu'un DAG ne soit visible l'UI Airflow ou l'UI du DAG.

Les options de configuration Airflow suivantes définissent les délais avant expiration pour l'analyse des DAG :

Si un DAG n'est pas visible dans l'interface utilisateur d'Airflow ou de DAG:

  • Vérifiez les journaux du processeur DAG pour voir s'il est en mesure de traiter correctement votre DAG. En cas de problème, les entrées de journal suivantes peuvent s'afficher dans les journaux du processeur ou du programmeur DAG:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • Consultez les journaux du programmeur pour vérifier qu'il fonctionne correctement. En cas de problème, les entrées de journal suivantes peuvent s'afficher dans les journaux du planificateur :
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

Solutions :

  • Corrigez toutes les erreurs d'analyse DAG. Le processeur DAG analyse plusieurs DAG. Dans de rares cas, les erreurs d'analyse d'un DAG peuvent avoir un impact négatif sur l'analyse d'autres DAG.

  • Si l'analyse de votre DAG prend plus de secondes que celles définies dans [core]dagrun_import_timeout, augmentez ce délai avant expiration.

  • Si l'analyse de tous vos DAG prend plus de secondes que celles définies dans [core]dag_file_processor_timeout, augmentez ce délai avant expiration.

  • Si l'analyse de votre DAG prend beaucoup de temps, cela peut également signifier qu'il n'est pas mises en œuvre de manière optimale. Par exemple, s'il est lu des variables d'environnement, ou effectue des appels vers des services externes ou Airflow, base de données. Dans la mesure du possible, évitez d'effectuer de telles opérations dans les sections globales des DAG.

  • Augmentez les ressources de processeur et de mémoire du programmeur afin qu'il puisse fonctionner plus rapidement.

  • Ajustez le nombre de planificateurs.

  • Augmenter le nombre de processus de processeur DAG afin de pouvoir effectuer une analyse plus rapidement. Pour ce faire, augmentez la valeur de [scheduler]parsing_process.

  • Réduisez la fréquence d'analyse des DAG.

  • Réduisez la charge sur la base de données Airflow.

Symptômes d'une base de données Airflow soumise à une charge importante

Pour en savoir plus, consultez la section Symptômes de la pression de charge sur la base de données Airflow.

Étape suivante