Déboguer les problèmes de planification des tâches

Cloud Composer 1 | Cloud Composer 2

Ce tutoriel vous guide dans le diagnostic et la résolution des problèmes de planification et d'analyse des tâches qui entraînent un dysfonctionnement du programmeur, des erreurs d'analyse et de la latence, ainsi que des échecs de tâches.

Introduction

Le programmeur Airflow est principalement affecté par deux facteurs: la planification des tâches et l'analyse du DAG. Les problèmes liés à l'un de ces facteurs peuvent avoir un impact négatif sur la santé et les performances de l'environnement.

Parfois, trop de tâches sont planifiées simultanément. Dans cette situation, la file d'attente est remplie et les tâches restent à l'état "planifiée" ou sont reprogrammées après avoir été mises en file d'attente, ce qui peut entraîner l'échec des tâches et une latence des performances.

Un autre problème courant concerne l'analyse de la latence et des erreurs causées par la complexité du code d'un DAG. Par exemple, un code DAG contenant des variables Airflow au niveau supérieur peut entraîner des délais d'analyse, une surcharge de la base de données, des échecs de planification et des délais d'inactivité du DAG.

Dans ce tutoriel, vous allez diagnostiquer les exemples de DAG et apprendre à résoudre les problèmes de planification et d'analyse, à améliorer la planification des DAG, et à optimiser le code des DAG et les configurations d'environnement pour améliorer les performances.

Objectifs

Cette section liste les objectifs des exemples de ce tutoriel.

Exemple: dysfonctionnement du programmeur et latence causés par un nombre élevé de simultanéités de tâches

  • Importer l'exemple de DAG qui s'exécute plusieurs fois simultanément, et diagnostiquer les dysfonctionnements du programmeur et les problèmes de latence avec Cloud Monitoring

  • Optimisez le code de votre DAG en regroupant les tâches et en évaluant l'impact sur les performances.

  • Répartissez les tâches plus régulièrement dans le temps et évaluez l'impact sur les performances.

  • Optimisez vos configurations Airflow et les configurations d'environnement, et évaluez l'impact.

Exemple: erreurs d'analyse du DAG et latence causées par un code complexe

  • Importer l'exemple de DAG avec des variables Airflow et diagnostiquer les problèmes d'analyse avec Cloud Monitoring

  • Optimiser le code du DAG en évitant les variables Airflow au niveau supérieur du code et évaluer l'impact sur la durée d'analyse.

  • Optimiser les configurations Airflow et l'environnement, et évaluer l'impact sur la durée d'analyse

Coûts

Ce tutoriel utilise les composants facturables suivants de Google Cloud :

Une fois que vous avez terminé ce tutoriel, évitez de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

Cette section décrit les actions requises avant de démarrer le tutoriel.

Créer et configurer un projet

Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet de la manière suivante:

  1. Dans la console Google Cloud, sélectionnez ou créez un projet:

    Accéder au sélecteur de projet

  2. Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.

  3. Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires:

    • Administrateur de l'environnement et des objets Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrateur de Compute (roles/compute.admin)

Activer les API pour votre projet.

Activez l'API Cloud Composer

Activer l'API

Créer l'environnement Cloud Composer

Créez un environnement Cloud Composer 2.

Dans le cadre de la création de l'environnement, vous attribuez le rôle Extension d'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) au compte d'agent de service Composer. Cloud Composer utilise ce compte pour effectuer des opérations dans votre projet Google Cloud.

Exemple: Dysfonctionnement du planificateur et échec des tâches en raison de problèmes de planification des tâches

Cet exemple illustre le débogage des dysfonctionnements et de la latence du programmeur causés par un haut niveau de simultanéité des tâches.

Importer l'exemple de DAG dans votre environnement

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé aux étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_200_seconds_1.

Ce DAG comporte 200 tâches. Chaque tâche attend 1 seconde et affiche le message "Complete!" (Terminé). Le DAG se déclenche automatiquement une fois importé. Cloud Composer exécute ce DAG 10 fois, et toutes les exécutions du DAG ont lieu en parallèle.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnostiquer les dysfonctionnements du planificateur et des tâches

