Cloud Composer 1 est en mode post-maintenance. Google ne publie aucune autre mise à jour de Cloud Composer 1, y compris les nouvelles versions d'Airflow, les corrections de bugs et les mises à jour de sécurité. Nous vous recommandons de planifier la migration vers Cloud Composer 2.
Cette page fournit les étapes de dépannage et des informations sur les workflows courants
les problèmes de performances.
De nombreux problèmes d'exécution des DAG sont causés par des performances d'environnement non optimales.
Vous pouvez optimiser votre environnement Cloud Composer 2 en suivant les instructions du guide Optimize
sur les performances et les coûts de l'environnement.
Certains problèmes d'exécution du DAG peuvent être causés par le programmeur Airflow
ne fonctionne pas correctement ni de manière optimale. Veuillez suivre
Instructions pour résoudre les problèmes liés au programmeur
pour résoudre ces problèmes.
Résoudre un problème lié aux workflows
Pour commencer à résoudre les problèmes, procédez comme suit :
Airflow est un système distribué avec de nombreuses entités
comme le programmeur, l'exécuteur,
les nœuds de calcul qui communiquent entre eux via une file d'attente de tâches et
base de données et envoyer des signaux (comme SIGTERM). Le schéma suivant illustre un
Présentation des interconnexions entre les composants Airflow.
Figure 1. Interaction entre les composants Airflow (cliquez pour agrandir)
Dans un système distribué comme Airflow, il peut y avoir une certaine connectivité réseau
ou l'infrastructure sous-jacente peut rencontrer des problèmes intermittents ;
cela peut conduire à des situations où des tâches
peuvent échouer et être reprogrammées pour
ou des tâches peuvent ne pas aboutir (pour examen, zombie, etc.).
tâches ou tâches bloquées dans l'exécution). Airflow propose des mécanismes
dans de telles situations et reprendre
automatiquement le fonctionnement normal. Suivies
Les sections suivantes décrivent les problèmes courants survenant lors de l'exécution des tâches par Airflow:
Tâches zombies, pilules empoisonnées et signaux SIGTERM.
Résoudre les problèmes liés aux tâches zombies
Airflow détecte deux types de non-concordance entre une tâche et un processus qui s'exécute
la tâche:
Les tâches zombies sont des tâches censées s'exécuter, mais qui ne sont pas
en cours d'exécution. Cela peut se produire si le processus
de la tâche a été arrêté ou n'est pas
répondant si le nœud de calcul Airflow n'a pas signalé l'état d'une tâche à temps ;
parce qu'elle est surchargée, ou si la VM où la tâche est exécutée a été arrêtée.
Airflow détecte ces tâches régulièrement et échoue ou relance la tâche.
en fonction des paramètres de la tâche.
Les tâches zombies sont des tâches qui ne sont pas censées être en cours d'exécution. Airflow trouve
ces tâches périodiquement et
les y met fin.
Vous trouverez ci-dessous les raisons les plus courantes des tâches zombies et les solutions qui s'y rapportent.
Le nœud de calcul Airflow est à court de mémoire
Chaque nœud de calcul Airflow peut exécuter jusqu'à [celery]worker_concurrency instances de tâche
simultanément. Si la consommation cumulative de la mémoire de ces instances de tâche
dépasse la limite de mémoire pour un nœud de calcul Airflow, un processus aléatoire s'exécute sur celui-ci
arrêté pour libérer des ressources.
Découvrir les événements de saturation de la mémoire des nœuds de calcul Airflow
resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")
Dans Cloud Composer 2 versions antérieures à 2.6.0, mettez à jour
[celery]worker_concurrency à l'aide de la formule actuelle si
cette valeur est inférieure ;
Les évictions de pods font partie intégrante de l'exécution des charges de travail sur Kubernetes.
GKE évince les pods en cas d'espace de stockage insuffisant ou
des ressources pour les charges de travail ayant une priorité plus élevée.
Découvrir les évictions de nœuds de calcul Airflow
Si une éviction est due à un manque de stockage, vous pouvez réduire le stockage
utiliser ou supprimer les fichiers
temporaires dès qu’ils ne sont plus nécessaires.
Vous pouvez également
augmenter l'espace de stockage disponible ou exécuter
charges de travail dans un pod dédié avec KubernetesPodOperator.
Le nœud de calcul Airflow a été arrêté
Les nœuds de calcul Airflow peuvent être supprimés en externe. Si les tâches en cours d'exécution
se terminer pendant une période de grâce, ils seront interrompus et pourraient
finissent par être identifiés
comme des zombies.
Découvrir les arrêts de pods de nœuds de calcul Airflow
Dans Cloud Composer 2 des versions antérieures à la version 2.4.5, un nœud Airflow d'arrêt
Le nœud de calcul peut ignorer le signal SIGTERM et continuer à exécuter les tâches:
Découvrir le scaling à la baisse par l'autoscaling de Composer
Vous pouvez passer à une version ultérieure de Cloud Composer, où cette
est résolu.
Le nœud de calcul Airflow était soumis à une charge importante
La quantité de ressources de processeur et de mémoire disponibles pour un nœud de calcul Airflow est limitée
par la configuration de l'environnement. Si une utilisation se rapproche des limites,
cela entraînerait un conflit de ressources
et des retards inutiles pendant la tâche
l'exécution. Dans les situations extrêmes, lorsque les ressources sont insuffisantes pendant de longues périodes
cela pourrait causer des
tâches zombies.
La base de données Airflow était soumise à une charge importante
Une base de données est utilisée par divers composants Airflow pour communiquer entre eux et,
en particulier pour stocker les pulsations
des instances de tâches. Pénurie de ressources sur le
peut entraîner des temps de requête plus longs et peut affecter l'exécution d'une tâche.
La base de données Airflow était temporairement indisponible
La détection et la gestion optimale des nœuds de calcul Airflow par intermittence peuvent prendre du temps
telles que des problèmes de connectivité temporaires. Elle peut dépasser la valeur par défaut
de détection des zombies.
Découvrir les délais avant expiration des pulsations Airflow
resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"
Solutions :
Augmentez le délai avant expiration des tâches zombies et remplacez les
valeur de [scheduler]scheduler_zombie_task_threshold
Option de configuration Airflow:
Section
Clé
Valeur
Remarques
scheduler
scheduler_zombie_task_threshold
Nouveau
délai avant expiration
(dans
secondes)
La valeur par défaut
la valeur est
300
Résoudre les problèmes de pilule empoisonnée
Pilule empoisonnée est un mécanisme utilisé par Airflow pour arrêter les tâches Airflow.
Airflow utilise les pilules empoisonnées dans les situations suivantes:
Lorsqu'un planificateur met fin à une tâche qui ne s'est pas terminée à temps.
Lorsqu'une tâche expire ou est exécutée trop longtemps.
Lorsqu'Airflow utilise une pilule empoisonnée, les entrées de journal suivantes s'affichent dans les journaux d'un nœud de calcul Airflow qui a exécuté la tâche:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Taking the poison pill.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Solutions possibles:
Vérifiez que le code de la tâche ne contient pas d'erreurs qui pourraient entraîner une exécution trop longue.
Augmentez la valeur du paramètre
[celery_broker_transport_options]visibility-timeout configuration Airflow
.
Par conséquent, le planificateur attend plus longtemps qu'une tâche soit terminée,
avant de la considérer comme une tâche zombie. Cette option est particulièrement
utiles pour les tâches chronophages
qui durent de nombreuses heures. Si
la valeur est trop faible (par exemple, 3 heures), alors le programmeur considère
Tâches exécutées pendant 5 ou 6 heures comme "bloquées" (tâches zombies).
Augmenter la valeur d'Airflow [core]killed_task_cleanup_time
.
Une valeur plus élevée donne plus de temps aux nœuds de calcul Airflow pour terminer leurs tâches
avec grâce. Si cette valeur est trop faible, les tâches Airflow risquent d'être interrompues.
brusquement, sans avoir assez de temps
pour terminer leur travail en douceur.
Résoudre les problèmes liés aux signaux SIGTERM
Les signaux SIGTERM sont utilisés par Linux,
Kubernetes, le programmeur Airflow et Celery pour arrêter les processus responsables de
des nœuds de calcul Airflow ou des tâches Airflow.
Plusieurs raisons peuvent expliquer pourquoi les signaux SIGTERM sont envoyés dans un environnement:
Une tâche est devenue zombie et doit être arrêtée.
L’ordonnanceur a découvert un doublon
d’une tâche et envoie une pilule poison et
SIGTERM indique à
la tâche de l'arrêter.
Dans Autoscaling horizontal des pods, le cluster GKE
Le plan de contrôle envoie des signaux SIGTERM pour supprimer les pods qui ne sont plus
nécessaires.
Le programmeur peut envoyer des signaux SIGTERM au processus DagFileProcessorManager.
Ces signaux SIGTERM sont utilisés par le planificateur pour gérer
DagFileProcessorManager, le cycle de vie du processus
peut être ignoré sans risque.
Exemple :
Launched DagFileProcessorManager with pid: 353002
Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
Sending the signal Signals.SIGTERM to group 353002
Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Condition de concurrence entre le rappel Heartbeat et les rappels de sortie dans le
local_task_job, qui surveille l'exécution de la tâche. Si les battements du cœur
détecte qu'une tâche a été marquée comme réussie, il ne peut pas distinguer si
La tâche a réussi ou qu'Airflow a été invité à prendre en compte la tâche
réussi. Néanmoins, il arrêtera un exécuteur de tâches, sans attendre
pour qu'il se ferme.
Ces signaux SIGTERM peuvent être ignorés sans risque. La tâche se trouve déjà dans le
et que l'exécution de l'exécution du DAG dans son ensemble n'est pas
concernés.
L'entrée de journal Received SIGTERM. est la seule différence entre l'entrée standard
et la fin de la tâche
à l’état de réussite.
<ph type="x-smartling-placeholder"></ph>
Figure 2 : Condition de concurrence entre les rappels de pulsation et de sortie (cliquez pour agrandir)
Un composant Airflow utilise plus de ressources (processeur, mémoire) que ne le permet le
le nœud de cluster.
Le service GKE effectue des opérations de maintenance
envoie des signaux SIGTERM aux pods qui s'exécutent sur un nœud sur le point d'être mis à niveau.
Lorsqu'une instance de tâche est terminée avec SIGTERM, vous pouvez voir le journal suivant
Entrées des journaux d'un nœud de calcul Airflow qui a exécuté la tâche:
{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception
Solutions possibles:
Ce problème se produit lorsqu'une VM qui exécute la tâche est à court de mémoire. Ceci n'est pas
aux configurations Airflow, mais aussi à la quantité de mémoire disponible pour
VM.
L'augmentation de la mémoire dépend de la version de Cloud Composer
que vous utilisez. Exemple :
Dans Cloud Composer 2, vous pouvez attribuer davantage de ressources de processeur et de mémoire à Airflow.
les nœuds de calcul.
Si vous utilisez Cloud Composer 1, vous pouvez recréer votre environnement à l'aide d'un
plus performant.
Dans les deux versions de Cloud Composer, vous pouvez réduire la valeur
l'option de configuration Airflow de simultanéité [celery]worker_concurrency.
Cette option détermine le nombre de tâches exécutées simultanément par
Nœud de calcul Airflow.
Requêtes Cloud Logging pour identifier les motifs de redémarrage ou d'éviction des pods
Les environnements Cloud Composer utilisent des clusters GKE en tant qu'infrastructure de calcul
couche de données. Dans cette section, vous trouverez des requêtes utiles qui vous aideront à
trouver les motifs de redémarrages ou d'évictions des nœuds de calcul et du programmeur Airflow.
Les requêtes présentées ci-dessous peuvent être affinées de la manière suivante:
vous pouvez spécifier une chronologie
qui vous intéresse dans Cloud Logging.
(par exemple, les 6 dernières heures ou les trois derniers jours) ou vous pouvez définir une période personnalisée
vous pouvez aussi limiter la recherche à un pod spécifique en ajoutant le paramètre
POD_NAME
Découvrir les conteneurs redémarrés
resource.type="k8s_node"
log_id("kubelet")
jsonPayload.MESSAGE:"will be restarted"
resource.labels.cluster_name="CLUSTER_NAME"
Autre requête permettant de limiter les résultats à un pod spécifique:
resource.type="k8s_node"
log_id("kubelet")
jsonPayload.MESSAGE:"will be restarted"
resource.labels.cluster_name="CLUSTER_NAME"
"POD_NAME"
Découvrir l'arrêt des conteneurs à la suite d'un événement de saturation de la mémoire
resource.type="k8s_node"
log_id("events")
(jsonPayload.reason:("OOMKilling" OR "SystemOOM")
OR jsonPayload.message:("OOM encountered" OR "out of memory"))
severity=WARNING
resource.labels.cluster_name="CLUSTER_NAME"
Autre requête permettant de limiter les résultats à un pod spécifique:
resource.type="k8s_node"
log_id("events")
(jsonPayload.reason:("OOMKilling" OR "SystemOOM")
OR jsonPayload.message:("OOM encountered" OR "out of memory"))
severity=WARNING
resource.labels.cluster_name="CLUSTER_NAME"
"POD_NAME"
Impact des opérations de mise à jour ou de mise à niveau sur les exécutions de tâches Airflow
Les opérations de mise à jour ou de mise à niveau interrompent les tâches Airflow en cours d'exécution.
sauf si une tâche est exécutée en mode différable.
Nous vous recommandons d'effectuer ces opérations lorsque vous prévoyez un impact minimal
sur les exécutions de tâches Airflow et configurez des mécanismes de nouvelle tentative appropriés dans votre
DAG et tâches.
Dépannage des tâches KubernetesExecutor
CeleryKubernetesExecutor est un type d'exécuteur dans Cloud Composer 3
qui peuvent utiliser CeleryExecutor et KubernetesExecutor en même temps
en temps réel.
Consultez la page Utiliser CeleryKubernetesExecutor pour en savoir plus.
sur les tâches de dépannage exécutées avec KubernetesExecutor.
Problèmes courants
Les sections suivantes décrivent les symptômes et les correctifs potentiels de certains problèmes courants liés aux DAG.
La tâche Airflow a été interrompue par Negsignal.SIGKILL
Il peut arriver que votre tâche utilise plus de mémoire que ce qu'alloue le nœud de calcul Airflow.
Dans ce cas, elle peut être interrompue par Negsignal.SIGKILL. Le système
envoie ce signal pour éviter une utilisation de mémoire supplémentaire, ce qui peut affecter
l'exécution d'autres tâches Airflow. Dans le journal du nœud de calcul Airflow, vous pouvez voir
l'entrée de journal suivante:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL peut également s'afficher sous la forme de code -9.
Solutions possibles:
Réduction de worker_concurrency de nœuds de calcul Airflow.
Dans le cas de Cloud Composer 2, augmentez la mémoire des nœuds de calcul Airflow.
Dans le cas de Cloud Composer 1, passez à un type de machine plus grand utilisé dans
cluster Cloud Composer.
Optimisez vos tâches pour utiliser moins de mémoire.
Gérez les tâches gourmandes en ressources dans Cloud Composer à l'aide de
KubernetesPodOperator
ou GKEStartPodOperator pour
l'isolation des tâches
et l'allocation personnalisée des ressources.
La tâche échoue sans émettre de journaux en raison d'erreurs d'analyse DAG
Parfois, il peut y avoir des erreurs de DAG subtiles qui conduisent à une situation où
un programmeur Airflow et un processeur DAG peuvent planifier des tâches à exécuter
et pour analyser un fichier DAG (respectivement), mais le nœud de calcul Airflow n'exécute pas les tâches.
à partir d'un DAG, car il y a des erreurs
de programmation dans le fichier DAG Python. Cela pourrait
peut entraîner une situation où une tâche Airflow est marquée comme Failed.
et aucun journal de son exécution.
Solutions :
Vérifiez dans les journaux des nœuds de calcul Airflow qu'aucune erreur n'a été générée par
Nœud de calcul Airflow lié à des erreurs d'analyse du DAG ou du DAG manquantes.
Augmentez le nombre de paramètres liés à l'analyse du DAG:
La tâche échoue sans émettre de journaux en raison de la pression des ressources
Problème: lors de l'exécution d'une tâche, le sous-processus du nœud de calcul Airflow est responsable
pour l'exécution de la tâche Airflow est interrompue brusquement.
L'erreur visible dans le journal du nœud de calcul Airflow peut ressembler à l'exemple ci-dessous:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Solution :
Dans Cloud Composer 1, créez un environnement avec
Un type de machine plus volumineux que celui de la machine actuelle
de mots clés. Envisagez d'ajouter plus de nœuds à votre environnement et de réduire la capacité de [celery]worker_concurrency pour vos nœuds de calcul.
La tâche échoue sans émettre de journaux en raison de l'éviction du pod
Les pods Google Kubernetes Engine sont soumis au
Cycle de vie des pods Kubernetes et éviction des pods Tâche
les pics et la co-programmation des nœuds de calcul sont deux causes les plus courantes d'éviction des pods
dans Cloud Composer.
L'éviction des pods peut se produire lorsqu'un pod particulier utilise trop de ressources sur un nœud, par rapport aux attentes de consommation de ressources configurées pour le nœud. Pour
exemple, l'éviction peut se produire lorsque plusieurs
tâches gourmandes en mémoire s'exécutent dans un pod,
et leur charge combinée entraîne le dépassement du nombre d'instances
limite d'utilisation de la mémoire.
Si un pod de nœud de calcul Airflow est évincé, toutes les instances de tâche qui y sont exécutées sont interrompues, puis marquées comme ayant échoué par Airflow.
Les journaux sont mis en mémoire tampon. Si un pod de nœuds de calcul est évincé avant la purge du tampon, les journaux ne sont pas envoyés. L'échec de la tâche sans journaux indique que les nœuds de calcul Airflow sont redémarrés en raison d'une mémoire saturée (OOM, Out Of Memory). Certains journaux peuvent être présents dans Cloud Logging, même si les journaux Airflow n'ont pas été envoyés.
Pour afficher les journaux :
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement.
La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux.
Affichez les journaux de nœuds de calcul individuels sous Tous les journaux ->
Journaux Airflow -> Nœuds de calcul -> (travailleur individuel).
L'exécution du DAG est limitée en mémoire. L'exécution de chaque tâche commence par deux processus Airflow : l'exécution et la surveillance de la tâche. Chaque nœud peut accepter jusqu'à six tâches simultanées (environ 12 processus chargés avec des modules Airflow).
Vous pouvez utiliser plus de mémoire, en fonction de la nature du DAG.
Symptôme :
Dans la console Google Cloud, accédez à la page Charges de travail.
Si des pods airflow-worker affichent Evicted, cliquez sur chaque pod évincé et recherchez le message The node was low on resource: memory en haut de la fenêtre.
Correctif :
Dans Cloud Composer 1, créez un environnement Cloud Composer avec
Un type de machine plus volumineux que celui de la machine actuelle
de mots clés.
Assurez-vous que les tâches du DAG sont idempotentes et récupérables.
Évitez de télécharger des fichiers inutiles sur le système de fichiers local des nœuds de calcul Airflow.
La capacité du système de fichiers local des nœuds de calcul Airflow est limitée. Par exemple, dans
Cloud Composer 2, un nœud de calcul peut disposer de 1 Go à 10 Go d'espace de stockage. Lorsque
est insuffisant, le pod de nœuds de calcul Airflow est évincé
plan de contrôle GKE. Cela échoue toutes les tâches que l'éviction
était en cours d'exécution.
Exemples d'opérations problématiques:
télécharger des fichiers ou des objets et les stocker localement dans une instance Airflow ;
ou un nœud de calcul. Stockez plutôt ces objets directement dans un service approprié tel qu'un bucket Cloud Storage.
Accès aux objets volumineux du dossier /data à partir d'un nœud de calcul Airflow.
Le nœud de calcul Airflow télécharge l'objet dans son système de fichiers local. À la place, implémentez vos DAG de sorte que les fichiers volumineux soient traités en dehors du pod de nœuds de calcul Airflow.
Délai avant expiration de l'importation du chargement DAG
Symptôme :
Dans l'interface Web Airflow, en haut de la page listant les DAG, une alerte rouge
affiche Broken DAG: [/path/to/dagfile] Timeout.
Dans Cloud Monitoring: les journaux airflow-scheduler contiennent des entrées
semblable à:
ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
Correctif :
Remplacer Airflow dag_file_processor_timeout
et laissez plus de temps à l'analyse du DAG:
Section
Clé
Valeur
core
dag_file_processor_timeout
Nouvelle valeur du délai
L'exécution du DAG ne se termine pas dans le délai prévu
Symptôme :
Parfois, l'exécution d'un DAG ne s'arrête pas, car des tâches Airflow sont bloquées et le DAG s'exécute
dure plus longtemps que prévu. Dans des conditions normales, les tâches Airflow ne restent pas
indéfiniment en file d'attente ou en cours d'exécution, car Airflow dispose d'un délai avant expiration et
de nettoyage qui aident
à éviter cette situation.
Correctif :
Utilisez les
dagrun_timeout
pour les DAG. Exemple : dagrun_timeout=timedelta(minutes=120). Par conséquent, chaque exécution du DAG doit être
terminées dans le délai avant expiration de l'exécution du DAG et que les tâches non terminées sont marquées comme terminées
comme Failed ou Upstream Failed. Pour en savoir plus sur les états des tâches Airflow, consultez la page
Documentation Apache Airflow
Utilisez les
délai d'exécution de la tâche
pour définir un délai d'inactivité par défaut pour les tâches exécutées sur la base d'Apache
Opérateurs Airflow.
Exécutions DAG non exécutées
Symptôme :
Lorsqu'une date de planification est définie de manière dynamique pour un DAG, cela peut entraîner diverses
des effets secondaires inattendus. Exemple :
Une exécution du DAG se situe toujours dans le futur, et le DAG n'est jamais exécuté.
Les exécutions DAG précédentes sont marquées comme exécutées et réussies alors qu'elles n'ont pas été exécutées
exécuté.
Définissez une valeur start_date statique pour les DAG. Si vous le souhaitez, vous pouvez utiliser catchup=False.
pour désactiver l'exécution du DAG
pour des dates passées.
Évitez d'utiliser datetime.now() ou days_ago(<number of days>), sauf si
conscientes des
effets secondaires de cette approche.
Augmentation du trafic réseau vers et depuis la base de données Airflow
La quantité de trafic réseau entre le cluster GKE de votre environnement et la base de données Airflow dépend du nombre de DAG, du nombre de tâches dans les DAG et de la manière dont les DAG accèdent aux données de la base de données Airflow. Les facteurs suivants peuvent influencer l'utilisation du réseau :
Requêtes envoyées à la base de données Airflow. Si vos DAG effectuent de nombreuses requêtes, ils génèrent une grande quantité de trafic. Exemples : vérifier l'état des tâches avant de poursuivre avec d'autres tâches, interroger la table XCom, créer un vidage du contenu de la base de données Airflow.
Grand nombre de tâches. Plus le nombre de tâches à planifier est élevé, plus du trafic réseau est généré. Ces considérations s'appliquent au nombre total de tâches dans vos DAG et à la fréquence de planification. Lorsque le programmeur Airflow planifie l'exécution du DAG, il envoie des requêtes à la base de données Airflow et génère du trafic.
L'interface Web Airflow génère du trafic réseau, car elle envoie des requêtes à la base de données Airflow. L'utilisation intensive de pages avec des graphiques, des tâches et des schémas peut générer de gros volumes de trafic réseau.
Le DAG entraîne le plantage du serveur Web Airflow ou lui fait renvoyer une erreur 502 gateway timeout
Des défaillances du serveur Web peuvent survenir pour plusieurs raisons. Chèque
airflow-webserver se connecte
Cloud Logging pour déterminer la cause de l'erreur
Erreur 502 gateway timeout.
Calculs lourds
Cette section ne s'applique qu'à Cloud Composer 1.
Évitez d'exécuter des calculs lourds au moment de l'analyse du DAG.
Contrairement aux nœuds de calcul et de planificateur, dont les types de machines peuvent être personnalisés
ont une capacité de processeur et de mémoire supérieure, le serveur Web utilise un type de machine fixe,
ce qui peut entraîner des échecs d'analyse du DAG si le calcul du temps d'analyse est trop élevé
lourd.
Veuillez prendre en compte que le serveur Web dispose de deux processeurs virtuels et de 2 Go de mémoire.
La valeur par défaut pour core-dagbag_import_timeout est de 30 secondes. La valeur du délai avant expiration définit la limite supérieure de la durée pendant laquelle Airflow charge un module Python dans le dossier dags/.
Autorisations incorrectes
Cette section ne s'applique qu'à Cloud Composer 1.
Le serveur Web ne s'exécute pas sous le même compte de service que les nœuds de calcul et le planificateur. En tant que tels, les nœuds de calcul et le planificateur peuvent être en mesure d'accéder à des ressources gérées par l'utilisateur auxquelles le serveur Web n'a pas accès.
Nous vous recommandons d'éviter l'accès à des ressources non publiques lors de l'analyse du DAG. Parfois, c'est inévitable et vous devrez accorder des autorisations au compte de service du serveur Web. Le nom du compte de service est dérivé du domaine de serveur Web. Par exemple, si le domaine
est example-tp.appspot.com, le compte de service
example-tp@appspot.gserviceaccount.com
Erreurs du DAG
Cette section ne s'applique qu'à Cloud Composer 1.
Le serveur Web s'exécute sur App Engine et est distinct du cluster GKE de votre environnement. Le serveur Web analyse les fichiers de définition du DAG, et une erreur 502 gateway timeout peut se produire en cas d'erreurs dans le DAG. Airflow fonctionne normalement sans serveur Web fonctionnel si le
le DAG problématique n'interrompt aucun processus en cours d'exécution dans GKE.
Dans ce cas, vous pouvez utiliser gcloud composer environments run pour récupérer
les détails de votre environnement et comme solution de contournement si le serveur Web devient
indisponible.
Dans d'autres cas, vous pouvez exécuter l'analyse du DAG dans GKE et rechercher les DAG générant des exceptions fatales Python ou ce délai d'expiration (30 secondes par défaut).
Pour résoudre ce problème, connectez-vous à une interface système distante dans un conteneur de nœud de calcul Airflow et testez les erreurs de syntaxe. Pour en savoir plus, reportez-vous à la section Tester les DAG.
Gérer un grand nombre de DAG et de plug-ins dans les dossiers de DAG et de plug-ins
Le contenu des dossiers /dags et /plugins est synchronisé à partir de
le bucket de votre environnement aux systèmes de fichiers locaux des nœuds de calcul Airflow et
les planificateurs.
Plus il y a de données stockées dans ces dossiers, plus il faut de temps pour effectuer
la synchronisation. Pour résoudre ce type de situations:
Limitez le nombre de fichiers dans les dossiers /dags et /plugins. Stockez uniquement les
le nombre de fichiers requis.
Si possible, augmentez l'espace disque disponible pour les programmeurs Airflow et
les nœuds de calcul.
Si possible, augmentez le nombre de processeurs et la mémoire des programmeurs et des nœuds de calcul Airflow, afin de
que l'opération de synchronisation
est effectuée plus rapidement.
Dans le cas d'un très grand nombre de DAG, divisez les DAG en lots, compressez
dans des archives ZIP et les déployer dans le dossier /dags.
Cette approche accélère le processus de synchronisation des DAG. Composants Airflow
décompresser les archives zip
avant de traiter les DAG.
La génération de DAG dans un programme
peut aussi être une méthode pour limiter
le nombre de fichiers DAG stockés dans le dossier /dags.
Consultez la section sur les DAG programmatiques afin d'éviter
les problèmes liés à la planification et à l'exécution des DAG générés de manière automatisée.
Ne pas planifier les DAG générés de manière automatisée en même temps
La génération automatisée d'objets DAG à partir d'un fichier DAG est une méthode efficace
pour créer de nombreux DAG similaires qui ne présentent que de légères différences.
Il est important de ne pas programmer immédiatement l'exécution de tous ces DAG. Il y
est un risque élevé que les nœuds de calcul Airflow n'aient pas assez de processeurs et de mémoire
ressources pour exécuter toutes les tâches planifiées en même temps.
Pour éviter les problèmes de planification des DAG programmatiques:
Augmentez la simultanéité des nœuds de calcul et effectuez un scaling à la hausse de votre environnement afin qu'il puisse
exécuter plus de tâches simultanément.
générer des DAG de manière à répartir leurs planifications uniformément dans le temps
éviter de planifier des centaines de tâches en même temps, ce qui permet aux nœuds de calcul Airflow
d’avoir le temps d’exécuter
toutes les tâches planifiées.
L'exception Lost connection to Postgres server during query est générée pendant ou juste après l'exécution de la tâche.
Lost connection to Postgres server during query exception
se produisent souvent lorsque les conditions suivantes sont remplies:
Votre DAG utilise PythonOperator ou un opérateur personnalisé.
Votre DAG envoie des requêtes à la base de données Airflow.
Si plusieurs requêtes sont effectuées à partir d'une fonction appelable, les traces peuvent pointer vers la ligne self.refresh_from_db(lock_for_update=True) dans le code Airflow de manière incorrecte car il s'agit de la première requête de base de données après l'exécution de la tâche. La cause réelle de l'exception se produit avant, lorsqu'une session SQLAlchemy n'est pas correctement fermée.
Les sessions SQLAlchemy s'appliquent à un thread et sont créées dans une session de fonction appelable qui peut ensuite être prolongée dans le code Airflow. S'il existe des
entre les requêtes au cours d'une même session, il est possible que la connexion soit déjà
fermé par le serveur Postgres. Le délai avant expiration de la connexion dans les environnements Cloud Composer est d'environ 10 minutes.
Correctif :
Utilisez le décorateur airflow.utils.db.provide_session. Ce décorateur fournit une session valide à la base de données Airflow dans le paramètre session et ferme correctement la session à la fin de la fonction.
N'utilisez pas une seule fonction de longue durée. Déplacez plutôt toutes les requêtes de base de données vers des fonctions distinctes, afin qu'il existe plusieurs fonctions avec le décorateur airflow.utils.db.provide_session. Dans ce cas, les sessions sont automatiquement fermées après la récupération des résultats de la requête.
Contrôler le temps d'exécution des DAG, des tâches et des exécutions parallèles d'un même DAG
Si vous souhaitez contrôler la durée d'exécution d'un DAG spécifique
dure, vous pouvez alors utiliser
le paramètre DAG dagrun_timeout pour effectuer
donc. Par exemple, si vous vous attendez à ce qu'un seul DAG s'exécute (quel que soit le
l'exécution se termine par une réussite ou un échec) ne doit pas durer plus d'une heure,
puis définissez ce paramètre sur 3 600 secondes.
Vous pouvez également contrôler la durée d'une tâche Airflow. À faire
Vous pouvez donc utiliser execution_timeout.
Si vous souhaitez contrôler le nombre d'exécutions de DAG actives
DAG spécifique, vous pouvez utiliser [core]max-active-runs-per-dag
l'option de configuration Airflow correspondante.
Si vous souhaitez qu'une seule instance d'un DAG s'exécute à un moment donné, définissez
Paramètre max-active-runs-per-dag sur 1.
Problèmes affectant la synchronisation des DAG et des plug-ins avec les planificateurs, les nœuds de calcul et les serveurs Web
Cloud Composer synchronise le contenu des dossiers /dags et /plugins
aux planificateurs et aux nœuds de calcul. Certains objets dans les dossiers /dags et /plugins
peut empêcher cette synchronisation de fonctionner correctement ou du moins la ralentir.
Le dossier /dags est synchronisé avec les planificateurs et les nœuds de calcul. Ce dossier n'est pas synchronisé
vers des serveurs Web dans Cloud Composer 2 ou si vous activez DAG Serialization dans Cloud Composer 1.
Le dossier /plugins est synchronisé avec les programmeurs, les nœuds de calcul et les serveurs Web.
Il se peut que vous rencontriez les problèmes suivants:
Vous avez importé des fichiers compressés gzip qui utilisent
transcodage par compression vers /dags et /plugins
dossiers. Cela se produit généralement lorsque vous utilisez la commande gsutil cp -Z pour importer
des données dans le bucket.
Solution: supprimez l'objet qui a utilisé le transcodage par compression, puis réimportez-le.
dans le bucket.
L'un des objets s'appelle ".". Il n'est pas synchronisé avec
des planificateurs et des nœuds de calcul, et il pourrait arrêter de se synchroniser.
Solution: renommez l'objet problématique.
Un dossier et un fichier Python DAG portent les mêmes noms, par exemple a.py.
Dans ce cas, le fichier DAG n'est pas correctement synchronisé avec les composants Airflow.
Solution: supprimez le dossier portant le même nom qu'un fichier DAG Python.
L'un des objets des dossiers /dags ou /plugins contient un symbole /
à la fin du nom de l'objet. De tels objets peuvent induire en erreur le processus de synchronisation
car le symbole / signifie qu'un objet est un dossier, et non un fichier.
Solution: supprimez le symbole / du nom de l'objet problématique.
Ne stockez pas de fichiers inutiles dans les dossiers /dags et /plugins.
Parfois, les DAG et les plug-ins que vous implémentez sont accompagnés
des fichiers supplémentaires, tels que des fichiers
stockant des tests pour ces composants. Ces
sont synchronisés avec les workers et les planificateurs et ont un impact sur le temps nécessaire
copier ces fichiers sur les planificateurs, les nœuds de calcul et les serveurs Web.
Solution: ne stockez pas de fichiers supplémentaires et inutiles dans /dags et
/plugins dossiers.
L'erreur Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' est générée par les programmeurs et les nœuds de calcul
Ce problème se produit parce
que les objets peuvent avoir
qui se chevauchent dans Cloud Storage. Dans le même temps,
les planificateurs et les nœuds de calcul
utilisent des systèmes de fichiers traditionnels. Par exemple, il est possible
pour ajouter à la fois un dossier et un objet portant le même nom au
bucket. Lorsque le bucket est synchronisé avec
les planificateurs et les nœuds de calcul de l'environnement,
cette erreur est générée, ce qui peut
entraîner des échecs de tâches.
Pour résoudre ce problème, assurez-vous que les espaces de noms ne se chevauchent pas dans
bucket de l'environnement. Par exemple, si /dags/misc (un fichier) et
/dags/misc/example_file.txt (autre fichier) se trouvent dans un bucket, une erreur est
générées par le programmeur.
Interruptions temporaires lors de la connexion à la base de données de métadonnées Airflow
Cloud Composer s'exécute sur une infrastructure cloud distribuée.
Cela signifie que de temps en temps, des problèmes
temporaires peuvent apparaître et ils peuvent
interrompre l'exécution de vos tâches Airflow.
Dans ce cas, les messages d'erreur suivants peuvent s'afficher dans le flux de travail des nœuds de calcul Airflow journaux:
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
ou
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
De tels problèmes intermittents peuvent également être causés par des opérations de maintenance
pour vos environnements Cloud Composer.
Généralement, ces erreurs sont intermittentes et si vos tâches Airflow sont idempotentes
et que vous avez configuré les tentatives, vous ne devriez pas les voir. Vous pouvez également
envisagez de définir des intervalles de maintenance.
Le manque de ressources dans votre système peut également s'expliquer par de telles erreurs.
cluster de l'environnement. Dans ce cas, vous pouvez augmenter ou optimiser
comme décrit dans la section
Effectuer le scaling d'environnements
Instructions pour optimiser votre environnement.
Une exécution du DAG est marquée comme ayant réussi, mais n'a aucune tâche exécutée
Si l'exécution execution_date d'un DAG est antérieure à la valeur start_date du DAG,
vous pouvez voir des exécutions de DAG n'ayant aucune exécution de tâche, mais qui sont tout de même marquées comme réussies.
<ph type="x-smartling-placeholder"></ph>
Figure 3 : Exécution réussie d'un DAG sans tâches exécutées (cliquez pour agrandir)
Cause
Cette situation peut se produire dans l'un des cas suivants:
Une non-concordance est causée par la différence de fuseau horaire entre les
execution_date et start_date. Cela peut se produire, par exemple, lorsque
en utilisant pendulum.parse(...) pour définir start_date.
Le start_date du DAG est défini sur une valeur dynamique, par exemple
airflow.utils.dates.days_ago(1)
Solution
Assurez-vous que execution_date et start_date utilisent le même fuseau horaire.
Spécifiez un start_date statique et combinez-le avec catchup=False pour éviter
DAG en cours d'exécution
dont la date de début est passée.
Un DAG n'est pas visible dans l'interface utilisateur d'Airflow ou de DAG, et le programmeur ne le programme pas
Le processeur DAG analyse chaque DAG avant qu'il puisse être planifié par le programmeur
et avant qu'un DAG ne soit visible
l'UI Airflow ou l'UI du DAG.
Les options de configuration Airflow suivantes définissent les délais avant expiration pour l'analyse des DAG:
Si un DAG n'est pas visible dans l'interface utilisateur d'Airflow ou du DAG:
Vérifier les journaux du processeur DAG s'il est capable de les traiter correctement
votre DAG. En cas de problème, les entrées de journal suivantes peuvent s'afficher
dans les journaux du processeur ou du programmeur DAG:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
Consultez les journaux du programmeur pour vérifier qu'il fonctionne correctement. En cas de
les entrées suivantes peuvent s'afficher dans les journaux du programmeur:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496
Solutions :
Corrigez toutes les erreurs d'analyse du DAG. Le processeur DAG analyse plusieurs DAG
de rares cas d'analyse des erreurs d'un DAG peut avoir un impact négatif sur l'analyse
dans d'autres DAG.
Si l'analyse de votre DAG dure plus longtemps que le nombre de secondes défini dans
[core]dagrun_import_timeout,
puis augmentez ce délai.
Si l'analyse de tous vos DAG dure plus longtemps que le nombre de secondes défini
dans
[core]dag_file_processor_timeout,
puis augmentez ce délai.
Si l'analyse de votre DAG prend beaucoup de temps, cela peut également signifier qu'il n'est pas
mises en œuvre de manière optimale. Par exemple, s'il est lu
des variables d'environnement, ou effectue des appels vers des services externes ou Airflow,
base de données. Dans la mesure du possible, évitez d'effectuer de telles opérations
dans les sections globales des DAG.
Augmentez les ressources de processeur et de mémoire du programmeur afin qu'il puisse fonctionner plus rapidement.
Augmenter le nombre de processus de processeur DAG afin de pouvoir effectuer une analyse
plus rapidement. Pour ce faire, augmentez la valeur
[scheduler]parsing_process.
Sauf indication contraire, le contenu de cette page est régi par une licence Creative Commons Attribution 4.0, et les échantillons de code sont régis par une licence Apache 2.0. Pour en savoir plus, consultez les Règles du site Google Developers. Java est une marque déposée d'Oracle et/ou de ses sociétés affiliées.
Dernière mise à jour le 2024/07/18 (UTC).
[{
"type": "thumb-down",
"id": "hardToUnderstand",
"label":"Hard to understand"
},{
"type": "thumb-down",
"id": "incorrectInformationOrSampleCode",
"label":"Incorrect information or sample code"
},{
"type": "thumb-down",
"id": "missingTheInformationSamplesINeed",
"label":"Missing the information/samples I need"
},{
"type": "thumb-down",
"id": "translationIssue",
"label":"Problème de traduction"
},{
"type": "thumb-down",
"id": "otherDown",
"label":"Autre"
}]
[{
"type": "thumb-up",
"id": "easyToUnderstand",
"label":"Facile à comprendre"
},{
"type": "thumb-up",
"id": "solvedMyProblem",
"label":"J'ai pu résoudre mon problème"
},{
"type": "thumb-up",
"id": "otherUp",
"label":"Autre"
}]