Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Ce tutoriel vous guide dans le diagnostic et la résolution des problèmes de planification et d'analyse des tâches qui entraînent un dysfonctionnement du planificateur, des erreurs d'analyse, de la latence et des échecs de tâches.
Introduction
Le programmeur Airflow est principalement affecté par deux facteurs : la planification des tâches et l'analyse des DAG. Les problèmes liés à l'un de ces facteurs peuvent avoir un impact négatif sur l'état et les performances de l'environnement.
Il arrive que trop de tâches soient planifiées simultanément. Dans ce cas, la file d'attente est pleine et les tâches restent à l'état "planifié" ou sont replanifiées après avoir été mises en file d'attente, ce qui peut entraîner des échecs de tâches et une latence des performances.
Un autre problème courant concerne la latence d'analyse et les erreurs causées par la complexité d'un code DAG. Par exemple, un code DAG contenant des variables Airflow au niveau supérieur du code peut entraîner des retards d'analyse, une surcharge de la base de données, des échecs de planification et des délais d'attente de DAG.
Dans ce tutoriel, vous allez diagnostiquer les exemples de DAG et apprendre à résoudre les problèmes de planification et d'analyse, à améliorer la planification des DAG et à optimiser le code DAG et les configurations d'environnement pour améliorer les performances.
Objectifs
Cette section liste les objectifs des exemples de ce tutoriel.
Exemple : dysfonctionnement du planificateur et latence causée par une forte concurrence des tâches
Importez l'exemple de DAG qui s'exécute plusieurs fois simultanément, puis diagnostiquez les problèmes de latence et de dysfonctionnement du planificateur avec Cloud Monitoring.
Optimisez votre code DAG en consolidant les tâches et évaluez l'impact sur les performances.
Répartissez les tâches de manière plus uniforme au fil du temps et évaluez l'impact sur les performances.
Optimisez vos configurations Airflow et d'environnement, et évaluez l'impact.
Exemple : erreurs d'analyse DAG et latence causées par un code complexe
Importez l'exemple de DAG avec des variables Airflow et diagnostiquez les problèmes d'analyse avec Cloud Monitoring.
Optimisez le code DAG en évitant les variables Airflow au niveau supérieur du code et évaluez l'impact sur le temps d'analyse.
Optimisez les configurations Airflow et d'environnement, et évaluez l'impact sur le temps d'analyse.
Coûts
Ce tutoriel utilise les composants facturables suivants de Google Cloud :
- Cloud Composer (voir les coûts supplémentaires)
- Cloud Monitoring
Une fois que vous avez terminé ce tutoriel, évitez de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez Effectuer un nettoyage.
Avant de commencer
Cette section décrit les actions à effectuer avant de commencer le tutoriel.
Créer et configurer un projet
Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet comme suit :
Dans la console Google Cloud , sélectionnez ou créez un projet :
Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.
Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires :
- Administrateur de l'environnement et des objets Storage
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrateur de Compute (
roles/compute.admin
)
- Administrateur de l'environnement et des objets Storage
(
Activer les API pour votre projet.
Enable the Cloud Composer API.
Créer votre environnement Cloud Composer
Créez un environnement Cloud Composer 2.
Lorsque vous créez l'environnement, vous attribuez le rôle Extension de l'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext
) au compte de l'agent de service Composer. Cloud Composer utilise ce compte pour effectuer des opérations dans votre projet Google Cloud .
Exemple : dysfonctionnement du planificateur et échec des tâches en raison de problèmes de planification
Cet exemple montre comment déboguer un dysfonctionnement du planificateur et une latence causée par une forte concurrence des tâches.
Importer l'exemple de DAG dans votre environnement
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé lors des étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_200_seconds_1
.
Ce DAG comporte 200 tâches. Chaque tâche attend une seconde et affiche "Terminé !". Le DAG est déclenché automatiquement une fois importé. Cloud Composer exécute ce DAG 10 fois, et toutes les exécutions de DAG ont lieu en parallèle.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnostiquer les problèmes de dysfonctionnement du planificateur et d'échec des tâches
Une fois les exécutions du DAG terminées, ouvrez l'interface utilisateur Airflow et cliquez sur le DAG dag_10_tasks_200_seconds_1
. Vous verrez que 10 exécutions de DAG ont réussi, et que chacune d'elles comporte 200 tâches qui ont réussi.
Consultez les journaux des tâches Airflow :
Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.
Dans l'histogramme des journaux, les erreurs et les avertissements sont indiqués en rouge et en orange :