Une fois le DAG exécuté, ouvrez l'interface utilisateur d'Airflow, puis cliquez sur le DAG dag_10_tasks_200_seconds_1. Vous constaterez que 10 exécutions du DAG au total ont abouti, et que chacune contient 200 tâches ayant abouti.

Consultez les journaux des tâches Airflow:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.

Dans l'histogramme des journaux, les erreurs et les avertissements sont indiqués en rouge et en orange:

Histogramme des journaux de nœuds de calcul Airflow avec les erreurs et les avertissements indiqués en rouge et en orange
Figure 1 : Histogramme des journaux de nœuds de calcul Airflow (cliquez pour agrandir)

L'exemple de DAG a généré environ 130 avertissements et 60 erreurs. Cliquez sur n'importe quelle colonne contenant des barres jaunes et rouges. Certains des avertissements et erreurs suivants s'afficheront dans les journaux:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Ces journaux peuvent indiquer que l'utilisation des ressources a dépassé les limites et que le nœud de calcul a redémarré lui-même.

Si une tâche Airflow est conservée dans la file d'attente trop longtemps, le programmeur la marque comme ayant échoué et up_for_retry, puis la replanifiera pour son exécution. Une façon d'observer les symptômes de cette situation consiste à examiner le graphique avec le nombre de tâches en file d'attente. Si les pics de ce graphique ne baissent pas au bout de 10 minutes environ, il y aura probablement des échecs de tâches (sans journaux).

Examinez les informations de surveillance:

  1. Accédez à l'onglet Surveillance, puis sélectionnez Aperçu.

  2. Examinez le graphique Tâches Airflow.

    Graphique des tâches Airflow au fil du temps, montrant un pic du nombre de tâches en file d'attente
    Figure 2 : Graphique des tâches Airflow (cliquez pour agrandir)

    Dans le graphique des tâches Airflow, il y a un pic de tâches en file d'attente qui dure plus de 10 minutes, ce qui peut signifier que votre environnement n'a pas suffisamment de ressources pour traiter toutes les tâches planifiées.

  3. Examinez le graphique Active workers (Nœuds de calcul actifs) :

    Le graphique des nœuds de calcul Airflow actifs au fil du temps montre que le nombre de nœuds de calcul actifs a été augmenté jusqu'à la limite maximale
    Figure 3 : Graphique des nœuds de calcul actifs (cliquez pour agrandir)

    Le graphique Nœuds de calcul actifs indique que le DAG a déclenché l'autoscaling jusqu'à la limite maximale autorisée de trois nœuds de calcul lors de l'exécution du DAG.

  4. Les graphiques d'utilisation des ressources peuvent indiquer le manque de capacité des nœuds de calcul Airflow à exécuter les tâches en file d'attente. Dans l'onglet Surveillance, sélectionnez Nœuds de calcul, puis consultez les graphiques Utilisation totale du processeur par les nœuds de calcul et Utilisation totale de la mémoire des nœuds de calcul.

    Le graphique de l'utilisation du processeur par les nœuds de calcul Airflow montre que l'utilisation du processeur augmente jusqu'à la limite maximale
    Figure 4 : Graphique de l'utilisation totale du processeur par les nœuds de calcul (cliquez pour agrandir)
    Le graphique d'utilisation de la mémoire par les nœuds de calcul Airflow indique que l'utilisation de la mémoire augmente, mais n'atteint pas la limite maximale
    Figure 5 : Graphique sur l'utilisation totale de la mémoire des nœuds de calcul (cliquez pour agrandir)

    Les graphiques indiquent que l'exécution simultanée d'un trop grand nombre de tâches a entraîné l'atteinte de la limite du processeur. Les ressources avaient été utilisées pendant plus de 30 minutes, ce qui est encore plus que la durée totale de 200 tâches dans 10 exécutions de DAG l'une après l'autre.

Ce sont des indicateurs indiquant que la file d'attente est remplie et que les ressources sont insuffisantes pour traiter toutes les tâches planifiées.

