Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Cette page fournit des étapes de dépannage et des informations sur les problèmes courants liés aux planificateurs Airflow.
Identifier la source du problème
Pour commencer le dépannage, déterminez si le problème se produit au moment de l'analyse du DAG ou pendant le traitement des tâches au moment de l'exécution. Pour en savoir plus sur le temps d'analyse et la durée d'exécution du DAG, consultez la section Différence entre le temps d'analyse du DAG et la durée d'exécution du DAG.
Inspecter les journaux du processeur DAG
Si vous avez des DAG complexes, le processeur DAG, qui est exécuté par votre programmeur, peut ne pas analyser tous vos DAG. Cela peut entraîner de nombreux problèmes qui présentent les symptômes suivants.
Symptômes :
Si le processeur DAG rencontre des problèmes lors de l'analyse de vos DAG, les problèmes indiqués ci-dessous peuvent être combinés. Si les DAG sont générés de manière dynamique, ces problèmes peuvent avoir plus d'impact que les DAG statiques.
Les DAG ne sont pas visibles dans l'interface utilisateur d'Airflow et dans l'interface utilisateur du DAG.
Les DAG ne sont pas planifiés pour l'exécution.
Les journaux du processeur DAG contiennent des erreurs, par exemple:
dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
ou
dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py exited with return code 1.
Les planificateurs Airflow rencontrent des problèmes qui entraînent leur redémarrage.
Les tâches Airflow planifiées pour l'exécution sont annulées, et les exécutions de DAG pour les DAG qui n'ont pas pu être analysés peuvent être marquées comme
failed
. Exemple :airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1 manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag 'dag-example'. Marking it as removed.
Solution :
Augmentation des paramètres liés à l'analyse des DAG :
Augmentez dagbag-import-timeout à au moins 120 secondes (ou plus, si nécessaire).
Augmenter dag-file-processor-timeout d'au moins 180 secondes (ou plus, si nécessaire). Cette valeur doit être supérieure à
dagbag-import-timeout
.
Corrigez ou supprimez les DAG qui causent des problèmes au processeur DAG.
Inspecter les durées d'analyse des DAG
Pour vérifier si le problème se produit au moment de l'analyse du DAG, procédez comme suit :
Console
Dans la console Google Cloud, vous pouvez utiliser la page Monitoring et l'onglet Journaux pour inspecter les durées d'analyse du DAG.
Inspectez les temps d'analyse des DAG sur la page de surveillance de Cloud Composer:
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Surveillance s'ouvre.
Dans l'onglet Surveillance, consultez le graphique Temps total d'analyse pour tous les fichiers DAG de la section Exécutions DAG et identifiez les problèmes potentiels.
Inspectez les durées d'analyse des DAG dans l'onglet Journaux Cloud Composer :
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Monitoring s'ouvre.
Accédez à l'onglet Journaux, puis à l'arborescence de navigation Tous les journaux. sélectionnez la section Gestionnaire de processeur DAG.
Examinez les journaux
dag-processor-manager
et identifiez les problèmes potentiels.
gcloud – Airflow 1
Exécutez la commande list_dags
avec l'option -r
pour afficher le temps d'analyse de tous vos DAG.
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
list_dags -- -r
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.LOCATION
par la région où se trouve l'environnement.
Le résultat de la commande ressemble à ceci :
-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py | 0.6477 | 1 | 2 | ['dag_1']
/dag_2.py | 0.018652 | 1 | 2 | ['dag_2']
/dag_3.py | 0.004024 | 1 | 6 | ['dag_3']
/dag_4.py | 0.003476 | 1 | 2 | ['dag_4']
/dag_5.py | 0.002666 | 1 | 1 | ['dag_5']
-----------+----------+---------+----------+-----------------------
Recherchez la valeur Temps d'analyse DagBag. Une valeur élevée peut indiquer que l'un de vos DAG n'est pas mis en œuvre de manière optimale. La table de sortie vous permet d'identifier les DAG dont la durée d'analyse est longue.
gcloud – Airflow 2
Utilisez la commande dags report
pour afficher la durée d'analyse de tous vos DAG.
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags report
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.LOCATION
par la région où se trouve l'environnement.
Le résultat de la commande ressemble à ceci :
Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file | duration | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py | 0:00:00.038334 | 2 | 10 | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1 | 1 | airflow_monitoring
Recherchez la valeur duration pour chacun des dags listés dans le tableau. Une valeur élevée peut indiquer que l'un de vos DAG n'est pas implémenté de manière optimale. La table de sortie vous permet d'identifier les DAG ayant une longue durée d'analyse.
Surveiller les tâches en cours et en file d'attente
Pour vérifier si des tâches sont bloquées dans une file d'attente, procédez comme suit :
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Surveillance.
Dans l'onglet Surveillance, consultez le graphique Tâches Airflow dans la section Exécutions DAG et identifiez les problèmes potentiels. Tâches Airflow sont des tâches en file d'attente dans Airflow, elles peuvent être File d'attente d'agents Celery ou Kubernetes Executor. Les tâches en file d'attente Celery sont des instances de tâches placées dans la file d'attente de l'agent Celery.
Résoudre les problèmes rencontrés au moment de l'analyse du DAG
Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'analyse du DAG.
Analyse et planification des DAG dans Cloud Composer 1 et Airflow 1
L'efficacité de l'analyse des DAG a été considérablement améliorée dans Airflow 2. Si vous rencontrez des problèmes de performances liés à l'analyse et à la planification des DAG, la migration vers Airflow 2.
Dans Cloud Composer 1, le planificateur s'exécute sur les nœuds de cluster avec d'autres composants Cloud Composer. De ce fait, la charge individuelle peuvent être supérieurs ou inférieurs à d'autres nœuds. Les performances du planificateur (analyse et planification du DAG) peuvent varier en fonction du nœud sur lequel il s'exécute. De plus, un nœud individuel dans lequel les exécutions du programmeur peuvent changer à la suite des opérations de mise à niveau ou de maintenance. Cette limitation a été résolue dans Cloud Composer 2, où vous pouvez allouer des ressources de processeur et de mémoire au planificateur, et les performances du planificateur ne dépendent pas de la charge des nœuds du cluster.
Nombre limité de threads
Autoriser le gestionnaire de processeur DAG (la partie de l'ordonnanceur qui traite les fichiers DAG) d'utiliser seulement un nombre limité de threads peut avoir un impact la durée d'analyse de votre DAG.
Pour résoudre le problème, remplacez les options de configuration Airflow suivantes:
Pour Airflow 1.10.12 et versions antérieures, remplacez le paramètre
max_threads
:Section Clé Valeur Remarques scheduler
max_threads
NUMBER_OF_CORES_IN_MACHINE - 1
Remplacez NUMBER_OF_CORES_IN_MACHINE
par le nombre de cœurs
. sur les machines des nœuds de calcul.Pour Airflow 1.10.14 et versions ultérieures, remplacez la Paramètre
parsing_processes
:Section Clé Valeur Remarques scheduler
parsing_processes
NUMBER_OF_CORES_IN_MACHINE - 1
Remplacez NUMBER_OF_CORES_IN_MACHINE
par le nombre de cœurs
. sur les machines des nœuds de calcul.
Répartition du nombre et de la durée des tâches
Airflow est connu pour avoir des difficultés avec la planification d'un grand nombre de petites tâches. Dans de telles situations, il est préférable d'utiliser un nombre plus faible de tâches mieux consolidées.
La planification simultanée d'un grand nombre de DAG ou de tâches peut également être source de problèmes. Pour éviter ce problème, répartissez vos tâches de manière plus uniforme au fil du temps.
Résoudre les problèmes liés aux tâches en cours d'exécution et en file d'attente
Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'exécution ou de la mise en file d'attente des tâches.
Les files d'attente de tâches sont trop longues
Dans certains cas, une file d'attente de tâches peut être trop longue pour le planificateur. Pour en savoir plus sur l'optimisation des paramètres de nœud de calcul et de Celery, consultez la page Faire évoluer votre environnement Cloud Composer avec votre entreprise.
Utiliser la fonctionnalité TimeTable du planificateur Airflow
À partir d'Airflow 2.2, vous pouvez définir un calendrier pour un DAG à l'aide d'une nouvelle fonctionnalité appelée TimeTable.
Vous pouvez définir un horaire en utilisant l'une des méthodes suivantes:
Ressources de cluster limitées
Cette section ne s'applique qu'à Cloud Composer 1.
Vous risquez de rencontrer des problèmes de performances si le cluster GKE de votre environnement est trop petit pour gérer tous vos DAG et tâches. Dans ce cas, essayez l'une des solutions suivantes :
- Créez un environnement avec un type de machine offrant plus de performances et migrez vos DAG vers cet environnement.
- Créez d'autres environnements Cloud Composer et répartissez les DAG sur ces environnements.
- Modifiez le type de machine pour les nœuds GKE, comme décrit dans la section Mettre à niveau le type de machine pour les nœuds GKE. Cette procédure étant sujette aux erreurs, il s'agit de l'option la moins recommandée.
- Mettez à niveau le type de machine de l'instance Cloud SQL qui exécute la base de données Airflow dans votre environnement, par exemple à l'aide des commandes
gcloud composer environments update
. Les faibles performances de la base de données Airflow en sont peut-être la raison. le programmeur est lent.
Éviter la planification des tâches pendant les intervalles de maintenance
Vous pouvez définir des intervalles de maintenance spécifiques pour votre environnement. Au cours de ces périodes, des événements de maintenance pour Cloud SQL et GKE ont lieu.
Forcer le programmeur Airflow à ignorer les fichiers inutiles
Vous pouvez améliorer les performances du programmeur Airflow en ignorant les fichiers inutiles dans le dossier des DAG. Le programmeur Airflow ignore tous les fichiers et dossiers spécifiés dans le fichier .airflowignore
.
Pour que le programmeur Airflow ignore les fichiers inutiles, procédez comme suit :
- Créez un fichier
.airflowignore
. - Dans ce fichier, répertoriez les fichiers et dossiers à ignorer.
- Importez ce fichier dans le dossier
/dags
du bucket de votre environnement.
Pour en savoir plus sur le format de fichier .airflowignore
, consultez la documentation Airflow.
Le programmeur Airflow traite des DAG interrompus
Les utilisateurs Airflow peuvent interrompre un DAG pour en éviter l'exécution. Cela permet d'économiser les cycles de traitement des nœuds de calcul Airflow.
Cependant, le programmeur Airflow continue d'analyser les DAG interrompus. Si vous souhaitez vraiment améliorer les performances du programmeur Airflow, utilisez .airflowignore
ou supprimez les DAG interrompus dans le dossier des DAG.
Utilisation de "wait_for_downstream" dans vos DAG
Si vous définissez le paramètre wait_for_downstream
sur True
dans vos DAG, pour qu'une tâche réussisse, toutes les tâches immédiatement en aval de cette tâche doivent également réussir. Cela signifie que l'exécution des tâches d'un DAG donné peut être ralentie par l'exécution des tâches du DAG précédent. Pour en savoir plus, consultez
la documentation Airflow.
Les tâches placées en file d'attente trop longtemps seront annulées et replanifiées
Si une tâche Airflow est maintenue dans la file d'attente trop longtemps, le planificateur la replanifie à nouveau pour l'exécution (dans les versions d'Airflow antérieures à 2.3.1, la tâche est également marquée comme ayant échoué et est réessayée si elle est éligible à une nouvelle tentative).
Pour observer les symptômes de cette situation, vous pouvez consulter le graphique du nombre de tâches en file d'attente (onglet "Surveillance" dans l'UI de Cloud Composer). Si les pics de ce graphique ne diminuent pas dans un délai d'environ deux heures, les tâches seront probablement reprogrammées (sans journal) et suivies d'entrées de journal "Les tâches adoptées étaient toujours en attente…" dans les journaux de l'ordonnanceur. Dans ce cas, le message "Fichier journal introuvable" peut s'afficher. message dans les journaux des tâches Airflow, car la tâche n'a pas été exécutée.
En général, ce comportement est attendu, et l'instance suivante de la tâche planifiée doit être exécutée conformément au calendrier. Si vous observez de nombreux cas de ce type dans vos environnements Cloud Composer, cela peut signifier qu'il n'y a pas suffisamment de nœuds de calcul Airflow dans votre environnement pour traiter toutes les tâches planifiées.
Résolution: pour résoudre ce problème, vous devez vous assurer qu'il y a toujours de la capacité des nœuds de calcul Airflow pour exécuter des tâches en file d'attente. Par exemple, vous pouvez augmenter le nombre ou worker_concurrency. Vous pouvez également ajuster le parallélisme ou les pools pour éviter de mettre en file d'attente des tâches en plus grande quantité que votre capacité.
De manière sporadique, des tâches obsolètes peuvent bloquer l'exécution d'un DAG spécifique.
Dans des cas normaux, le programmeur Airflow doit pouvoir gérer les situations dans laquelle la file d'attente contient des tâches obsolètes. possible de les exécuter correctement (par exemple, un DAG auquel les tâches obsolètes appartiennent a été supprimé.
Si ces tâches obsolètes ne sont pas supprimées définitivement par le planificateur, vous devrez peut-être les supprimer manuellement. Vous pouvez le faire dans l'interface utilisateur d'Airflow, par exemple accédez à (Menu > Navigateur > Instances de tâches), recherchez les tâches en file d'attente appartenant à un DAG obsolète et supprimez-les.
Pour résoudre ce problème, mettez à niveau votre environnement vers Cloud Composer version 2.1.12 ou ultérieure.
Approche de Cloud Composer pour le paramètre [scheduler]min_file_process_interval
Cloud Composer modifie la façon dont [scheduler]min_file_process_interval
est utilisé par le programmeur Airflow.
Airflow 1
Si Cloud Composer utilise Airflow 1, les utilisateurs peuvent définir la valeur
de [scheduler]min_file_process_interval
entre 0 et 600 secondes. Valeurs supérieures à
600 secondes donnent les mêmes résultats que si [scheduler]min_file_process_interval
était défini sur 600 secondes.
Airflow 2
Dans Airflow 2, [scheduler]min_file_process_interval
ne peut être utilisé qu'avec
versions 1.19.9 et 2.0.26 ou plus récentes
Versions de Cloud Composer antérieures à 1.19.9 et 2.0.26
Dans ces versions,
[scheduler]min_file_process_interval
est ignoré.Cloud Composer versions 1.19.9 ou 2.0.26, ou versions plus récentes
Le programmeur Airflow est redémarré après un certain nombre de fois que tous les DAG sont programmées, et le paramètre
[scheduler]num_runs
contrôle le nombre de fois qu'elles sont effectuées par le planificateur. Lorsque le planificateur atteint les boucles de planification[scheduler]num_runs
, il est redémarré. Le planificateur est un composant sans état, et un tel redémarrage est un mécanisme d'autoréparation pour tous les problèmes que le planificateur peut rencontrer. Si aucune valeur n'est spécifiée, la valeur par défaut de[scheduler]num_runs
, soit 5 000, est appliquée.[scheduler]min_file_process_interval
permet de configurer la fréquence L'analyse du DAG a lieu, mais ce paramètre ne peut pas dépasser le temps requis pour qu'un planificateur effectue[scheduler]num_runs
lors de la programmation des DAG.
Scaling de la configuration Airflow
Airflow fournit des options de configuration Airflow qui contrôlent le nombre de tâches et de DAG Airflow pouvant être exécutés simultanément. Pour définir ces options de configuration, remplacez leurs valeurs pour votre environnement.
Simultanéité des nœuds de calcul
Le paramètre
[celery]worker_concurrency
contrôle le nombre maximal de tâches qu'un nœud de calcul Airflow peut exécuter en même temps. Si vous multipliez la valeur de ce paramètre par le nombre de nœuds de calcul Airflow dans votre environnement Cloud Composer, vous obtenez le nombre maximal de tâches pouvant être exécutées simultanément dans votre environnement. Ce est limité par l'option de configuration Airflow[core]parallelism
, qui est décrite plus en détail.Dans les environnements Cloud Composer 2, la valeur par défaut de La valeur
[celery]worker_concurrency
est calculée automatiquementPour les versions Airflow 2.3.3 et ultérieures,
[celery]worker_concurrency
est défini sur une valeur minimale sur 32, 12 * worker_CPU et 8 * worker_memory.Pour les versions Airflow 2.2.5 ou antérieures,
[celery]worker_concurrency
est défini sur 12 * nombre de nœuds de calcul CPU
Nombre maximal d'exécutions de DAG actives
L'option de configuration Airflow
[core]max_active_runs_per_dag
contrôle le nombre maximal d'exécutions de DAG actives par DAG. Si cette limite est atteinte, le programmeur ne crée pas d'autres exécutions de DAG.Une définition incorrecte de ce paramètre peut créer un problème où le programmeur limite l'exécution du DAG, car il ne peut pas créer d'autres instances d'exécution de DAG simultanées.
Nombre maximal de tâches actives par DAG
L'option de configuration Airflow
[core]max_active_tasks_per_dag
contrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG. Il s'agit d'un paramètre au niveau du DAG.Si ce paramètre est défini de manière incorrecte, vous risquez de rencontrer un problème dans lequel l'exécution d'une instance de DAG est ralentie car le nombre de tâches de DAG pouvant être exécutées simultanément est limité.
Solution : augmentez
[core]max_active_tasks_per_dag
.Parallélisme et taille du pool
L'option de configuration Airflow
[core]parallelism
contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.Il s'agit d'un paramètre global pour l'ensemble de la configuration Airflow.
Les tâches sont mises en file d'attente et exécutées dans un pool. Les environnements Cloud Composer n'utilisent qu'un seul pool. La taille de ce pool contrôle le nombre de tâches pouvant être mises en file d'attente simultanément par le programmeur. Si la taille du pool est trop faible, le programmeur ne pourra plus mettre de tâches en file d'attente avant même que les seuils (définis par l'option de configuration
[core]parallelism
et par l'option de configuration[celery]worker_concurrency
multipliée par le nombre de nœuds de calcul Airflow) ne soient dépassés.Vous pouvez configurer la taille du pool dans l'interface utilisateur Airflow (Menu > Admin > Pools). Ajustez la taille du pool en fonction du niveau de parallélisme attendu dans votre environnement.
En général,
[core]parallelism
est défini comme un produit du nombre maximal de nœuds de calcul et [celery]worker_concurrency.
Les DAG ne sont pas programmés par le programmeur en raison des délais avant expiration du processeur DAG
Pour en savoir plus sur ce problème, consultez la page Dépanner les DAG.
Marquage des tâches comme ayant échoué lorsque vous avez atteint dagrun_timeout
Le planificateur marque les tâches inachevées (en cours d'exécution, planifiées et mises en file d'attente) comme échouées si une exécution de DAG ne se termine pas dans un délai de dagrun_timeout
(un paramètre de DAG).
Solution :
Prolongez
dagrun_timeout
pour respecter le délai d'inactivité.(Cloud Composer 2) Augmentez le nombre de nœuds de calcul ou augmentez les paramètres de performances des nœuds de calcul pour accélérer l'exécution du DAG.
Symptômes lorsque la base de données Airflow est soumise à une pression de charge
Dans les journaux du planificateur Airflow, l'entrée de journal d'avertissement suivante peut parfois s'afficher :
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
Des symptômes similaires peuvent également être observés dans les journaux de nœuds de calcul Airflow:
Pour MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
Pour PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
Ces erreurs ou avertissements peuvent être le signe que la base de données Airflow est submergée par le nombre de connexions ouvertes ou le nombre de requêtes exécutées en même temps, que ce soit par des planificateurs ou d'autres composants Airflow tels que des nœuds de calcul, des déclencheurs et des serveurs Web.
Solutions possibles :
Supprimer les données inutiles de la base de données Airflow
Évoluez la base de données Airflow :
- (Cloud Composer 1) Modifiez le type de machine de l'instance Cloud SQL qui stocke la base de données Airflow de votre environnement.
- (Cloud Composer 2) Ajustez la taille de votre environnement.
Réduisez le nombre de planificateurs. Dans la plupart des cas, un ou deux planificateurs suffisent pour analyser et planifier les tâches Airflow. Il n'est pas recommandé de configurer plus de deux planificateurs, sauf si cela est justifié.
Évitez d'utiliser des variables globales dans les DAG Airflow: Variables d'environnement Cloud Composer et Variables Airflow.
Définissez [scheduler]scheduler-heartbeat-sec sur une valeur plus élevée, par exemple 15 secondes ou plus.
Définir [scheduler]job-heartbeat-sec sur une valeur plus élevée (par exemple, 30 secondes ou plus).
Définir [scheduler]scheduler_health_check_threshold à une valeur égale à
[scheduler]job-heartbeat-sec
multipliée par4
.
Le serveur Web affiche le message suivant : "The scheduler does not be running" (Le programmeur ne semble pas être en cours d'exécution). avertissement
Le planificateur envoie régulièrement son "heartbeat" à la base de données Airflow. Sur la base de ces informations, le serveur Web Airflow détermine si le planificateur est actif.
Parfois, si le planificateur est soumis à une forte charge, il peut ne pas être en mesure de signaler son "heartbeat" toutes les [scheduler]scheduler-heartbeat-sec.
Dans ce cas, le serveur Web Airflow peut afficher l'avertissement suivant :
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Solutions possibles:
Augmentez les ressources de processeur et de mémoire du planificateur.
Optimiser vos DAG pour que leur analyse et leur planification soient plus rapides consomment trop de ressources du programmeur.
Évitez d'utiliser des variables globales dans les DAG Airflow: Variables d'environnement Cloud Composer et Variables Airflow.
Augmentez la valeur de [scheduler]scheduler-health-check-threshold pour que le serveur Web attende plus longtemps avant de signaler l'indisponibilité du planificateur.
Solutions de contournement pour les problèmes rencontrés lors du remplissage des DAG
Il peut arriver que vous souhaitiez réexécuter des DAG déjà exécutés. Pour ce faire, utilisez l'outil de ligne de commande Airflow comme suit:
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Pour ne réexécuter que les tâches ayant échoué pour un DAG spécifique, utilisez également l'argument --rerun_failed_tasks
.
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Pour ne réexécuter que les tâches ayant échoué pour un DAG spécifique, utilisez également l'argument --rerun-failed-tasks
.
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.LOCATION
par la région où se trouve l'environnement.START_DATE
par une valeur pour le paramètre DAGstart_date
, dans le formatYYYY-MM-DD
.END_DATE
par une valeur pour le paramètre DAGend_date
, dans le formatYYYY-MM-DD
.DAG_NAME
par le nom du DAG.
L'opération de remplissage peut parfois entraîner une situation d'interblocage où un le remplissage n'est pas possible, car une tâche est verrouillée. Exemple :
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
Dans certains cas, vous pouvez utiliser les solutions de contournement suivantes pour résoudre les interblocages :
Désactivez le mini-planificateur en remplaçant
[core]schedule-after-task-execution
parFalse
.Exécutez des remplissages pour des plages de dates plus courtes. Par exemple, définissez
START_DATE
etEND_DATE
pour spécifier une période de seulement un jour.