Le DAG exemple a généré environ 130 avertissements et 60 erreurs. Cliquez sur n'importe quelle colonne contenant des barres jaunes et rouges. Vous verrez certains des avertissements et erreurs suivants dans les journaux :
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Ces journaux peuvent indiquer que l'utilisation des ressources a dépassé les limites et que le nœud de calcul a redémarré automatiquement.
Si une tâche Airflow est conservée trop longtemps dans la file d'attente, le planificateur la marque comme ayant échoué et up_for_retry, et la replanifie pour exécution. Pour observer les symptômes de cette situation, vous pouvez consulter le graphique indiquant le nombre de tâches en file d'attente. Si les pics de ce graphique ne diminuent pas au bout de 10 minutes environ, il est probable que des tâches échouent (sans journaux).
Consultez les informations de surveillance :
Accédez à l'onglet Surveillance, puis sélectionnez Aperçu.
Examinez le graphique Tâches Airflow.
Figure 2 : Graphique des tâches Airflow (cliquez pour agrandir) Dans le graphique des tâches Airflow, on observe un pic de tâches en file d'attente qui dure plus de 10 minutes. Cela peut signifier qu'il n'y a pas assez de ressources dans votre environnement pour traiter toutes les tâches planifiées.
Examinez le graphique Nœuds de calcul actifs :
Figure 3 : Graphique des nœuds de calcul actifs (cliquez pour agrandir) Le graphique Nœuds de calcul actifs indique que le DAG a déclenché l'autoscaling jusqu'à la limite maximale autorisée de trois nœuds de calcul pendant l'exécution du DAG.
Les graphiques d'utilisation des ressources peuvent indiquer le manque de capacité des nœuds de calcul Airflow pour exécuter les tâches en file d'attente. Dans l'onglet Surveillance, sélectionnez Nœuds de calcul, puis examinez les graphiques Utilisation totale du processeur des nœuds de calcul et Utilisation totale de la mémoire des nœuds de calcul.
Figure 4 : Graphique de l'utilisation totale du processeur des nœuds de calcul (cliquez pour agrandir) Figure 5 : Graphique de l'utilisation totale de la mémoire des nœuds de calcul (cliquez pour agrandir) Les graphiques indiquent que l'exécution d'un trop grand nombre de tâches simultanément a entraîné le dépassement de la limite du processeur. Les ressources avaient été utilisées pendant plus de 30 minutes, ce qui est encore plus long que la durée totale de 200 tâches dans 10 exécutions de DAG s'exécutant les unes après les autres.
Ces indicateurs montrent que la file d'attente est pleine et qu'il n'y a pas assez de ressources pour traiter toutes les tâches planifiées.
Regrouper vos tâches
Le code actuel crée de nombreux DAG et tâches sans ressources suffisantes pour traiter toutes les tâches en parallèle, ce qui entraîne le remplissage de la file d'attente. Si les tâches restent trop longtemps dans la file d'attente, elles peuvent être reprogrammées ou échouer. Dans de telles situations, il est préférable d'utiliser un nombre plus faible de tâches mieux consolidées.
L'exemple de DAG suivant modifie le nombre de tâches dans l'exemple initial (de 200 à 20) et augmente le temps d'attente (de 1 à 10 secondes) pour imiter des tâches plus consolidées qui effectuent la même quantité de travail.
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Évaluez l'impact de tâches plus regroupées sur les processus de planification :
Attendez la fin des exécutions de DAG.
Dans l'UI Airflow, sur la page DAG, cliquez sur le DAG
dag_10_tasks_20_seconds_10
. Vous verrez 10 exécutions de DAG, chacune comportant 20 tâches réussies.Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.
Le deuxième exemple, avec des tâches plus consolidées, a généré environ 10 avertissements et 7 erreurs. Dans l'histogramme, vous pouvez comparer le nombre d'erreurs et d'avertissements dans le premier exemple (valeurs antérieures) et le deuxième exemple (valeurs ultérieures).
Figure 6. Histogramme des journaux des nœuds de calcul Airflow après consolidation des tâches (cliquez pour agrandir) Si vous comparez le premier exemple avec celui qui est plus consolidé, vous constaterez qu'il y a beaucoup moins d'erreurs et d'avertissements dans le second exemple. Toutefois, les mêmes erreurs liées à l'arrêt progressif s'affichent toujours dans les journaux en raison de la surcharge des ressources.
Dans l'onglet Surveillance, sélectionnez Nœuds de calcul et examinez les graphiques.
Lorsque vous comparez le graphique Tâches Airflow du premier exemple (valeurs antérieures) avec celui du deuxième exemple, qui présente des tâches plus regroupées, vous pouvez constater que le pic de tâches en file d'attente a duré moins longtemps lorsque les tâches étaient plus regroupées. Cependant, il a duré près de 10 minutes, ce qui n'est toujours pas optimal.
Figure 7. Graphique des tâches Airflow après la consolidation des tâches (cliquez pour agrandir) Dans le graphique "Nœuds de calcul actifs", vous pouvez voir que le premier exemple (à gauche du graphique) a utilisé des ressources pendant une période beaucoup plus longue que le second, même si les deux exemples imitent la même quantité de travail.
Figure 8. Graphique des nœuds de calcul actifs après la consolidation des tâches (cliquez pour agrandir) Examinez les graphiques de consommation des ressources des nœuds de calcul. Même si la différence entre les ressources utilisées dans l'exemple avec plus de tâches consolidées et l'exemple initial est assez importante, l'utilisation du processeur atteint toujours 70 % de la limite.
Figure 9. Graphique de l'utilisation totale du processeur des nœuds de calcul après la consolidation des tâches (cliquez pour agrandir) Figure 10 : Graphique de l'utilisation totale de la mémoire des nœuds de calcul après la consolidation des tâches (cliquez pour agrandir)
Répartissez les tâches de manière plus uniforme au fil du temps.
Si le nombre de tâches simultanées est trop élevé, la file d'attente se remplit, ce qui entraîne le blocage ou la reprogrammation des tâches. Lors des étapes précédentes, vous avez réduit le nombre de tâches en les consolidant. Toutefois, les journaux de sortie et la surveillance ont indiqué que le nombre de tâches simultanées était toujours sous-optimal.
Vous pouvez contrôler le nombre d'exécutions de tâches simultanées en implémentant une planification ou en définissant des limites pour le nombre de tâches pouvant être exécutées simultanément.
Dans ce tutoriel, vous répartissez les tâches de manière plus uniforme au fil du temps en ajoutant des paramètres au niveau du DAG dans le DAG dag_10_tasks_20_seconds_10
:
Ajoutez l'argument
max_active_runs=1
au gestionnaire de contexte DAG. Cet argument limite l'exécution d'un DAG à une seule instance à la fois.Ajoutez l'argument
max_active_tasks=5
au gestionnaire de contexte DAG. Cet argument contrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG.
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Évaluez l'impact de la répartition des tâches dans le temps sur les processus de planification :
Attendez la fin des exécutions de DAG.
Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.
Sur l'histogramme, vous pouvez voir que le troisième DAG avec un nombre limité de tâches et d'exécutions actives n'a généré aucun avertissement ni aucune erreur, et que la distribution des journaux semble plus uniforme par rapport aux valeurs précédentes.
Figure 11 : Histogramme des journaux de nœuds de calcul Airflow après la consolidation et la distribution des tâches au fil du temps (cliquez pour agrandir)
Les tâches de l'exemple dag_10_tasks_20_seconds_10_scheduled
qui comportent un nombre limité de tâches et d'exécutions actives n'ont pas entraîné de pression sur les ressources, car les tâches ont été mises en file d'attente de manière uniforme.
Après avoir effectué les étapes décrites, vous avez optimisé l'utilisation des ressources en regroupant les petites tâches et en les répartissant plus uniformément dans le temps.
Optimiser les configurations d'environnement
Vous pouvez ajuster les configurations de votre environnement pour vous assurer que les nœuds de calcul Airflow disposent toujours de la capacité nécessaire pour exécuter les tâches en file d'attente.
Nombre de nœuds de calcul et simultanéité des nœuds de calcul
Vous pouvez ajuster le nombre maximal de nœuds de calcul pour que Cloud Composer effectue un scaling automatique de votre environnement dans les limites définies.
Le paramètre [celery]worker_concurrency
définit le nombre maximal de tâches qu'un seul nœud de calcul peut récupérer dans la file d'attente des tâches. La modification de ce paramètre ajuste le nombre de tâches qu'un seul nœud de calcul peut exécuter simultanément.
Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Par défaut, la simultanéité des nœuds de calcul est définie en fonction du nombre d'instances de tâches simultanées légères qu'un nœud de calcul peut gérer. Cela signifie que sa valeur dépend des limites de ressources des nœuds de calcul.
La valeur de simultanéité des nœuds de calcul ne dépend pas du nombre de nœuds de calcul dans votre environnement.
Le nombre de nœuds de calcul et la simultanéité des nœuds de calcul fonctionnent ensemble. Les performances de votre environnement dépendent fortement de ces deux paramètres. Vous pouvez utiliser les considérations suivantes pour choisir la bonne combinaison :
Plusieurs tâches rapides exécutées en parallèle. Vous pouvez augmenter la simultanéité des nœuds de calcul lorsque des tâches sont en attente dans la file d'attente et que vos nœuds de calcul utilisent un faible pourcentage de leurs allocations de processeur et de mémoire. Toutefois, dans certaines circonstances, la file d'attente peut ne jamais se remplir, ce qui fait que l'autoscaling ne se déclenche jamais. Si les petites tâches se terminent avant que les nouveaux nœuds de calcul ne soient prêts, un nœud de calcul existant peut prendre en charge les tâches restantes. Il n'y aura donc aucune tâche pour les nœuds de calcul nouvellement créés.
Dans ces situations, il est recommandé d'augmenter le nombre minimal de nœuds de calcul et la simultanéité des nœuds de calcul pour éviter un scaling trop rapide.
Plusieurs tâches longues s'exécutent en parallèle. La simultanéité élevée des nœuds de calcul empêche le système de faire évoluer leur nombre. Si plusieurs tâches nécessitent beaucoup de ressources et prennent beaucoup de temps à s'exécuter, une simultanéité élevée des nœuds de calcul peut entraîner une file d'attente qui ne se remplit jamais et toutes les tâches sont récupérées par un seul nœud de calcul, ce qui entraîne des problèmes de performances. Dans ces situations, il est recommandé d'augmenter le nombre maximal de nœuds de calcul et de diminuer la simultanéité des nœuds de calcul.
L'importance du parallélisme
Les programmeurs Airflow contrôlent la planification des exécutions de DAG et des tâches individuelles des DAG. L'option de configuration Airflow [core]parallelism
contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.
Le parallélisme est un mécanisme de protection d'Airflow qui détermine le nombre de tâches pouvant être exécutées en même temps par planificateur, quel que soit le nombre de nœuds de calcul. La valeur du parallélisme, multipliée par le nombre de planificateurs de votre cluster, correspond au nombre maximal d'instances de tâches que votre environnement peut mettre en file d'attente.
En général, [core]parallelism
est défini comme le produit d'un nombre maximal de nœuds de calcul et de [celery]worker_concurrency
. Elle est également affectée par le pool.
Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Pour en savoir plus sur l'ajustement des configurations Airflow liées au scaling, consultez Scaling de la configuration Airflow.
Trouver les configurations d'environnement optimales
Pour résoudre les problèmes de planification, nous vous recommandons de regrouper les petites tâches en tâches plus importantes et de les répartir de manière plus uniforme au fil du temps. En plus d'optimiser le code DAG, vous pouvez également optimiser les configurations d'environnement pour disposer d'une capacité suffisante pour exécuter plusieurs tâches simultanément.
Par exemple, supposons que vous consolidez les tâches dans votre DAG autant que possible, mais que la limitation des tâches actives pour les répartir plus uniformément dans le temps n'est pas une solution privilégiée pour votre cas d'utilisation spécifique.
Vous pouvez ajuster les paramètres de parallélisme, du nombre de nœuds de calcul et de la simultanéité des nœuds de calcul pour exécuter le DAG dag_10_tasks_20_seconds_10
sans limiter les tâches actives. Dans cet exemple, le DAG s'exécute 10 fois et chaque exécution contient 20 petites tâches.
Si vous souhaitez les exécuter tous en même temps :
Vous aurez besoin d'une taille d'environnement plus importante, car elle contrôle les paramètres de performances de l'infrastructure Cloud Composer gérée de votre environnement.
Les nœuds de calcul Airflow doivent pouvoir exécuter 20 tâches simultanément. Vous devez donc définir la simultanéité des nœuds de calcul sur 20.
Les nœuds de calcul doivent disposer de suffisamment de processeurs et de mémoire pour gérer toutes les tâches. La simultanéité des nœuds de calcul est affectée par le processeur et la mémoire des nœuds de calcul. Vous aurez donc besoin d'au moins
worker_concurrency / 12
processeurs etleast worker_concurrency / 8
mémoire.Vous devrez augmenter le parallélisme pour qu'il corresponde à la simultanéité plus élevée des nœuds de calcul. Pour que les nœuds de calcul puissent récupérer 20 tâches de la file d'attente, le planificateur devra d'abord planifier ces 20 tâches.
Ajustez les configurations de votre environnement comme suit :
Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Configuration de l'environnement.
Recherchez la configuration Ressources > Charges de travail, puis cliquez sur Modifier.
Dans la section Nœud de calcul, dans le champ Mémoire, spécifiez la nouvelle limite de mémoire pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez 4 Go.
Dans le champ Processeur, spécifiez la nouvelle limite de processeurs pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez deux processeurs virtuels.
Enregistrez les modifications et attendez quelques minutes que vos nœuds de calcul Airflow redémarrent.
Ensuite, remplacez les options de configuration Airflow pour le parallélisme et la simultanéité des nœuds de calcul :
Accédez à l'onglet Remplacements de configuration Airflow.
Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.
Remplacez la configuration du parallélisme :
Section Clé Valeur core
parallelism
20
Cliquez sur Ajouter un remplacement de configuration Airflow et remplacez la configuration de la simultanéité des nœuds de calcul :
Section Clé Valeur celery
worker_concurrency
20
Cliquez sur Enregistrer et attendez que l'environnement mette à jour sa configuration.
Déclenchez à nouveau le même exemple de DAG avec les configurations ajustées :
Dans l'UI Airflow, accédez à la page DAG.
Recherchez le DAG
dag_10_tasks_20_seconds_10
et supprimez-le.Une fois le DAG supprimé, Airflow vérifie le dossier des DAG dans le bucket de votre environnement et exécute automatiquement le DAG à nouveau.
Une fois les exécutions de DAG terminées, examinez à nouveau l'histogramme des journaux. Sur le schéma, vous pouvez voir que l'exemple dag_10_tasks_20_seconds_10
avec des tâches plus consolidées n'a généré aucune erreur ni aucun avertissement lors de l'exécution avec la configuration d'environnement ajustée. Comparez les résultats aux données précédentes du diagramme, où le même exemple a généré des erreurs et des avertissements lors de l'exécution avec la configuration d'environnement par défaut.