Consolider vos tâches

Le code actuel crée de nombreux DAG et tâches sans ressources suffisantes pour traiter toutes les tâches en parallèle, ce qui entraîne le remplissage de la file d'attente. Conserver des tâches dans la file d'attente trop longtemps peut entraîner la replanification ou l'échec des tâches. Dans ce cas, opter pour un plus petit nombre de tâches plus consolidées.

L'exemple de DAG suivant fait passer le nombre de tâches de l'exemple initial de 200 à 20 et augmente le temps d'attente de 1 à 10 secondes afin d'imiter des tâches plus consolidées effectuant le même volume de travail.

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Évaluez l'impact de tâches plus consolidées sur les processus de planification:

  1. Attendez la fin de l'exécution du DAG.

  2. Dans l'interface utilisateur d'Airflow, sur la page DAG, cliquez sur le DAG dag_10_tasks_20_seconds_10. 10 exécutions du DAG s'affichent, chacune comportant 20 tâches réussies.

  3. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  4. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  5. Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.

    Le deuxième exemple, qui comportait des tâches plus consolidées, a généré environ 10 avertissements et 7 erreurs. Sur l'histogramme, vous pouvez comparer le nombre d'erreurs et d'avertissements de l'exemple initial (valeurs précédentes) et du deuxième exemple (valeurs suivantes).

    L'histogramme des journaux de nœuds de calcul Airflow avec les erreurs et les avertissements indique la diminution du nombre d'erreurs et d'avertissements une fois les tâches consolidées
    Figure 6 : Histogramme des journaux des journaux de nœud de calcul Airflow après le regroupement des tâches (cliquez pour agrandir)

    En comparant le premier exemple au premier exemple, vous pouvez constater que le deuxième exemple comporte beaucoup moins d'erreurs et d'avertissements. Toutefois, les mêmes erreurs liées à l'arrêt à chaud apparaissent toujours dans les journaux en raison de la surcharge des ressources.

  6. Dans l'onglet Surveillance, sélectionnez Nœuds de calcul et examinez les graphiques.

    Lorsque vous comparez le graphique Tâches Airflow du premier exemple (valeurs précédentes) au graphique du deuxième exemple avec des tâches plus consolidées, vous pouvez constater que le pic des tâches en file d'attente a duré moins longtemps, lorsque les tâches ont été plus consolidées. Cependant, elle a duré près de 10 minutes, ce qui n'est toujours pas optimal.

    Le graphique des tâches Airflow au fil du temps montre que le pic de tâches Airflow a duré moins longtemps qu'auparavant.
    Figure 7 : Graphique des tâches Airflow après le regroupement des tâches (cliquez pour agrandir)

    Sur le graphique "Nœuds de calcul actifs", vous pouvez voir que le premier exemple (à gauche du graphique) a utilisé des ressources pendant une période beaucoup plus longue que le second, même si les deux exemples imitent le même volume de travail.

    Le graphique des nœuds de calcul Airflow actifs au fil du temps montre que le nombre de nœuds de calcul actifs a augmenté pendant une période plus courte qu'auparavant.
    Figure 8 : Graphique des nœuds de calcul actifs après le regroupement des tâches (cliquez pour agrandir)

    Examinez les graphiques de consommation des ressources de nœud de calcul. Bien que la différence entre les ressources utilisées dans l'exemple avec des tâches plus consolidées et l'exemple initial soit assez importante, l'utilisation du processeur atteint toujours 70% de la limite.

    Le graphique d'utilisation du processeur par les nœuds de calcul Airflow montre une augmentation de l'utilisation du processeur pouvant atteindre 70% de la limite maximale
    Figure 9 : Graphique de l'utilisation totale du processeur par les nœuds de calcul après le regroupement des tâches (cliquez pour agrandir)
    Le graphique d'utilisation de la mémoire par les nœuds de calcul Airflow indique que l'utilisation de la mémoire augmente, mais n'atteint pas la limite maximale
    Figure 10 : Graphique de l'utilisation totale de la mémoire des nœuds de calcul après le regroupement des tâches (cliquez pour agrandir)

