Cloud Composer 1 | Cloud Composer 2
Cette page fournit des étapes de dépannage et des informations pour les problèmes courants liés aux workflows.
De nombreux problèmes d'exécution des DAG sont causés par des performances d'environnement non optimales. Vous pouvez optimiser votre environnement Cloud Composer 2 en suivant le guide Optimiser les performances et les coûts de l'environnement.
Certains problèmes d'exécution du DAG peuvent être causés par un mauvais fonctionnement ou un fonctionnement optimal du programmeur Airflow. Veuillez suivre les instructions de dépannage de l'outil de programmation pour résoudre ces problèmes.
Résoudre un problème lié aux workflows
Pour commencer à résoudre les problèmes, procédez comme suit :
Vérifiez les journaux Airflow.
Vous pouvez augmenter le niveau de journalisation d'Airflow en remplaçant l'option de configuration Airflow suivante.
Airflow 2
Section Clé Value (Valeur) logging
logging_level
La valeur par défaut est INFO
. Définissez la valeur surDEBUG
pour obtenir plus de verbosité dans les messages de journal.Airflow 1
Section Clé Value (Valeur) core
logging_level
La valeur par défaut est INFO
. Définissez la valeur surDEBUG
pour obtenir plus de verbosité dans les messages de journal.Consultez le tableau de bord Monitoring.
Examinez Cloud Monitoring.
Dans la console Google Cloud, recherchez les erreurs sur les pages des composants de votre environnement.
Dans l'interface Web Airflow, recherchez les instances de tâches ayant échoué dans la vue graphique du DAG.
Section Clé Value (Valeur) webserver
dag_orientation
LR
,TB
,RL
ouBT
Déboguer des échecs de l'opérateur
Pour déboguer un échec de l'opérateur, procédez comme suit :
- Recherchez les erreurs spécifiques à la tâche.
- Vérifiez les journaux Airflow.
- Examinez Cloud Monitoring.
- Vérifiez les journaux spécifiques à l'opérateur.
- Corrigez les erreurs.
- Importez le DAG dans le dossier
dags/
. - Dans l'interface Web Airflow, effacez les états antérieurs du DAG.
- Relancez ou exécutez le DAG.
Résoudre les problèmes d'exécution de tâches
Airflow est un système distribué avec de nombreuses entités telles que le planificateur, l'exécuteur et les nœuds de calcul qui communiquent entre elles via une file d'attente de tâches et la base de données Airflow, et envoient des signaux (comme SIGTERM). Le schéma suivant présente les interconnexions entre les composants Airflow.
Dans un système distribué tel qu'Airflow, il peut y avoir des problèmes de connectivité réseau, ou l'infrastructure sous-jacente peut rencontrer des problèmes intermittents. Cela peut entraîner l'échec des tâches et la replanification de l'exécution, ou l'échec de l'exécution des tâches (par exemple, des tâches zombies ou bloquées dans leur exécution). Airflow dispose de mécanismes permettant de gérer ces situations et de rétablir automatiquement un fonctionnement normal. Les sections suivantes décrivent les problèmes courants qui surviennent lors de l'exécution de tâches par Airflow : tâches zombies, pilules empoisonnées et signaux SIGTERM.
Résoudre les problèmes liés aux tâches zombies
Airflow détecte deux types de non-concordance entre une tâche et un processus qui l'exécute:
Les tâches zombies sont censées être en cours d'exécution, mais ce n'est pas le cas. Cela peut se produire si le processus de la tâche a été arrêté ou ne répond pas, si le nœud de calcul Airflow n'a pas signalé l'état de la tâche à temps parce qu'elle est surchargée, ou si la VM sur laquelle la tâche est exécutée a été arrêtée. Airflow détecte ces tâches régulièrement, et échoue ou relance la tâche en fonction de ses paramètres.
Les tâches non mortes sont des tâches qui ne sont pas censées être en cours d'exécution. Airflow détecte ces tâches régulièrement et les arrête.
La cause la plus fréquente des tâches zombies est le manque de ressources de processeur et de mémoire dans le cluster de votre environnement. Par conséquent, un nœud de calcul Airflow peut ne pas être en mesure de signaler l'état d'une tâche ou un capteur peut être interrompu brusquement. Dans ce cas, le programmeur marque une tâche donnée comme tâche zombie. Pour éviter les tâches zombies, attribuez plus de ressources à votre environnement.
Pour en savoir plus sur le scaling de votre environnement Cloud Composer, consultez le guide de scaling de l'environnement. Si vous rencontrez des tâches zombies, vous pouvez augmenter le délai avant expiration de ces tâches. Par conséquent, le programmeur attend plus longtemps avant de considérer une tâche comme un zombie. De cette manière, un nœud de calcul Airflow a plus de temps pour signaler l'état de la tâche.
Pour augmenter le délai avant expiration des tâches zombies, remplacez la valeur de l'option de configuration Airflow [scheduler]scheduler_zombie_task_threshold
:
Section | Clé | Value (Valeur) | Notes |
---|---|---|---|
scheduler |
scheduler_zombie_task_threshold |
Nouveau délai avant expiration (en secondes) | La valeur par défaut est 300 . |
Résoudre les problèmes liés aux pilules antipoison
La pilule antipoison est un mécanisme utilisé par Airflow pour arrêter les tâches Airflow.
Airflow utilise des pilules antipoison dans les situations suivantes:
- Lorsqu'un planificateur met fin à une tâche qui ne s'est pas terminée à temps.
- Lorsqu'une tâche expire ou est exécutée trop longtemps.
Lorsque Airflow utilise un empoisonnement, vous pouvez voir les entrées de journal suivantes dans les journaux d'un nœud de calcul Airflow ayant exécuté la tâche:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Taking the poison pill.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Solutions possibles:
- Vérifiez que le code de la tâche ne contient pas d'erreurs qui pourraient entraîner une exécution trop longue.
- (Cloud Composer 2) Augmentez la capacité de processeur et de mémoire pour les nœuds de calcul Airflow, afin d'accélérer l'exécution des tâches.
Augmentez la valeur de l'option de configuration Airflow
[celery_broker_transport_options]visibility-timeout
.Par conséquent, le programmeur attend plus longtemps qu'une tâche terminée avant de la considérer comme une tâche zombie. Cette option est particulièrement utile pour les tâches chronophages qui durent plusieurs heures. Si cette valeur est trop faible (par exemple, 3 heures), le planificateur considère les tâches qui s'exécutent pendant 5 ou 6 heures comme des tâches "suspendues" (tâches zombies).
Augmentez la valeur de l'option de configuration Airflow
[core]killed_task_cleanup_time
.Une valeur plus longue laisse plus de temps aux nœuds de calcul Airflow pour mener à bien leurs tâches. Si cette valeur est trop faible, les tâches Airflow peuvent être interrompues brusquement, sans assez de temps pour terminer correctement leur travail.
Résoudre les problèmes liés aux signaux SIGTERM
Les signaux SIGTERM sont utilisés par Linux, Kubernetes, le programmeur Airflow et Celery pour arrêter les processus responsables de l'exécution des nœuds de calcul Airflow ou des tâches Airflow.
Plusieurs raisons peuvent expliquer pourquoi des signaux SIGTERM sont envoyés dans un environnement:
Une tâche devient une tâche zombie et doit être arrêtée.
Le programmeur a détecté un doublon d'une tâche et envoie des signaux Poison Pill et SIGTERM à la tâche pour l'arrêter.
Dans l'autoscaling horizontal des pods, le plan de contrôle GKE envoie des signaux SIGTERM pour supprimer les pods qui ne sont plus nécessaires.
Le programmeur peut envoyer des signaux SIGTERM au processus DagFileProcessorManager. Ces signaux SIGTERM sont utilisés par le planificateur pour gérer le cycle de vie du processus DagFileProcessorManager et peuvent être ignorés en toute sécurité.
Exemple :
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Condition de concurrence entre le rappel de pulsation et les rappels de sortie dans le job local_task_job, qui surveille l'exécution de la tâche. Si la pulsation détecte qu'une tâche a été marquée comme réussie, il ne peut pas distinguer si la tâche a réussi ou qu'Airflow a été invité à considérer la tâche comme réussie. Néanmoins, il met fin à un exécuteur de tâches, sans attendre qu'il se termine.
Ces signaux SIGTERM peuvent être ignorés en toute sécurité. La tâche a déjà un état de réussite et l'exécution de l'exécution du DAG dans son ensemble ne sera pas affectée.
L'entrée de journal
Received SIGTERM.
est la seule différence entre la sortie standard et l'arrêt de la tâche lorsque l'état a abouti.Figure 2 : Condition de concurrence entre les rappels de pulsation et de sortie (cliquez pour agrandir) Un composant Airflow utilise plus de ressources (processeur et mémoire) que ne le permet le nœud de cluster.
Le service GKE effectue des opérations de maintenance et envoie des signaux SIGTERM aux pods qui s'exécutent sur un nœud sur le point d'être mis à niveau. Lorsqu'une instance de tâche est arrêtée avec SIGTERM, vous pouvez voir les entrées suivantes dans les journaux d'un nœud de calcul Airflow ayant exécuté la tâche:
{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception
Solutions possibles:
Ce problème se produit lorsqu'une VM qui exécute la tâche est à court de mémoire. Cela n'est pas lié aux configurations Airflow, mais à la quantité de mémoire disponible pour la VM.
L'augmentation de la mémoire dépend de la version de Cloud Composer que vous utilisez. Exemple :
Dans Cloud Composer 2, vous pouvez attribuer davantage de ressources de processeur et de mémoire aux nœuds de calcul Airflow.
Dans le cas de Cloud Composer 1, vous pouvez recréer votre environnement à l'aide d'un type de machine offrant plus de performances.
Dans les deux versions de Cloud Composer, vous pouvez réduire la valeur de l'option de configuration Airflow de simultanéité
[celery]worker_concurrency
. Cette option détermine le nombre de tâches exécutées simultanément par un nœud de calcul Airflow donné.
Pour en savoir plus sur l'optimisation de votre environnement Cloud Composer 2, consultez la page Optimiser les performances et les coûts de l'environnement.
Requêtes Cloud Logging permettant d'identifier les raisons de redémarrages ou d'évictions de pods
Les environnements Cloud Composer utilisent des clusters GKE en tant que couche d'infrastructure de calcul. Dans cette section, vous trouverez des requêtes utiles qui peuvent vous aider à déterminer les raisons des redémarrages ou évictions des nœuds de calcul Airflow ou du programmeur Airflow.
Les requêtes présentées ci-dessous peuvent être réglées comme suit:
vous pouvez spécifier une chronologie qui vous intéresse dans Cloud Logging (6 heures ou 3 derniers jours, par exemple), ou définir une période personnalisée
vous devez spécifier le CLUSTER_NAME de Cloud Composer
vous pouvez également limiter la recherche à un pod spécifique en ajoutant le POD_NAME
Découvrir les conteneurs redémarrés
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
Autre requête pour limiter les résultats à un pod spécifique:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Découvrir l'arrêt des conteneurs en raison d'un événement de saturation de la mémoire
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
Autre requête pour limiter les résultats à un pod spécifique:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Identifier les conteneurs qui ont cessé de s'exécuter
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
Autre requête pour limiter les résultats à un pod spécifique:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Impact des opérations de mise à jour sur les exécutions de tâches Airflow
Les opérations de mise à jour ou de mise à niveau interrompent l'exécution des tâches Airflow en cours, sauf si une tâche est exécutée en mode différable.
Nous vous recommandons d'effectuer ces opérations lorsque vous prévoyez un impact minimal sur les exécutions de tâches Airflow et que vous configurez des mécanismes de nouvelle tentative appropriés dans vos DAG et vos tâches.
Problèmes courants
Les sections suivantes décrivent les symptômes et les correctifs potentiels de certains problèmes courants liés aux DAG.
La tâche Airflow a été interrompue par Negsignal.SIGKILL
Parfois, votre tâche peut utiliser plus de mémoire que le nœud de calcul Airflow n'est alloué.
Dans ce cas, il peut être interrompu par Negsignal.SIGKILL
. Le système envoie ce signal pour éviter toute utilisation de mémoire supplémentaire, ce qui peut affecter l'exécution d'autres tâches Airflow. L'entrée de journal suivante peut apparaître dans le journal du nœud de calcul Airflow:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Solutions possibles:
Diminution du nombre de nœuds de calcul de simultanéité des nœuds de calcul Airflow
Dans le cas de Cloud Composer 2, augmentez la mémoire des nœuds de calcul Airflow.
Dans le cas de Cloud Composer 1, passez à un type de machine plus performant utilisé dans le cluster Composer
Optimisez vos tâches pour utiliser moins de mémoire
La tâche échoue sans émettre de journaux en raison d'erreurs d'analyse du DAG
Parfois, des erreurs subtiles liées au DAG peuvent entraîner une situation où un programmeur Airflow et un processeur DAG sont en mesure de planifier des tâches à exécuter et d'analyser un fichier DAG (respectivement), mais le nœud de calcul Airflow ne parvient pas à exécuter les tâches de ce DAG, car des erreurs de programmation se produisent dans le fichier DAG Python. Cela peut entraîner une situation où une tâche Airflow est marquée comme Failed
et qu'aucun journal n'est créé pour son exécution.
Solutions :
Vérifiez dans les journaux des nœuds de calcul Airflow qu'il n'y a aucune erreur générée par le nœud de calcul Airflow liée à des erreurs d'analyse du DAG ou du DAG manquantes.
Augmentez les paramètres liés à l'analyse du DAG:
Augmentez dagbag-import-timeout sur au moins 120 secondes (ou plus, si nécessaire).
Augmentez dag-file-processor-timeout sur 180 secondes au minimum (ou plus, si nécessaire). Cette valeur doit être supérieure à
dagbag-import-timeout
.
Consultez également la section Inspecter les journaux du processeur DAG.
La tâche échoue sans émettre de journaux en raison de la pression des ressources
Problème constaté: lors de l'exécution d'une tâche, le sous-processus du nœud de calcul Airflow responsable de l'exécution de la tâche Airflow est interrompu brusquement. L'erreur visible dans le journal du nœud de calcul Airflow peut ressembler à l'erreur ci-dessous:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Solution :
- Dans Cloud Composer 1, créez un environnement avec un type de machine plus volumineux que le type de machine actuel. Envisagez d'ajouter des nœuds à votre environnement et de réduire la
[celery]worker_concurrency
pour vos nœuds de calcul. - Dans Cloud Composer 2, augmentez les limites de mémoire pour les nœuds de calcul Airflow.
- Si votre environnement génère également des tâches zombies, consultez la page Résoudre les problèmes liés aux tâches zombies.
- Pour obtenir un tutoriel sur le débogage des problèmes de mémoire insuffisante, consultez la section Déboguer les problèmes de DAG à cause de mémoire et d'espace de stockage insuffisants.
La tâche échoue sans émettre de journaux en raison de l'éviction des pods
Les pods Google Kubernetes Engine sont soumis au cycle de vie des pods Kubernetes et à leur éviction. Les pics de tâches et la co-planification des nœuds de calcul sont les deux causes les plus courantes de l'éviction des pods dans Cloud Composer.
L'éviction des pods peut se produire lorsqu'un pod particulier utilise trop de ressources sur un nœud, par rapport aux attentes de consommation de ressources configurées pour le nœud. Par exemple, l'éviction peut se produire lorsque plusieurs tâches gourmandes en mémoire s'exécutent dans un pod, et que leur charge combinée entraîne le dépassement de la limite d'utilisation de mémoire par le nœud sur lequel ce pod s'exécute.
Si un pod de nœud de calcul Airflow est évincé, toutes les instances de tâche qui y sont exécutées sont interrompues, puis marquées comme ayant échoué par Airflow.
Les journaux sont mis en mémoire tampon. Si un pod de nœuds de calcul est évincé avant la purge du tampon, les journaux ne sont pas envoyés. L'échec de la tâche sans journaux indique que les nœuds de calcul Airflow sont redémarrés en raison d'une mémoire saturée (OOM, Out Of Memory). Certains journaux peuvent être présents dans Cloud Logging, même si les journaux Airflow n'ont pas été envoyés.
Pour afficher les journaux :
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux.
Affichez les journaux de chaque nœud de calcul sous Tous les journaux -> Journaux Airflow -> Nœuds de calcul -> (nœud de calcul individuel).
L'exécution du DAG est limitée en mémoire. L'exécution de chaque tâche commence par deux processus Airflow : l'exécution et la surveillance de la tâche. Chaque nœud peut accepter jusqu'à six tâches simultanées (environ 12 processus chargés avec des modules Airflow). Vous pouvez utiliser plus de mémoire, en fonction de la nature du DAG.
Symptôme :
Dans la console Google Cloud, accédez à la page Charges de travail.
Si des pods
airflow-worker
affichentEvicted
, cliquez sur chaque pod évincé et recherchez le messageThe node was low on resource: memory
en haut de la fenêtre.
Correctif :
- Dans Cloud Composer 1, créez un environnement Cloud Composer avec un type de machine plus volumineux que le type de machine actuel.
- Dans Cloud Composer 2, augmentez les limites de mémoire pour les nœuds de calcul Airflow.
- Vérifiez les journaux des pods
airflow-worker
pour identifier les causes d'éviction possibles. Pour en savoir plus sur la récupération des journaux à partir de pods individuels, consultez la page Résoudre les problèmes liés aux charges de travail déployées. - Assurez-vous que les tâches du DAG sont idempotentes et récupérables.
Évitez de télécharger des fichiers inutiles dans le système de fichiers local des nœuds de calcul Airflow.
La capacité des nœuds de calcul Airflow est limitée. Par exemple, dans Cloud Composer 2, un nœud de calcul peut disposer de 1 à 10 Go d'espace de stockage. Lorsque l'espace de stockage est épuisé, le pod de nœuds de calcul Airflow est évincé par le plan de contrôle GKE. Cela fait échouer toutes les tâches que le nœud de calcul évincé était en train d'exécuter.
Exemples d'opérations problématiques:
- Télécharger des fichiers ou des objets et les stocker localement dans un nœud de calcul Airflow. Stockez plutôt ces objets directement dans un service approprié tel qu'un bucket Cloud Storage.
- Accès aux objets volumineux du dossier
/data
à partir d'un nœud de calcul Airflow. Le nœud de calcul Airflow télécharge l'objet dans son système de fichiers local. À la place, implémentez vos DAG de sorte que les fichiers volumineux soient traités en dehors du pod de nœuds de calcul Airflow.
Délai avant expiration de l'importation du chargement DAG
Symptôme :
- Dans l'interface Web Airflow, en haut de la page de la liste des DAG, une zone d'alerte rouge affiche
Broken DAG: [/path/to/dagfile] Timeout
. Dans Cloud Monitoring, les journaux
airflow-scheduler
contiennent des entrées semblables à celles-ci :ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
Correctif :
Remplacez l'option de configuration Airflow dag_file_processor_timeout
et accordez plus de temps à l'analyse du DAG:
Section | Clé | Value (Valeur) |
---|---|---|
core |
dag_file_processor_timeout |
Nouvelle valeur du délai |
L'exécution du DAG ne se termine pas dans l'heure prévue
Symptôme :
Parfois, l'exécution du DAG ne s'arrête pas parce que les tâches Airflow sont bloquées et que l'exécution du DAG dure plus longtemps que prévu. Dans des conditions normales, les tâches Airflow ne restent pas indéfiniment en file d'attente ou en cours d'exécution, car Airflow dispose de procédures de délai avant expiration et de nettoyage permettant d'éviter cette situation.
Correctif :
Utilisez le paramètre
dagrun_timeout
pour les DAG. Exemple :dagrun_timeout=timedelta(minutes=120)
. Par conséquent, chaque exécution du DAG doit se terminer dans le délai imparti, et les tâches non terminées sont marquées commeFailed
ouUpstream Failed
. Pour en savoir plus sur les états des tâches Airflow, consultez la documentation Apache Airflow.Utilisez le paramètre délai avant expiration de l'exécution de la tâche pour définir un délai avant expiration par défaut pour les tâches exécutées en fonction des opérateurs Apache Airflow.
Exécutions du DAG non exécutées
Symptôme :
Lorsqu'une date de planification est définie de manière dynamique pour un DAG, cela peut entraîner divers effets secondaires inattendus. Exemple :
L'exécution d'un DAG se situe toujours dans le futur et le DAG n'est jamais exécuté.
Les exécutions précédentes du DAG sont marquées comme exécutées et ont abouti, même si elles n'ont pas été exécutées.
Pour en savoir plus, consultez la documentation Apache Airflow.
Correctif :
Suivez les recommandations de la documentation Apache Airflow.
Définissez un
start_date
statique pour les DAG. Si vous le souhaitez, vous pouvez utilisercatchup=False
pour désactiver l'exécution du DAG pour des dates passées.Évitez d'utiliser
datetime.now()
oudays_ago(<number of days>)
, sauf si vous êtes conscient des effets secondaires de cette approche.
Augmentation du trafic réseau vers et depuis la base de données Airflow
La quantité de trafic réseau entre le cluster GKE de votre environnement et la base de données Airflow dépend du nombre de DAG, du nombre de tâches dans les DAG et de la manière dont les DAG accèdent aux données de la base de données Airflow. Les facteurs suivants peuvent influencer l'utilisation du réseau :
Requêtes envoyées à la base de données Airflow. Si vos DAG effectuent de nombreuses requêtes, ils génèrent une grande quantité de trafic. Exemples : vérifier l'état des tâches avant de poursuivre avec d'autres tâches, interroger la table XCom, créer un vidage du contenu de la base de données Airflow.
Grand nombre de tâches. Plus le nombre de tâches à planifier est élevé, plus du trafic réseau est généré. Ces considérations s'appliquent au nombre total de tâches dans vos DAG et à la fréquence de planification. Lorsque le programmeur Airflow planifie l'exécution du DAG, il envoie des requêtes à la base de données Airflow et génère du trafic.
L'interface Web Airflow génère du trafic réseau, car elle envoie des requêtes à la base de données Airflow. L'utilisation intensive de pages avec des graphiques, des tâches et des schémas peut générer de gros volumes de trafic réseau.
Le DAG entraîne le plantage du serveur Web Airflow ou lui fait renvoyer une erreur 502 gateway timeout
Des défaillances du serveur Web peuvent survenir pour plusieurs raisons. Consultez les journaux airflow-webserver dans Cloud Logging pour déterminer la cause de l'erreur 502 gateway timeout
.
Calculs lourds
Cette section ne s'applique qu'à Cloud Composer 1.
Évitez d'exécuter des calculs lourds au moment de l'analyse du DAG.
Contrairement aux nœuds de calcul et de programmeur, dont les types de machines peuvent être personnalisés pour augmenter la capacité du processeur et de la mémoire, le serveur Web utilise un type de machine fixe, ce qui peut entraîner des échecs d'analyse du DAG si le calcul au moment de l'analyse est trop lourd.
Veuillez prendre en compte que le serveur Web dispose de deux processeurs virtuels et de 2 Go de mémoire.
La valeur par défaut pour core-dagbag_import_timeout
est de 30 secondes. La valeur du délai avant expiration définit la limite supérieure de la durée pendant laquelle Airflow charge un module Python dans le dossier dags/
.
Autorisations incorrectes
Cette section ne s'applique qu'à Cloud Composer 1.
Le serveur Web ne s'exécute pas sous le même compte de service que les nœuds de calcul et le planificateur. En tant que tels, les nœuds de calcul et le planificateur peuvent être en mesure d'accéder à des ressources gérées par l'utilisateur auxquelles le serveur Web n'a pas accès.
Nous vous recommandons d'éviter l'accès à des ressources non publiques lors de l'analyse du DAG. Parfois, c'est inévitable et vous devrez accorder des autorisations au compte de service du serveur Web. Le nom du compte de service est dérivé du domaine de serveur Web. Par exemple, si le domaine est example-tp.appspot.com
, le compte de service est example-tp@appspot.gserviceaccount.com
.
Erreurs du DAG
Cette section ne s'applique qu'à Cloud Composer 1.
Le serveur Web s'exécute sur App Engine et est distinct du cluster GKE de votre environnement. Le serveur Web analyse les fichiers de définition du DAG, et une erreur 502 gateway timeout
peut se produire en cas d'erreurs dans le DAG. Airflow fonctionne normalement sans serveur Web fonctionnel si le DAG problématique n'interrompt aucun processus exécuté dans GKE.
Dans ce cas, vous pouvez utiliser gcloud composer environments run
pour récupérer des informations sur votre environnement et comme solution de contournement si le serveur Web devient indisponible.
Dans d'autres cas, vous pouvez exécuter l'analyse du DAG dans GKE et rechercher les DAG générant des exceptions fatales Python ou ce délai d'expiration (30 secondes par défaut). Pour résoudre ce problème, connectez-vous à une interface système distante dans un conteneur de nœud de calcul Airflow et testez les erreurs de syntaxe. Pour en savoir plus, reportez-vous à la section Tester les DAG.
Gérer un grand nombre de DAG et de plug-ins dans les dossiers de DAG et de plug-ins
Le contenu des dossiers /dags
et /plugins
est synchronisé entre le bucket de votre environnement et les systèmes de fichiers locaux des nœuds de calcul et des programmeurs Airflow.
Plus il y a de données stockées dans ces dossiers, plus la synchronisation prend du temps. Pour remédier à ces situations:
Limitez le nombre de fichiers dans les dossiers
/dags
et/plugins
. Ne stockez que le minimum de fichiers requis.Si possible, augmentez l'espace disque disponible pour les programmeurs et les nœuds de calcul Airflow.
Si possible, augmentez le nombre de processeurs et la mémoire pour les programmeurs et les nœuds de calcul Airflow, afin d'accélérer l'opération de synchronisation.
Si le nombre de DAG est très important, divisez-les en lots, compressez-les en archives ZIP et déployez ces archives dans le dossier
/dags
. Cette approche accélère le processus de synchronisation des DAG. Les composants Airflow décompressent les archives ZIP avant de traiter les DAG.La génération programmatique de DAG peut également être une méthode permettant de limiter le nombre de fichiers DAG stockés dans le dossier
/dags
. Reportez-vous à la section sur les DAG programmatiques pour éviter les problèmes liés à la planification et à l'exécution des DAG générés de manière programmatique.
Ne pas programmer en même temps les DAG générés de manière automatisée
La génération programmatique d'objets à partir d'un fichier DAG est une méthode efficace pour créer de nombreux DAG similaires qui ne présentent que de légères différences.
Il est important de ne pas programmer l'exécution immédiate de tous ces DAG. Il est fort probable que les nœuds de calcul Airflow ne disposent pas de suffisamment de ressources de processeur et de mémoire pour exécuter toutes les tâches planifiées en même temps.
Pour éviter les problèmes liés à la planification des DAG programmatiques:
- Augmentez la simultanéité des nœuds de calcul et faites évoluer votre environnement afin qu'il puisse exécuter davantage de tâches simultanément.
- Générez des DAG de manière à répartir leurs planifications de manière uniforme dans le temps, afin d'éviter de planifier des centaines de tâches en même temps, afin que les nœuds de calcul Airflow aient le temps d'exécuter toutes les tâches planifiées.
Erreur 504 lors de l'accès au serveur Web Airflow
Consultez l'erreur 504 lors de l'accès à l'interface utilisateur Airflow.
L'exception Lost connection to Postgres / MySQL server during query
est générée pendant ou juste après l'exécution de la tâche.
Les exceptions Lost connection to Postgres / MySQL server during query
se produisent souvent lorsque les conditions suivantes sont remplies:
- Votre DAG utilise
PythonOperator
ou un opérateur personnalisé. - Votre DAG envoie des requêtes à la base de données Airflow.
Si plusieurs requêtes sont effectuées à partir d'une fonction appelable, les traces peuvent pointer vers la ligne self.refresh_from_db(lock_for_update=True)
dans le code Airflow de manière incorrecte car il s'agit de la première requête de base de données après l'exécution de la tâche. La cause réelle de l'exception se produit avant, lorsqu'une session SQLAlchemy n'est pas correctement fermée.
Les sessions SQLAlchemy s'appliquent à un thread et sont créées dans une session de fonction appelable qui peut ensuite être prolongée dans le code Airflow. S'il existe des retards importants entre les requêtes au cours d'une session, la connexion peut déjà être fermée par le serveur Postgres ou MySQL. Le délai avant expiration de la connexion dans les environnements Cloud Composer est d'environ 10 minutes.
Correctif :
- Utilisez le décorateur
airflow.utils.db.provide_session
. Ce décorateur fournit une session valide à la base de données Airflow dans le paramètresession
et ferme correctement la session à la fin de la fonction. - N'utilisez pas une seule fonction de longue durée. Déplacez plutôt toutes les requêtes de base de données vers des fonctions distinctes, afin qu'il existe plusieurs fonctions avec le décorateur
airflow.utils.db.provide_session
. Dans ce cas, les sessions sont automatiquement fermées après la récupération des résultats de la requête.
Contrôler la durée d'exécution des DAG, des tâches et des exécutions parallèles d'un même DAG
Si vous souhaitez contrôler la durée d'exécution d'un DAG pour un DAG spécifique, utilisez le paramètre dagrun_timeout
du DAG. Par exemple, si vous vous attendez à ce qu'un seul DAG (que l'exécution se termine par une réussite ou un échec) ne dépasse pas une heure, définissez ce paramètre sur 3 600 secondes.
Vous pouvez également contrôler la durée d'une tâche Airflow. Pour ce faire, vous pouvez utiliser execution_timeout
.
Si vous souhaitez contrôler le nombre d'exécutions de DAG actives que vous souhaitez avoir pour un DAG particulier, vous pouvez utiliser l'option de configuration Airflow [core]max-active-runs-per-dag
.
Si vous souhaitez qu'une seule instance d'un DAG s'exécute à un moment donné, définissez le paramètre max-active-runs-per-dag
sur 1
.
Problèmes affectant la synchronisation des DAG et des plug-ins avec les programmeurs, les nœuds de calcul et les serveurs Web
Cloud Composer synchronise le contenu des dossiers /dags
et /plugins
avec les planificateurs et les nœuds de calcul. Certains objets des dossiers /dags
et /plugins
peuvent empêcher cette synchronisation de fonctionner correctement ou au moins la ralentir.
Le dossier
/dags
est synchronisé avec les planificateurs et les nœuds de calcul. Ce dossier n'est pas synchronisé avec les serveurs Web dans Cloud Composer 2 ou si vous activezDAG Serialization
dans Cloud Composer 1.Le dossier
/plugins
est synchronisé avec les planificateurs, les nœuds de calcul et les serveurs Web.
Vous pouvez rencontrer les problèmes suivants:
Vous avez importé des fichiers compressés au format gzip qui utilisent le transcodage par compression dans les dossiers
/dags
et/plugins
. Cela se produit généralement si vous utilisez la commandegsutil cp -Z
pour importer des données dans le bucket.Solution: supprimez l'objet qui a utilisé le transcodage par compression, puis réimportez-le dans le bucket.
L'un des objets est nommé "." : cet objet n'est pas synchronisé avec les planificateurs et les nœuds de calcul, et peut cesser du tout de se synchroniser.
Solution: renommez l'objet problématique.
Un dossier et un fichier DAG Python portent les mêmes noms. Par exemple :
a.py
. Dans ce cas, le fichier DAG n'est pas correctement synchronisé avec les composants Airflow.Solution: supprimez le dossier portant le même nom qu'un fichier DAG Python.
L'un des objets des dossiers
/dags
ou/plugins
contient un symbole/
à la fin du nom de l'objet. De tels objets peuvent induire en erreur le processus de synchronisation, car le symbole/
signifie qu'un objet est un dossier, et non un fichier.Solution: supprimez le symbole
/
du nom de l'objet qui pose problème.Ne stockez pas de fichiers inutiles dans les dossiers
/dags
et/plugins
.Les DAG et les plug-ins que vous implémentez sont parfois accompagnés de fichiers supplémentaires, tels que des fichiers stockant des tests pour ces composants. Ces fichiers sont synchronisés avec les nœuds de calcul et les planificateurs et ont un impact sur le temps nécessaire à leur copie sur les planificateurs, les nœuds de calcul et les serveurs Web.
Solution: ne stockez pas de fichiers supplémentaires et inutiles dans les dossiers
/dags
et/plugins
.
L'erreur Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'
est générée par les programmeurs et les nœuds de calcul
Ce problème est dû au fait que les objets peuvent avoir des espaces de noms qui se chevauchent dans Cloud Storage, tandis que, dans le même temps, les planificateurs et les nœuds de calcul utilisent des systèmes de fichiers traditionnels. Par exemple, il est possible d'ajouter à la fois un dossier et un objet portant le même nom au bucket d'un environnement. Lorsque le bucket est synchronisé avec les planificateurs et les nœuds de calcul de l'environnement, cette erreur est générée, ce qui peut entraîner l'échec des tâches.
Pour résoudre ce problème, assurez-vous que le bucket de l'environnement ne contient pas d'espaces de noms qui se chevauchent. Par exemple, si /dags/misc
(un fichier) et /dags/misc/example_file.txt
(un autre fichier) se trouvent tous deux dans un bucket, le programmeur génère une erreur.
Interruptions temporaires lors de la connexion à la base de données de métadonnées Airflow
Cloud Composer s'exécute sur l'infrastructure cloud distribuée. Cela signifie que, de temps en temps, des problèmes temporaires peuvent survenir et interrompre l'exécution de vos tâches Airflow.
Dans de telles situations, les messages d'erreur suivants peuvent s'afficher dans les journaux des nœuds de calcul Airflow:
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
ou
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Ces problèmes intermittents peuvent également être causés par des opérations de maintenance effectuées pour vos environnements Cloud Composer.
En général, ces erreurs sont intermittentes et si vos tâches Airflow sont idempotentes et que vous avez configuré de nouvelles tentatives, vous ne devriez pas en avoir. Vous pouvez également envisager de définir des intervalles de maintenance.
Une raison supplémentaire de ces erreurs peut être le manque de ressources dans le cluster de votre environnement. Dans ce cas, vous pouvez effectuer le scaling à la hausse ou l'optimisation de votre environnement comme décrit dans les instructions Effectuer le scaling d'environnements ou Optimiser votre environnement.
Une exécution du DAG est marquée comme réussie, mais n'a aucune tâche exécutée
Si l'exécution du DAG execution_date
est antérieure à sa date start_date
, il est possible que vous constatiez des exécutions du DAG sans aucune exécution de tâche, mais qui sont tout de même marquées comme ayant réussi.