Les configurations d'environnement et d'Airflow jouent un rôle crucial dans la planification des tâches. Toutefois, il n'est pas possible d'augmenter les configurations au-delà de certaines limites.
Nous vous recommandons d'optimiser le code DAG, de regrouper les tâches et d'utiliser la planification pour optimiser les performances et l'efficacité.
Exemple : Erreurs d'analyse de DAG et latence dues à un code DAG complexe
Dans cet exemple, vous allez examiner la latence d'analyse d'un exemple de DAG qui imite un excès de variables Airflow.
Créer une variable Airflow
Avant d'importer l'exemple de code, créez une variable Airflow.
Dans la console Google Cloud , accédez à la page Environnements.
Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.
Accédez à Admin > Variables > Ajouter un enregistrement.
Définissez les valeurs suivantes :
- clé :
example_var
- val :
test_airflow_variable
- clé :
Importer l'exemple de DAG dans votre environnement
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé lors des étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable
.
Ce DAG contient une boucle "for" qui s'exécute 1 000 fois et imite un excès de variables Airflow. Chaque itération lit la variable example_var
et génère une tâche. Chaque tâche contient une commande qui affiche la valeur de la variable.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnostiquer les problèmes d'analyse
Le temps d'analyse du DAG correspond au temps nécessaire au planificateur Airflow pour lire et analyser un fichier DAG. Avant de pouvoir planifier une tâche d'un DAG, le programmeur Airflow doit analyser le fichier DAG pour découvrir la structure du DAG et les tâches définies.
Si l'analyse d'un DAG prend beaucoup de temps, cela consomme les capacités du programmeur et peut réduire les performances des exécutions DAG.
Pour surveiller la durée d'analyse des DAG :
Exécutez la commande de CLI Airflow
dags report
dans gcloud CLI pour afficher le temps d'analyse de tous vos DAG :gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Remplacez les éléments suivants :
ENVIRONMENT_NAME
: nom de votre environnementLOCATION
: région où se trouve l'environnement.
Dans le résultat de la commande, recherchez la valeur de durée du DAG
dag_for_loop_airflow_variables
. Une valeur élevée peut indiquer que ce DAG n'est pas mis en œuvre de manière optimale. Si vous avez plusieurs DAG, la table de sortie vous permet d'identifier ceux dont la durée d'analyse est longue.Exemple :
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Inspecter les durées d'analyse des DAG dans la console Google Cloud :
- Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de traitement des DAG.
Consultez les journaux
dag-processor-manager
et identifiez les problèmes potentiels.Figure 13. Les journaux du gestionnaire de processeur DAG affichent les durées d'analyse des DAG (cliquez pour agrandir).
Si la durée totale d'analyse des DAG dépasse environ 10 secondes, vos planificateurs peuvent être surchargés par l'analyse des DAG et ne pas pouvoir exécuter efficacement les DAG.
Optimiser le code du DAG
Il est recommandé d'éviter le code Python "de premier niveau" inutile dans vos DAG. Les DAG comportant de nombreuses importations, variables et fonctions en dehors du DAG entraînent des temps d'analyse plus longs pour le programmeur Airflow. Cela réduit les performances et l'évolutivité de Cloud Composer et d'Airflow. Un excès de lecture des variables Airflow entraîne un long temps d'analyse et une charge de base de données élevée. Si ce code se trouve dans un fichier DAG, ces fonctions s'exécutent à chaque battement de cœur du planificateur, ce qui peut être lent.
Les champs de modèle d'Airflow vous permettent d'intégrer des valeurs provenant de variables Airflow et de modèles Jinja dans vos DAG. Cela évite l'exécution inutile de fonctions lors des pulsations du planificateur.
Pour mieux implémenter l'exemple de DAG, évitez d'utiliser des variables Airflow dans le code Python de premier niveau des DAG. Au lieu de cela, transmettez les variables Airflow aux opérateurs existants via un modèle Jinja, ce qui retardera la lecture de la valeur jusqu'à l'exécution de la tâche.
Importez la nouvelle version de l'exemple de DAG dans votre environnement. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Inspectez la nouvelle durée d'analyse du DAG :
Attendez la fin de l'exécution du DAG.
Exécutez à nouveau la commande
dags report
pour afficher le temps d'analyse de tous vos DAG :file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Examinez à nouveau les journaux
dag-processor-manager
et analysez la durée d'analyse.Figure 14. Les journaux du gestionnaire de processeur DAG affichent les durées d'analyse des DAG après l'optimisation du code DAG (cliquez pour agrandir).
En remplaçant les variables d'environnement par des modèles Airflow, vous avez simplifié le code DAG et réduit la latence d'analyse par un facteur 10 environ.
Optimiser les configurations d'environnement Airflow
Le planificateur Airflow essaie constamment de déclencher de nouvelles tâches et analyse tous les DAG dans le bucket de votre environnement. Si l'analyse de vos DAG prend beaucoup de temps et que le programmateur consomme beaucoup de ressources, vous pouvez optimiser les configurations du programmateur Airflow pour qu'il utilise les ressources plus efficacement.
Dans ce tutoriel, l'analyse des fichiers DAG prend beaucoup de temps et les cycles d'analyse commencent à se chevaucher, ce qui épuise la capacité du programmeur. Dans notre exemple, le premier DAG prend plus de cinq secondes à analyser. Vous allez donc configurer le planificateur pour qu'il s'exécute moins souvent afin d'utiliser les ressources plus efficacement. Vous allez remplacer l'option de configuration Airflow scheduler_heartbeat_sec
. Cette configuration définit la fréquence d'exécution du planificateur (en secondes). Par défaut, la valeur est définie sur 5 secondes.
Vous pouvez modifier cette option de configuration Airflow en la remplaçant.
Remplacez l'option de configuration Airflow scheduler_heartbeat_sec
:
Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Remplacements de configuration Airflow.
Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.
Remplacez l'option de configuration Airflow :
Section Clé Valeur scheduler
scheduler_heartbeat_sec
10
Cliquez sur Enregistrer et attendez que l'environnement mette à jour sa configuration.
Vérifiez les métriques du planificateur :
Accédez à l'onglet Surveillance, puis sélectionnez Planificateurs.
Dans le graphique Signal de présence du planificateur, cliquez sur le bouton Autres options (trois points), puis sur Afficher dans l'explorateur de métriques.