Répartir les tâches plus uniformément dans le temps

Un trop grand nombre de tâches simultanées entraîne le remplissage de la file d'attente, ce qui entraîne le blocage ou la replanification des tâches. Au cours des étapes précédentes, vous avez réduit le nombre de tâches en regroupant ces tâches. Toutefois, les journaux de sortie et la surveillance ont indiqué que le nombre de tâches simultanées n'était toujours pas optimal.

Vous pouvez contrôler le nombre d'exécutions simultanées de tâches en implémentant une planification ou en définissant des limites pour le nombre de tâches pouvant être exécutées simultanément.

Dans ce tutoriel, vous allez répartir les tâches de manière plus uniforme dans le temps en ajoutant des paramètres au niveau du DAG dans le DAG dag_10_tasks_20_seconds_10:

  1. Ajout de l'argument max_active_runs=1 au gestionnaire de contexte des DAG. Cet argument définit une limite d'une seule instance d'exécution du DAG à un moment donné.

  2. Ajout de l'argument max_active_tasks=5 au gestionnaire de contexte des DAG. Cet argument contrôle le nombre maximal d'instances de tâche pouvant être exécutées simultanément dans chaque DAG.

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Évaluez l'impact de la répartition des tâches au fil du temps sur les processus de planification:

  1. Attendez la fin de l'exécution du DAG.

  2. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  3. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  4. Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.

  5. Sur l'histogramme, vous pouvez constater que le troisième DAG avec un nombre limité de tâches et d'exécutions actives n'a généré aucun avertissement ni erreur, et que la distribution des journaux est plus homogène par rapport aux valeurs précédentes.

    L'histogramme des journaux de nœuds de calcul Airflow avec les erreurs et les avertissements n'affiche aucune erreur ni aucun avertissement une fois les tâches consolidées et distribuées au fil du temps.
    Figure 11 : Histogramme des journaux de nœuds de calcul Airflow après consolidation et répartition des tâches (cliquez pour agrandir)

Les tâches de l'exemple dag_10_tasks_20_seconds_10_scheduled, qui comportent un nombre limité de tâches actives et d'exécutions, n'entraînent pas de pression sur les ressources, car elles étaient mises en file d'attente de manière uniforme.

Après avoir suivi les étapes décrites, vous avez optimisé l'utilisation des ressources en consolidant les petites tâches et en les répartissant de manière plus uniforme dans le temps.

Configurations d'environnement Optimize

Vous pouvez ajuster les configurations de votre environnement pour vous assurer que les nœuds de calcul Airflow ont toujours la capacité d'exécuter des tâches en file d'attente.

Nombre de nœuds de calcul et simultanéité de nœuds de calcul

Vous pouvez ajuster le nombre maximal de nœuds de calcul pour que Cloud Composer effectue automatiquement le scaling de votre environnement dans les limites définies.

Le paramètre [celery]worker_concurrency définit le nombre maximal de tâches qu'un seul nœud de calcul peut récupérer dans la file d'attente de tâches. La modification de ce paramètre ajuste le nombre de tâches qu'un seul nœud de calcul peut exécuter en même temps. Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Par défaut, la simultanéité des nœuds de calcul est définie sur au minimum l'une des valeurs suivantes: 32, 12 * worker_CPU, 8 * worker_memory, ce qui signifie qu'elle dépend des limites des ressources des nœuds de calcul. Pour en savoir plus sur les valeurs de simultanéité des nœuds de calcul par défaut, consultez la section Optimiser les environnements.