Cause
Cette situation peut se produire dans les cas suivants:
Cette non-concordance est due à la différence de fuseau horaire entre les valeurs
execution_date
etstart_date
du DAG. Cela peut se produire, par exemple, lorsque vous utilisezpendulum.parse(...)
pour définirstart_date
.Le
start_date
du DAG est défini sur une valeur dynamique, par exempleairflow.utils.dates.days_ago(1)
.
Solution
Assurez-vous que
execution_date
etstart_date
utilisent le même fuseau horaire.Spécifiez une
start_date
statique et combinez-la aveccatchup=False
pour éviter d'exécuter des DAG avec des dates de début passées.
Un DAG n'est pas visible dans l'interface utilisateur d'Airflow ou du DAG, et le programmeur ne le planifie pas
Le processeur DAG analyse chaque DAG avant qu'il ne puisse être planifié par le programmeur et avant qu'un DAG ne soit visible dans l'interface utilisateur Airflow ou l'interface utilisateur du DAG.
Les options de configuration Airflow suivantes définissent les délais d'inactivité pour l'analyse des DAG:
[core]dagrun_import_timeout
définit le temps dont dispose le processeur DAG pour analyser un seul DAG.[core]dag_file_processor_timeout
définit le temps total que le processeur DAG peut consacrer à l'analyse de tous les DAG.
Si un DAG n'est pas visible dans l'interface utilisateur d'Airflow ou de DAG:
- Vérifiez les journaux du processeur DAG si celui-ci est capable de traiter correctement votre DAG. En cas de problème, les entrées de journal suivantes peuvent s'afficher dans les journaux du processeur ou du programmeur DAG:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
- Consultez les journaux du programmeur pour voir si celui-ci fonctionne correctement. En cas de problème, les entrées de journal suivantes peuvent s'afficher dans les journaux du programmeur:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496
Solutions :
Corrigez toutes les erreurs d'analyse du DAG. Le processeur DAG analyse plusieurs DAG. Dans de rares cas, les erreurs d'analyse d'un DAG peuvent avoir un impact négatif sur l'analyse d'autres DAG.
Si l'analyse de votre DAG dépasse le nombre de secondes défini dans
[core]dagrun_import_timeout
, augmentez ce délai.Si l'analyse de tous vos DAG prend plus de secondes que celle définie dans
[core]dag_file_processor_timeout
, augmentez ce délai.Si l'analyse de votre DAG prend beaucoup de temps, cela peut également signifier qu'il n'est pas implémenté de manière optimale. (par exemple, s'il lit de nombreuses variables d'environnement ou effectue des appels à des services externes ou à la base de données Airflow). Dans la mesure du possible, évitez d'effectuer de telles opérations dans les sections globales des DAG.
Augmentez les ressources de processeur et de mémoire pour le programmeur afin qu'il puisse fonctionner plus rapidement.
Augmentez le nombre de processus du processeur DAG afin d'accélérer l'analyse. Pour ce faire, augmentez la valeur de
[scheduler]parsing_process
.
Problèmes constatés lorsque la base de données Airflow est soumise à une charge importante
Pour en savoir plus, consultez la section Symptômes d'une charge de travail de la base de données Airflow dans laquelle la base de données Airflow est soumise.
Étapes suivantes
- Résoudre les problèmes d'installation de packages PyPI
- Résoudre les problèmes liés aux mises à jour et aux mises à niveau d'environnement