Sur le graphique, vous verrez que le planificateur s'exécute deux fois moins souvent après avoir modifié la configuration par défaut de 5 à 10 secondes. En réduisant la fréquence des pulsations, vous vous assurez que le planificateur ne commence pas à s'exécuter alors que le cycle d'analyse précédent est en cours et que la capacité de ressources du planificateur n'est pas épuisée.
Attribuer davantage de ressources au planificateur
Dans Cloud Composer 2, vous pouvez allouer davantage de ressources de processeur et de mémoire au planificateur. Vous pouvez ainsi améliorer les performances de votre programmateur et accélérer le temps d'analyse de votre DAG.
Allouez des ressources de processeur et de mémoire supplémentaires au programmateur :
Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Configuration de l'environnement.
Recherchez la configuration Ressources > Charges de travail, puis cliquez sur Modifier.
Dans la section Planificateur, dans le champ Mémoire, spécifiez la nouvelle limite de mémoire. Dans ce tutoriel, utilisez 4 Go.
Dans le champ CPU, spécifiez la nouvelle limite de processeur. Dans ce tutoriel, utilisez deux processeurs virtuels.
Enregistrez les modifications et attendez plusieurs minutes que vos planificateurs Airflow redémarrent.
Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de traitement des DAG.
Examinez les journaux
dag-processor-manager
et comparez la durée d'analyse des DAG d'exemple :Figure 16. Les journaux du gestionnaire de processeur de DAG indiquent les durées d'analyse des DAG après l'attribution de ressources supplémentaires au planificateur (cliquez pour agrandir)
En attribuant plus de ressources au planificateur, vous avez augmenté sa capacité et réduit considérablement la latence d'analyse par rapport aux configurations d'environnement par défaut. Avec plus de ressources, le planificateur peut analyser les DAG plus rapidement. Toutefois, les coûts associés aux ressources Cloud Composer augmenteront également. De plus, il est impossible d'augmenter les ressources au-delà d'une certaine limite.
Nous vous recommandons d'allouer des ressources uniquement après avoir implémenté les optimisations possibles du code DAG et de la configuration Airflow.
Effectuer un nettoyage
Pour éviter que les ressources utilisées dans ce tutoriel ne soient facturées sur votre compte Google Cloud , supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.
Supprimer le projet
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Supprimer des ressources individuelles
Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.
Supprimez l'environnement Cloud Composer. Vous allez également supprimer le bucket de l'environnement au cours de cette procédure.
Étapes suivantes
- Optimiser les performances et les coûts de votre environnement
- Faire évoluer des environnements
- Découvrez comment résoudre les problèmes liés aux DAG.