Le nombre de nœuds de calcul et leur simultanéité fonctionnent conjointement, et les performances de votre environnement dépendent fortement des deux paramètres. Vous pouvez prendre en compte les éléments suivants pour choisir la bonne combinaison:

  • Plusieurs tâches rapides exécutées en parallèle. Vous pouvez augmenter la simultanéité des nœuds de calcul lorsque des tâches sont en attente dans la file d'attente et qu'ils utilisent un faible pourcentage de processeurs et de mémoire en même temps. Toutefois, dans certaines circonstances, la file d'attente peut ne jamais se remplir, ce qui empêche l'autoscaling de se déclencher. Si l'exécution des petites tâches se termine avant que les nouveaux nœuds de calcul soient prêts, un nœud de calcul existant peut récupérer les tâches restantes, et il n'y aura plus aucune tâche pour les nouveaux nœuds de calcul.

    Dans ces situations, il est recommandé d'augmenter le nombre minimal de nœuds de calcul et d'augmenter la simultanéité des nœuds de calcul pour éviter un scaling excessif.

  • Plusieurs tâches longues exécutées en parallèle. La simultanéité élevée des nœuds de calcul empêche le système de faire évoluer le nombre de nœuds de calcul. Si plusieurs tâches consomment beaucoup de ressources et prennent beaucoup de temps, une simultanéité de nœud de calcul élevée peut empêcher le remplissage de la file d'attente et empêcher la récupération de toutes les tâches par un seul nœud de calcul, ce qui entraîne des problèmes de performances. Dans ces situations, il est recommandé d'augmenter le nombre maximal de nœuds de calcul et de réduire leur simultanéité.

L'importance du parallélisme

Les programmeurs Airflow contrôlent la planification des exécutions de DAG et des tâches individuelles à partir des DAG. L'option de configuration Airflow [core]parallelism contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont remplies.

Le parallélisme est un mécanisme de protection d'Airflow qui détermine le nombre de tâches pouvant être exécutées simultanément pour chaque programmeur, quel que soit le nombre de nœuds de calcul. La valeur de parallélisme, multipliée par le nombre de programmeurs dans votre cluster, correspond au nombre maximal d'instances de tâche que votre environnement peut mettre en file d'attente.

En général, [core]parallelism est défini comme le produit d'un nombre maximal de nœuds de calcul et de [celery]worker_concurrency. Il est également affecté par le pool. Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Pour en savoir plus sur l'ajustement des configurations Airflow liées au scaling, consultez la page Effectuer le scaling de la configuration Airflow.

Trouver les configurations d'environnement optimales

La méthode recommandée pour résoudre les problèmes de planification consiste à regrouper les petites tâches en tâches plus importantes et à les répartir de manière plus uniforme au fil du temps. En plus d'optimiser le code du DAG, vous pouvez également optimiser les configurations d'environnement afin de disposer d'une capacité suffisante pour exécuter plusieurs tâches simultanément.

Par exemple, supposons que vous consolidez les tâches dans votre DAG autant que possible, mais que limiter les tâches actives pour les répartir plus uniformément au fil du temps n'est pas la solution préférée pour votre cas d'utilisation spécifique.

Vous pouvez ajuster les paramètres de parallélisme, du nombre de nœuds de calcul et de simultanéité des nœuds de calcul pour exécuter le DAG dag_10_tasks_20_seconds_10 sans limiter les tâches actives. Dans cet exemple, le DAG s'exécute 10 fois, et chaque exécution contient 20 petites tâches. Si vous souhaitez les exécuter simultanément, procédez comme suit:

  • Vous aurez besoin d'une taille d'environnement plus importante, car elle contrôle les paramètres de performance de l'infrastructure Cloud Composer gérée de votre environnement.

  • Les nœuds de calcul Airflow doivent pouvoir exécuter 20 tâches simultanément, ce qui signifie que vous devez définir la simultanéité des nœuds de calcul sur 20.

  • Les nœuds de calcul ont besoin de suffisamment de processeurs et de mémoire pour gérer toutes les tâches. La simultanéité des nœuds de calcul est affectée par le processeur et la mémoire des nœuds de calcul. Vous aurez donc besoin d'au moins worker_concurrency / 12 en processeur et de least worker_concurrency / 8 en mémoire.

  • Vous devez augmenter le parallélisme pour correspondre à la simultanéité supérieure des nœuds de calcul. Pour que les nœuds de calcul puissent récupérer 20 tâches dans la file d'attente, le programmeur doit d'abord les planifier.

