Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Cette page fournit des étapes de dépannage et des informations sur les problèmes courants liés aux planificateurs et aux processeurs de DAG Airflow.
Identifier la source du problème
Pour commencer le dépannage, déterminez si le problème se produit :
- Au moment de l'analyse du DAG, lorsque le DAG est analysé par un processeur DAG Airflow
- Au moment de l'exécution, lorsque le DAG est traité par un programmeur Airflow
Pour en savoir plus sur le temps d'analyse et la durée d'exécution du DAG, consultez la section Différence entre le temps d'analyse du DAG et la durée d'exécution du DAG.
Inspecter les problèmes de traitement des DAG
Surveiller les tâches en cours et en file d'attente
Pour vérifier si des tâches sont bloquées dans une file d'attente, procédez comme suit :
Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Surveillance.
Dans l'onglet Surveillance, consultez le graphique Tâches Airflow dans la section Exécutions DAG et identifiez les problèmes potentiels. Les tâches Airflow sont des tâches en file d'attente dans Airflow. Elles peuvent être envoyées à la file d'attente de l'agent Celery ou de l'exécuteur Kubernetes. Les tâches en file d'attente Celery sont des instances de tâches placées dans la file d'attente de l'agent Celery.
Résoudre les problèmes rencontrés au moment de l'analyse du DAG
Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'analyse du DAG.
Répartition du nombre et de la durée des tâches
Airflow peut rencontrer des problèmes lors de la planification simultanée d'un grand nombre de DAG ou de tâches. Pour éviter les problèmes de programmation, vous pouvez :
- Ajustez vos DAG pour utiliser un nombre plus faible de tâches mieux consolidées.
- Ajustez les intervalles de planification de vos DAG pour répartir plus uniformément les exécutions de DAG au fil du temps.
Scaling de la configuration Airflow
Airflow fournit des options de configuration Airflow qui contrôlent le nombre de tâches et de DAG Airflow pouvant être exécutés simultanément. Pour définir ces options de configuration, remplacez leurs valeurs pour votre environnement. Vous pouvez également définir certaines de ces valeurs au niveau du DAG ou de la tâche.
Simultanéité des nœuds de calcul
Le paramètre
[celery]worker_concurrency
contrôle le nombre maximal de tâches qu'un nœud de calcul Airflow peut exécuter en même temps. Si vous multipliez la valeur de ce paramètre par le nombre de nœuds de calcul Airflow dans votre environnement Cloud Composer, vous obtenez le nombre maximal de tâches pouvant être exécutées simultanément dans votre environnement. Ce nombre est limité par l'option de configuration Airflow[core]parallelism
, décrite en détail plus bas.Dans les environnements Cloud Composer 3, la valeur par défaut de
[celery]worker_concurrency
est calculée automatiquement en fonction du nombre d'instances de tâches simultanées légères qu'un nœud de calcul peut prendre en charge. Cela signifie que sa valeur dépend des limites de ressources des nœuds de calcul. La valeur de simultanéité des nœuds de calcul ne dépend pas du nombre de nœuds de calcul dans votre environnement.Nombre maximal d'exécutions de DAG actives
L'option de configuration Airflow
[core]max_active_runs_per_dag
contrôle le nombre maximal d'exécutions de DAG actives par DAG. Si cette limite est atteinte, le programmeur ne crée pas d'autres exécutions de DAG.Une définition incorrecte de ce paramètre peut créer un problème où le programmeur limite l'exécution du DAG, car il ne peut pas créer d'autres instances d'exécution de DAG simultanées.
Vous pouvez également définir cette valeur au niveau du DAG avec le paramètre
max_active_runs
.Nombre maximal de tâches actives par DAG
L'option de configuration Airflow
[core]max_active_tasks_per_dag
contrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG.Si ce paramètre est défini de manière incorrecte, vous risquez de rencontrer un problème dans lequel l'exécution d'une instance de DAG est ralentie car le nombre de tâches de DAG pouvant être exécutées simultanément est limité. Dans ce cas, vous pouvez augmenter la valeur de cette option de configuration.
Vous pouvez également définir cette valeur au niveau du DAG avec le paramètre
max_active_tasks
.Vous pouvez utiliser les paramètres
max_active_tis_per_dag
etmax_active_tis_per_dagrun
au niveau de la tâche pour contrôler le nombre d'instances avec un ID de tâche spécifique autorisées à s'exécuter par DAG et par exécution de DAG.Parallélisme et taille du pool
L'option de configuration Airflow
[core]parallelism
contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.Il s'agit d'un paramètre global pour l'ensemble de la configuration Airflow.
Les tâches sont mises en file d'attente et exécutées dans un pool. Les environnements Cloud Composer n'utilisent qu'un seul pool. La taille de ce pool contrôle le nombre de tâches pouvant être mises en file d'attente simultanément par le programmeur. Si la taille du pool est trop faible, le programmeur ne pourra plus mettre de tâches en file d'attente avant même que les seuils (définis par l'option de configuration
[core]parallelism
et par l'option de configuration[celery]worker_concurrency
multipliée par le nombre de nœuds de calcul Airflow) ne soient dépassés.Vous pouvez configurer la taille du pool dans l'interface utilisateur Airflow (Menu > Admin > Pools). Ajustez la taille du pool en fonction du niveau de parallélisme attendu dans votre environnement.
En général,
[core]parallelism
est défini comme le produit du nombre maximal de nœuds de calcul et de[celery]worker_concurrency
.
Résoudre les problèmes liés aux tâches en cours d'exécution et en file d'attente
Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'exécution ou de la mise en file d'attente des tâches.
Les exécutions de DAG ne sont pas exécutées
Symptôme :
Lorsque la date de planification d'un DAG est définie de manière dynamique, cela peut entraîner divers effets secondaires inattendus. Exemple :
Une exécution de DAG est toujours dans le futur et le DAG n'est jamais exécuté.
Les exécutions DAG passées sont marquées comme exécutées et réussies, même si elles ne l'ont pas été.
Pour en savoir plus, consultez la documentation Apache Airflow.
Solutions possibles :
Suivez les recommandations de la documentation Apache Airflow.
Définissez des
start_date
statiques pour les DAG. Vous pouvez également utilisercatchup=False
pour désactiver l'exécution du DAG pour les dates passées.Évitez d'utiliser
datetime.now()
oudays_ago(<number of days>)
, sauf si vous êtes conscient des effets secondaires de cette approche.
Utiliser la fonctionnalité TimeTable du planificateur Airflow
Cloud Composer 3 n'est pas compatible avec les plug-ins personnalisés pour le planificateur Airflow, y compris les tableaux horaires implémentés dans les DAG. Les plug-ins ne sont pas synchronisés avec les planificateurs de votre environnement.
Vous pouvez toujours utiliser les tableaux de planification intégrés dans Cloud Composer 3.
Éviter la planification des tâches pendant les intervalles de maintenance
Vous pouvez définir des intervalles de maintenance pour votre environnement afin que la maintenance de cet environnement ait lieu en dehors des périodes où vous exécutez vos DAG. Vous pouvez toujours exécuter vos DAG pendant les intervalles de maintenance, à condition que l'interruption et la nouvelle tentative de certaines tâches soient acceptables. Pour en savoir plus sur l'impact des intervalles de maintenance sur votre environnement, consultez Spécifier des intervalles de maintenance.
Utilisation de "wait_for_downstream" dans vos DAG
Si vous définissez le paramètre wait_for_downstream
sur True
dans vos DAG, pour qu'une tâche réussisse, toutes les tâches immédiatement en aval de cette tâche doivent également réussir. Cela signifie que l'exécution des tâches d'un DAG donné peut être ralentie par l'exécution des tâches du DAG précédent. Pour en savoir plus, consultez la documentation Airflow.
Les tâches mises en file d'attente depuis trop longtemps seront annulées et reprogrammées.
Si une tâche Airflow est conservée trop longtemps dans la file d'attente, le planificateur la reprogrammera pour exécution une fois que le délai défini dans l'option de configuration Airflow [scheduler]task_queued_timeout
sera écoulé. La valeur par défaut est 2400
.
Pour observer les symptômes de cette situation, vous pouvez consulter le graphique indiquant le nombre de tâches en file d'attente (onglet "Surveillance" de l'interface utilisateur Cloud Composer). Si les pics de ce graphique ne diminuent pas au bout de deux heures environ, les tâches seront très probablement reprogrammées (sans journaux), suivies d'entrées de journal "Les tâches adoptées étaient toujours en attente..." dans les journaux du planificateur. Dans ce cas, le message "Fichier journal introuvable..." peut s'afficher dans les journaux des tâches Airflow, car la tâche n'a pas été exécutée.
En général, ce comportement est attendu et la prochaine instance de la tâche planifiée est censée être exécutée selon la planification. Si vous constatez de nombreux cas de ce type dans vos environnements Cloud Composer, cela peut signifier qu'il n'y a pas assez de nœuds de calcul Airflow dans votre environnement pour traiter toutes les tâches planifiées.
Solution : Pour résoudre ce problème, vous devez vous assurer que les nœuds de calcul Airflow disposent toujours de la capacité nécessaire pour exécuter les tâches en file d'attente. Par exemple, vous pouvez augmenter le nombre de nœuds de calcul ou la simultanéité des nœuds de calcul. Vous pouvez également ajuster le parallélisme ou les pools pour éviter de mettre en file d'attente des tâches au-delà de votre capacité.
Approche de Cloud Composer concernant le paramètre min_file_process_interval
Cloud Composer modifie la façon dont [scheduler]min_file_process_interval
est utilisé par le planificateur Airflow.
Le planificateur Airflow est redémarré après un certain nombre de fois où tous les DAG sont planifiés. Le paramètre [scheduler]num_runs
contrôle le nombre de fois où cela est effectué par le planificateur. Lorsque le planificateur atteint [scheduler]num_runs
boucles de planification, il est redémarré. Le planificateur est un composant sans état. Un tel redémarrage est un mécanisme d'autoréparation pour tout problème que le planificateur pourrait rencontrer. La valeur par défaut de [scheduler]num_runs
est 5 000.
[scheduler]min_file_process_interval
peut être utilisé pour configurer la fréquence d'analyse des DAG, mais ce paramètre ne peut pas être plus long que le temps nécessaire à un planificateur pour effectuer des boucles [scheduler]num_runs
lors de la planification de vos DAG.
Marquer les tâches comme ayant échoué après avoir atteint dagrun_timeout
Le planificateur marque les tâches non terminées (en cours d'exécution, planifiées et en file d'attente) comme ayant échoué si une exécution de DAG ne se termine pas dans dagrun_timeout
(paramètre de DAG).
Solution :
Prolongez
dagrun_timeout
pour respecter le délai d'expiration.Augmentez le nombre de nœuds de calcul ou augmentez les paramètres de performances des nœuds de calcul pour que le DAG s'exécute plus rapidement.
Symptômes d'une charge importante sur la base de données Airflow
Il arrive que l'entrée de journal d'avertissement suivante s'affiche dans les journaux des nœuds de calcul Airflow :
psycopg2.OperationalError: connection to server at ... failed
Ces erreurs ou avertissements peuvent être le signe d'une surcharge de la base de données Airflow en raison du nombre de connexions ouvertes ou du nombre de requêtes exécutées en même temps, que ce soit par les planificateurs ou par d'autres composants Airflow tels que les nœuds de calcul, les déclencheurs et les serveurs Web.
Solutions possibles :
Supprimez les données inutiles de la base de données Airflow.
Faites évoluer la base de données Airflow en ajustant la taille de votre environnement.
Réduisez le nombre de planificateurs et de processeurs DAG. Commencez par un programmateur et un processeur de DAG, puis augmentez le nombre de programmateurs ou de processeurs de DAG si vous constatez que ces composants approchent de leurs limites de ressources.
Évitez d'utiliser des variables globales dans les DAG Airflow. Utilisez plutôt les variables d'environnement et les variables Airflow.
Définissez
[scheduler]scheduler_heartbeat_sec
sur une valeur plus élevée, par exemple 15 secondes ou plus.Définissez
[scheduler]job_heartbeat_sec
sur une valeur plus élevée, par exemple 30 secondes ou plus.Définissez
[scheduler]scheduler_health_check_threshold
sur une valeur égale à[scheduler]job_heartbeat_sec
multiplié par4
.
Le serveur Web affiche l'avertissement "Le planificateur ne semble pas être en cours d'exécution"
Le planificateur signale régulièrement sa pulsation à la base de données Airflow. En fonction de ces informations, le serveur Web Airflow détermine si le planificateur est actif.
Parfois, si le planificateur est fortement sollicité, il peut ne pas être en mesure de signaler son signal de vie toutes les [scheduler]scheduler_heartbeat_sec
secondes.
Dans ce cas, le serveur Web Airflow peut afficher l'avertissement suivant :
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Solutions possibles :
Augmentez les ressources de processeur et de mémoire pour le programmateur.
Optimisez vos DAG afin que leur analyse et leur planification soient plus rapides et ne consomment pas trop de ressources du planificateur.
Évitez d'utiliser des variables globales dans les DAG Airflow. Utilisez plutôt les variables d'environnement et les variables Airflow.
Augmentez la valeur de l'option de configuration Airflow
[scheduler]scheduler_health_check_threshold
afin que le serveur Web attende plus longtemps avant de signaler l'indisponibilité du planificateur.
Solutions de contournement pour les problèmes rencontrés lors du remplissage des DAG
Il peut arriver que vous souhaitiez réexécuter des DAG qui ont déjà été exécutés. Pour ce faire, utilisez une commande de CLI Airflow comme suit :
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Pour relancer uniquement les tâches ayant échoué pour un DAG spécifique, utilisez également l'argument --rerun-failed-tasks
.
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.LOCATION
par la région dans laquelle se trouve l'environnement.START_DATE
avec une valeur pour le paramètre DAGstart_date
, au formatYYYY-MM-DD
.END_DATE
avec une valeur pour le paramètre DAGend_date
, au formatYYYY-MM-DD
.DAG_NAME
par le nom du DAG.
Il arrive que l'opération de remplissage génère une situation de blocage dans laquelle le remplissage n'est pas possible, car une tâche est verrouillée. Exemple :
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
Dans certains cas, vous pouvez utiliser les solutions de contournement suivantes pour surmonter les blocages :
Désactivez le mini-planificateur en remplaçant
[core]schedule_after_task_execution
parFalse
.Exécutez des remplissages pour des plages de dates plus courtes. Par exemple, définissez
START_DATE
etEND_DATE
pour spécifier une période d'un jour seulement.