Ajustez les configurations de votre environnement de la manière suivante:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Configuration de l'environnement.

  4. Recherchez la configuration Ressources > Charges de travail, puis cliquez sur Modifier.

  5. Dans le champ Mémoire de la section Nœud de calcul, spécifiez la nouvelle limite de mémoire pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez 4 Go.

  6. Dans le champ Processeur, spécifiez la nouvelle limite de processeur pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez deux processeurs virtuels.

  7. Enregistrez les modifications et patientez quelques minutes pour que vos nœuds de calcul Airflow redémarrent.

Ensuite, remplacez les options de configuration Airflow de parallélisme et de simultanéité des nœuds de calcul:

  1. Accédez à l'onglet Remplacements de configuration Airflow.

  2. Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.

  3. Ignorer la configuration du parralélisme:

    Section Clé Value (Valeur)
    core parallelism 20
  4. Cliquez sur Ajouter un remplacement de configuration Airflow et remplacez la configuration de simultanéité des nœuds de calcul:

    Section Clé Value (Valeur)
    celery worker_concurrency 20
  5. Cliquez sur Enregistrer et attendez que la configuration de l'environnement soit mise à jour.

Déclenchez à nouveau le même exemple de DAG avec les configurations ajustées:

  1. Dans l'interface utilisateur d'Airflow, accédez à la page DAG.

  2. Recherchez le DAG dag_10_tasks_20_seconds_10, puis supprimez-le.

    Une fois le DAG supprimé, Airflow vérifie le dossier des DAG dans le bucket de votre environnement, puis réexécute automatiquement le DAG.

Une fois les exécutions du DAG terminées, examinez à nouveau l'histogramme des journaux. Sur le schéma, vous pouvez voir que l'exemple dag_10_tasks_20_seconds_10 avec des tâches plus consolidées n'a généré aucune erreur ni aucun avertissement lors de l'exécution avec la configuration de l'environnement ajustée. Comparez les résultats aux données précédentes du schéma, où le même exemple a généré des erreurs et des avertissements lors de l'exécution avec la configuration de l'environnement par défaut.

L'histogramme des journaux de nœuds de calcul Airflow avec les erreurs et les avertissements ne montre aucune erreur ni aucun avertissement une fois la configuration de l'environnement ajustée.
Figure 12 : Histogramme des journaux de nœud de calcul Airflow après ajustement de la configuration de l'environnement (cliquez pour agrandir)

Les configurations d'environnement et Airflow jouent un rôle crucial dans la planification des tâches. Toutefois, il n'est pas possible d'augmenter le nombre de configurations au-delà de certaines limites.

Nous vous recommandons d'optimiser le code du DAG, de consolider les tâches et d'utiliser la planification pour optimiser les performances et l'efficacité.

Exemple: erreurs d'analyse du DAG et latence en raison du code complexe du DAG

Dans cet exemple, vous allez examiner la latence d'analyse d'un exemple de DAG imitant un excès de variables Airflow.

Créer une variable Airflow

Avant d'importer l'exemple de code, créez une variable Airflow.

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.

  3. Accédez à Admin > Variables > Ajouter un nouvel enregistrement.

  4. Définissez les valeurs suivantes :

    • clé: example_var
    • val: test_airflow_variable

Importer l'exemple de DAG dans votre environnement

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé aux étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable.

Ce DAG contient une boucle For qui s'exécute 1 000 fois et imite un excès de variables Airflow. Chaque itération lit la variable example_var et génère une tâche. Chaque tâche contient une commande qui affiche la valeur de la variable.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnostiquer les problèmes d'analyse

La durée d'analyse du DAG correspond au temps nécessaire au programmeur Airflow pour lire un fichier DAG et l'analyser. Avant que le programmeur Airflow puisse programmer une tâche à partir d'un DAG, il doit analyser le fichier DAG pour découvrir la structure de ce DAG et les tâches définies.

Si l'analyse d'un DAG prend beaucoup de temps, cela consomme la capacité du programmeur et peut réduire les performances d'exécution du DAG.

Pour surveiller la durée d'analyse du DAG, procédez comme suit:

  1. Exécutez la commande CLI Airflow dags report dans gcloud CLI pour afficher la durée d'analyse de tous vos DAG:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Remplacez les éléments suivants :

    • ENVIRONMENT_NAME : nom de votre environnement
    • LOCATION: région dans laquelle se trouve l'environnement.
  2. Dans le résultat de la commande, recherchez la valeur de durée du DAG dag_for_loop_airflow_variables. Une valeur élevée peut indiquer que ce DAG n'est pas implémenté de manière optimale. Si vous avez plusieurs DAG, vous pouvez identifier ceux qui ont une longue durée d'analyse à partir de la table de sortie.

    Exemple :

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Inspectez les durées d'analyse des DAG dans la console Google Cloud:

    1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  4. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  5. Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de processeur DAG.

  6. Examinez les journaux dag-processor-manager et identifiez les problèmes potentiels.

    Une entrée de journal pour l'exemple de DAG indique que le temps d'analyse du DAG est de 46,3 secondes
    Figure 13 : Les journaux du gestionnaire de processeur DAG indiquent les durées d'analyse du DAG (cliquez pour agrandir)

Si la durée totale d'analyse des DAG dépasse environ 10 secondes, vos programmeurs peuvent être surchargés d'analyse des DAG et ne peuvent pas exécuter les DAG efficacement.

Optimiser le code du DAG

Il est recommandé d'éviter l'utilisation inutile de code Python de "niveau supérieur" dans vos DAG. Les DAG comportant de nombreuses importations, variables et fonctions en dehors du DAG augmentent les temps d'analyse du programmeur Airflow. Cela réduit les performances et l'évolutivité de Cloud Composer et Airflow. Si les variables Airflow sont lues trop, la durée d'analyse est longue et la charge de la base de données est élevée. Si ce code se trouve dans un fichier DAG, ces fonctions s'exécutent à chaque pulsation du programmeur, ce qui peut être lent.

Les champs de modèle Airflow vous permettent d'incorporer les valeurs des variables Airflow et des modèles Jinja dans vos DAG. Cela évite l'exécution inutile de fonctions pendant les pulsations du programmeur.

Pour mieux implémenter l'exemple de DAG, évitez d'utiliser des variables Airflow au niveau du code Python de premier niveau des DAG. Transmettez plutôt des variables Airflow aux opérateurs existants via un modèle Jinja, ce qui retardera la lecture de la valeur jusqu'à l'exécution de la tâche.

Importez la nouvelle version de l'exemple de DAG dans votre environnement. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Inspectez la durée d'analyse du nouveau DAG:

  1. Attendez la fin de l'exécution du DAG.

  2. Exécutez à nouveau la commande dags report pour afficher la durée d'analyse de tous vos DAG:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Examinez à nouveau les journaux dag-processor-manager et analysez la durée d'analyse.

    Une entrée de journal pour l'exemple de DAG indique que le temps d'analyse du DAG est de 4,21 secondes
    Figure 14 : Les journaux du gestionnaire de processeur DAG indiquent les temps d'analyse du DAG après l'optimisation du code du DAG (cliquez pour agrandir)

En remplaçant les variables d'environnement par des modèles Airflow, vous avez simplifié le code du DAG et réduit la latence d'analyse d'environ 10 fois.

Optimiser les configurations de l'environnement Airflow

Le programmeur Airflow tente en permanence de déclencher de nouvelles tâches et analyse tous les DAG du bucket d'environnement. Si vos DAG ont une longue durée d'analyse et que le programmeur consomme beaucoup de ressources, vous pouvez optimiser les configurations du programmeur Airflow afin que celui-ci utilise les ressources plus efficacement.

Dans ce tutoriel, l'analyse des fichiers DAG prend beaucoup de temps. Les cycles d'analyse commencent à se chevaucher, ce qui épuise la capacité du programmeur. Dans notre exemple, l'analyse du premier exemple de DAG prend plus de cinq secondes. Vous allez donc configurer le programmeur pour qu'il s'exécute moins fréquemment afin d'utiliser les ressources plus efficacement. Vous allez ignorer l'option de configuration Airflow scheduler_heartbeat_sec. Cette configuration définit la fréquence d'exécution du programmeur (en secondes). Par défaut, la valeur est définie sur 5 secondes. Vous pouvez modifier cette option de configuration Airflow en la remplaçant.

Ignorez l'option de configuration Airflow scheduler_heartbeat_sec:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Remplacements de configuration Airflow.

  4. Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.

  5. Ignorez l'option de configuration Airflow:

    Section Clé Value (Valeur)
    scheduler scheduler_heartbeat_sec 10
  6. Cliquez sur Enregistrer et attendez que la configuration de l'environnement soit mise à jour.

Vérifiez les métriques du programmeur:

  1. Accédez à l'onglet Surveillance, puis sélectionnez Programmeurs.

  2. Dans le graphique Fréquence cardiaque du programmeur, cliquez sur le bouton Plus d'options (trois points), puis sur Afficher dans l'explorateur de métriques.

Le graphique des pulsations du programmeur montre que les pulsations cardiaques se produisent moins fréquemment
Figure 15 : Graphique de pulsation du programmeur (cliquez pour agrandir)

Sur le graphique, vous constaterez que le planificateur s'exécute deux fois moins fréquemment après que vous avez modifié la configuration par défaut de 5 secondes à 10 secondes. En réduisant la fréquence des pulsations, vous vous assurez que le programmeur ne démarre pas tant que le cycle d'analyse précédent est en cours et que sa capacité de ressources n'est pas épuisée.

Attribuer plus de ressources au programmeur

Dans Cloud Composer 2, vous pouvez allouer davantage de ressources de processeur et de mémoire au programmeur. De cette manière, vous pouvez augmenter les performances de votre programmeur et réduire le temps d'analyse de votre DAG.

Allouez du processeur et de la mémoire supplémentaires au programmeur:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Configuration de l'environnement.

  4. Recherchez la configuration Ressources > Charges de travail, puis cliquez sur Modifier.

  5. Dans le champ Memory (Mémoire) de la section Scheduler (Programmeur), spécifiez la nouvelle limite de mémoire. Dans ce tutoriel, utilisez 4 Go.

  6. Dans le champ CPU (Processeur), spécifiez la nouvelle limite de processeur. Dans ce tutoriel, utilisez deux processeurs virtuels.

  7. Enregistrez les modifications et patientez quelques minutes pour que vos programmeurs Airflow puissent redémarrer.

  8. Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de processeur DAG.

  9. Consultez les journaux dag-processor-manager et comparez la durée d'analyse des exemples de DAG:

    Une entrée de journal de l'exemple de DAG indique que le temps d'analyse du DAG optimisé est de 1,5 seconde. Pour le DAG non optimisé, la durée d'analyse est de 28,71 secondes.
    Figure 16 : Les journaux du gestionnaire de processeur DAG indiquent les temps d'analyse des DAG après l'attribution d'autres ressources au programmeur (cliquez pour agrandir)

En attribuant davantage de ressources au programmeur, vous avez augmenté sa capacité et réduit la latence d'analyse de manière significative par rapport aux configurations de l'environnement par défaut. Avec davantage de ressources, le programmeur peut analyser les DAG plus rapidement, mais les coûts associés aux ressources Cloud Composer augmentent également. De plus, il n'est pas possible d'augmenter les ressources au-delà d'une certaine limite.

Nous vous recommandons de n'allouer des ressources qu'après avoir implémenté le code du DAG et les optimisations de configuration Airflow possibles.

Effectuer un nettoyage

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

Supprimer le projet

  1. Dans la console Google Cloud, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Supprimer des ressources individuelles

Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.

Supprimez l'environnement Cloud Composer. Vous allez également supprimer le bucket de l'environnement au cours de cette procédure.

Étapes suivantes