Déboguer les problèmes de planification des tâches

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Ce tutoriel vous explique comment diagnostiquer et résoudre les problèmes de planification et d'analyse des tâches qui entraînent un dysfonctionnement du planificateur, des erreurs et des latences d'analyse, ainsi que des échecs de tâche.

Introduction

Le planificateur Airflow est principalement affecté par deux facteurs: la planification des tâches et l'analyse des DAG. Les problèmes liés à l'un de ces facteurs peuvent avoir un impact négatif sur la santé et les performances de l'environnement.

Il arrive que trop de tâches soient planifiées simultanément. Dans ce cas, la file d'attente est remplie, et les tâches restent à l'état "planifiée" ou sont reprogrammées après avoir été mises en file d'attente, ce qui peut entraîner un échec de la tâche et une latence de performances.

Un autre problème courant est la latence d'analyse et les erreurs causées par la complexité d'un code DAG. Par exemple, un code DAG contenant des variables Airflow au niveau supérieur du code peut entraîner des retards d'analyse, une surcharge de la base de données, des échecs de planification et des délais avant expiration du DAG.

Dans ce tutoriel, vous allez diagnostiquer les exemples de DAG et apprendre à résoudre les problèmes de planification et d'analyse, à améliorer la planification des DAG et à optimiser le code et les configurations d'environnement de vos DAG pour améliorer les performances.

Objectifs

Cette section liste les objectifs des exemples de ce tutoriel.

Exemple: Dysfonctionnement du planificateur et latence causés par une forte concurrence de tâches

  • Importez l'exemple de DAG qui s'exécute plusieurs fois simultanément et diagnostiquez le dysfonctionnement du planificateur et les problèmes de latence avec Cloud Monitoring.

  • Optimisez votre code DAG en consolidant les tâches et évaluez l'impact sur les performances.

  • Répartissez les tâches de manière plus uniforme au fil du temps et évaluez l'impact sur les performances.

  • Optimisez vos configurations Airflow et vos configurations d'environnement, puis évaluez l'impact.

Exemple: erreurs d'analyse DAG et latence causées par un code complexe

  • Importez l'exemple de DAG avec des variables Airflow et diagnostiquez les problèmes d'analyse avec Cloud Monitoring.

  • Optimisez le code DAG en évitant les variables Airflow au niveau supérieur du code et évaluez l'impact sur le temps d'analyse.

  • Optimisez les configurations Airflow et les configurations d'environnement, puis évaluez l'impact sur le temps d'analyse.

Coûts

Ce tutoriel utilise les composants facturables suivants de Google Cloud:

Une fois que vous avez terminé ce tutoriel, évitez de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

Cette section décrit les actions requises avant de commencer le tutoriel.

Créer et configurer un projet

Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet comme suit:

  1. Dans la console Google Cloud, sélectionnez ou créez un projet:

    Accéder au sélecteur de projet

  2. Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.

  3. Assurez-vous que l' Google Cloud utilisateur de votre projet dispose des rôles suivants pour créer les ressources nécessaires:

    • Administrateur de l'environnement et des objets Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrateur de Compute (roles/compute.admin)

Activer les API pour votre projet.

Enable the Cloud Composer API.

Enable the API

Créer votre environnement Cloud Composer

Créez un environnement Cloud Composer 2.

Lors de la création de l'environnement, vous attribuez le rôle Extension de l'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) au compte d'agent de service Composer. Cloud Composer utilise ce compte pour effectuer des opérations dans votre projet Google Cloud .

Exemple: Défaillance du planificateur et échec de la tâche en raison de problèmes de planification

Cet exemple illustre le dysfonctionnement du planificateur de débogage et la latence causée par une forte concurrence des tâches.

Importer l'exemple de DAG dans votre environnement

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé lors des étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_200_seconds_1.

Ce DAG comporte 200 tâches. Chaque tâche attend une seconde et affiche "Terminé !". Le DAG est déclenché automatiquement une fois importé. Cloud Composer exécute ce DAG 10 fois, et toutes les exécutions de DAG se produisent en parallèle.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnostiquer les problèmes de dysfonctionnement du planificateur et d'échec des tâches

Une fois l'exécution du DAG terminée, ouvrez l'interface utilisateur d'Airflow, puis cliquez sur le DAG dag_10_tasks_200_seconds_1. Vous constaterez que 10 exécutions de DAG ont réussi, et que chacune a 200 tâches réussies.

Consultez les journaux des tâches Airflow:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.

Sur l'histogramme des journaux, vous pouvez voir les erreurs et les avertissements indiqués en rouge et en orange:

Histogramme des journaux des nœuds de calcul Airflow, avec les erreurs et les avertissements indiqués en rouge et en orange
Figure 1 Histogramme des journaux des nœuds de calcul Airflow (cliquez pour agrandir)

L'exemple de DAG a généré environ 130 avertissements et 60 erreurs. Cliquez sur une colonne contenant des barres jaunes et rouges. Certaines des avertissements et erreurs suivants s'affichent dans les journaux:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Ces journaux peuvent indiquer que l'utilisation des ressources a dépassé les limites et que le worker s'est redémarré.

Si une tâche Airflow est maintenue dans la file d'attente trop longtemps, le planificateur la marque comme ayant échoué et comme up_for_retry, puis la replanifie à nouveau pour l'exécution. Pour observer les symptômes de cette situation, vous pouvez consulter le graphique du nombre de tâches en file d'attente. Si les pics de ce graphique ne diminuent pas au bout d'environ 10 minutes, il est probable que des tâches échouent (sans journaux).

Examinez les informations de surveillance:

  1. Accédez à l'onglet Monitoring (Surveillance), puis sélectionnez Overview (Aperçu).

  2. Examinez le graphique Tâches Airflow.

    Graphique des tâches Airflow au fil du temps, montrant un pic du nombre de tâches en file d'attente
    Figure 2 : Graphique des tâches Airflow (cliquez pour agrandir)

    Dans le graphique des tâches Airflow, il existe un pic de tâches en file d'attente qui dure plus de 10 minutes, ce qui peut signifier qu'il n'y a pas suffisamment de ressources dans votre environnement pour traiter toutes les tâches planifiées.

  3. Examinez le graphique Nœuds de calcul actifs:

    Le graphique des nœuds de calcul Airflow actifs au fil du temps montre que le nombre de nœuds de calcul actifs a été augmenté jusqu'à la limite maximale.
    Figure 3 Graphique des nœuds de calcul actifs (cliquez pour agrandir)

    Le graphique Nœuds de calcul actifs indique que le DAG a déclenché l'autoscaling jusqu'à la limite maximale autorisée de trois nœuds de calcul lors de l'exécution du DAG.

  4. Les graphiques d'utilisation des ressources peuvent indiquer que les nœuds de calcul Airflow ne disposent pas de suffisamment de capacité pour exécuter les tâches en file d'attente. Dans l'onglet Surveillance, sélectionnez Nœuds de calcul, puis examinez les graphiques Utilisation totale du processeur des nœuds de calcul et Utilisation totale de la mémoire des nœuds de calcul.

    Le graphique de l'utilisation du processeur par les nœuds de calcul Airflow montre que l'utilisation du processeur augmente jusqu'à la limite maximale.
    Figure 4 Graphique de l'utilisation totale du processeur des nœuds de calcul (cliquez pour agrandir)
    Le graphique de l'utilisation de la mémoire par les nœuds de calcul Airflow montre que l'utilisation de la mémoire augmente, mais qu'elle n'atteint pas la limite maximale.
    Figure 5 Graphique de l'utilisation totale de la mémoire des nœuds de calcul (cliquez pour agrandir)

    Les graphiques indiquent que l'exécution d'un trop grand nombre de tâches simultanément a entraîné l'atteinte de la limite de processeur. Les ressources ont été utilisées pendant plus de 30 minutes, ce qui est encore plus long que la durée totale de 200 tâches dans 10 exécutions de DAG exécutées une par une.

Il s'agit d'indicateurs indiquant que la file d'attente est remplie et qu'il manque des ressources pour traiter toutes les tâches planifiées.

Consolider vos tâches

Le code actuel crée de nombreux DAG et tâches sans ressources suffisantes pour traiter toutes les tâches en parallèle, ce qui entraîne le remplissage de la file d'attente. Laisser les tâches dans la file d'attente trop longtemps peut entraîner leur reprogrammation ou leur échec. Dans de telles situations, il est préférable d'utiliser un nombre plus faible de tâches mieux consolidées.

L'exemple de DAG suivant modifie le nombre de tâches de l'exemple initial de 200 à 20 et augmente le temps d'attente de 1 à 10 secondes pour imiter des tâches plus consolidées qui effectuent la même quantité de travail.

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Évaluez l'impact des tâches plus consolidées sur les processus de planification:

  1. Attendez que les exécutions du DAG soient terminées.

  2. Dans l'interface utilisateur d'Airflow, sur la page DAG, cliquez sur le DAG dag_10_tasks_20_seconds_10. Vous verrez 10 exécutions de DAG, chacune comportant 20 tâches réussies.

  3. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  4. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  5. Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.

    Le deuxième exemple avec des tâches plus consolidées a généré environ 10 avertissements et 7 erreurs. Sur l'histogramme, vous pouvez comparer le nombre d'erreurs et d'avertissements dans l'exemple initial (valeurs antérieures) et dans le deuxième exemple (valeurs ultérieures).

    L'histogramme des journaux des nœuds de calcul Airflow contenant des erreurs et des avertissements montre la diminution du nombre d'erreurs et d'avertissements après la consolidation des tâches.
    Figure 6 Histogramme des journaux des nœuds de calcul Airflow après la consolidation des tâches (cliquez pour agrandir)

    En comparant le premier exemple au plus consolidé, vous pouvez constater qu'il y a beaucoup moins d'erreurs et d'avertissements dans le second exemple. Toutefois, les mêmes erreurs liées à l'arrêt à chaud continuent d'apparaître dans les journaux en raison d'une surcharge de ressources.

  6. Dans l'onglet Surveillance, sélectionnez Nœuds de calcul, puis examinez les graphiques.

    Lorsque vous comparez le graphique des tâches Airflow du premier exemple (valeurs antérieures) avec celui du deuxième exemple avec des tâches plus consolidées, vous pouvez constater que le pic des tâches en file d'attente a duré moins longtemps lorsque les tâches étaient plus consolidées. Toutefois, il a duré près de 10 minutes, ce qui n'est pas optimal.

    Le graphique des tâches Airflow au fil du temps montre que le pic des tâches Airflow a duré moins longtemps qu'auparavant.
    Figure 7 Graphique des tâches Airflow après la consolidation des tâches (cliquez pour agrandir)

    Sur le graphique "Nœuds de calcul actifs", vous pouvez voir que le premier exemple (à gauche du graphique) a utilisé des ressources pendant une période beaucoup plus longue que le second, même si les deux exemples imitent la même quantité de travail.

    Le graphique des nœuds de calcul Airflow actifs au fil du temps montre que le nombre de nœuds de calcul actifs a augmenté pendant une période plus courte qu'auparavant.
    Figure 8 Graphique des nœuds de calcul actifs après la consolidation des tâches (cliquez pour agrandir)

    Examinez les graphiques de consommation des ressources des nœuds de calcul. Même si la différence entre les ressources utilisées dans l'exemple avec des tâches plus consolidées et l'exemple initial est assez importante, l'utilisation du processeur atteint toujours 70% de la limite.

    Le graphique de l'utilisation du processeur par les nœuds de calcul Airflow montre que l'utilisation du processeur augmente jusqu'à 70% de la limite maximale.
    Figure 9 Graphique de l'utilisation totale du processeur des nœuds de calcul après la consolidation des tâches (cliquez pour agrandir)
    Le graphique de l'utilisation de la mémoire par les nœuds de calcul Airflow montre que l'utilisation de la mémoire augmente, mais qu'elle n'atteint pas la limite maximale.
    Figure 10 Graphique de l'utilisation totale de la mémoire des nœuds de calcul après consolidation des tâches (cliquez pour agrandir)

Répartir les tâches de manière plus uniforme au fil du temps

Un trop grand nombre de tâches simultanées entraîne le remplissage de la file d'attente, ce qui entraîne le blocage des tâches dans la file d'attente ou leur reprogrammation. Dans les étapes précédentes, vous avez réduit le nombre de tâches en les consolidant. Toutefois, les journaux de sortie et la surveillance ont indiqué que le nombre de tâches simultanées était toujours sous-optimal.

Vous pouvez contrôler le nombre d'exécutions de tâches simultanées en implémentant un calendrier ou en définissant des limites sur le nombre de tâches pouvant être exécutées simultanément.

Dans ce tutoriel, vous allez répartir les tâches de manière plus uniforme au fil du temps en ajoutant des paramètres au niveau du DAG dans le DAG dag_10_tasks_20_seconds_10:

  1. Ajoutez l'argument max_active_runs=1 au gestionnaire de contexte DAG. Cet argument limite l'exécution d'une seule instance de DAG à un moment donné.

  2. Ajoutez l'argument max_active_tasks=5 au gestionnaire de contexte DAG. Cet argument contrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG.

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Évaluez l'impact de la distribution des tâches au fil du temps sur les processus de planification:

  1. Attendez que les exécutions du DAG soient terminées.

  2. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  3. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  4. Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.

  5. Sur l'histogramme, vous pouvez voir que le troisième DAG, qui comporte un nombre limité de tâches et d'exécutions actives, n'a généré aucune alerte ni aucune erreur, et que la distribution des journaux semble plus uniforme par rapport aux valeurs précédentes.

    L'histogramme des journaux des nœuds de calcul Airflow contenant des erreurs et des avertissements n'affiche aucune erreur ni avertissement une fois les tâches consolidées et réparties au fil du temps.
    Figure 11 Histogramme des journaux des nœuds de calcul Airflow après la consolidation et la distribution des tâches au fil du temps (cliquez pour agrandir)

Les tâches de l'exemple dag_10_tasks_20_seconds_10_scheduled, qui comporte un nombre limité de tâches actives et d'exécutions, n'ont pas exercé de pression sur les ressources, car les tâches étaient mises en file d'attente de manière uniforme.

Après avoir suivi les étapes décrites, vous avez optimisé l'utilisation des ressources en consolidant les petites tâches et en les répartissant plus uniformément au fil du temps.

Optimiser les configurations d'environnement

Vous pouvez ajuster les configurations de votre environnement pour vous assurer que les nœuds de calcul Airflow disposent toujours de la capacité nécessaire pour exécuter les tâches en file d'attente.

Nombre de nœuds de calcul et simultanéité des nœuds de calcul

Vous pouvez ajuster le nombre maximal de nœuds de calcul pour que Cloud Composer effectue un scaling automatique de votre environnement dans les limites définies.

Le paramètre [celery]worker_concurrency définit le nombre maximal de tâches qu'un seul nœud de calcul peut récupérer dans la file d'attente de tâches. La modification de ce paramètre ajuste le nombre de tâches qu'un seul nœud de calcul peut exécuter simultanément. Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Par défaut, la simultanéité des nœuds de calcul est définie sur un minimum de 32, 12 * worker_CPU, 8 * worker_memory, ce qui signifie qu'elle dépend des limites de ressources des nœuds de calcul. Pour en savoir plus sur les valeurs de simultanéité par défaut des workers, consultez la section Optimiser les environnements.

Le nombre de nœuds de calcul et la concurrence des nœuds de calcul fonctionnent de concert, et les performances de votre environnement dépendent fortement de ces deux paramètres. Vous pouvez utiliser les considérations suivantes pour choisir la combinaison appropriée:

  • Exécution de plusieurs tâches rapides en parallèle Vous pouvez augmenter la concurrence des nœuds de calcul lorsque des tâches sont en attente dans la file d'attente et que vos nœuds de calcul utilisent un faible pourcentage de leurs allocations de processeur et de mémoire. Toutefois, dans certaines circonstances, la file d'attente peut ne jamais se remplir, ce qui empêche l'autoscaling de se déclencher. Si les petites tâches sont terminées au moment où les nouveaux nœuds de calcul sont prêts, un nœud de calcul existant peut prendre en charge les tâches restantes, et il n'y aura aucune tâche pour les nœuds de calcul nouvellement créés.

    Dans ce cas, nous vous recommandons d'augmenter le nombre minimal de nœuds de calcul et la simultanéité des nœuds de calcul pour éviter un scaling trop agressif.

  • Exécution de plusieurs tâches longues en parallèle La simultanéité élevée des nœuds de calcul empêche le système d'ajuster le nombre de nœuds de calcul. Si plusieurs tâches sont gourmandes en ressources et prennent beaucoup de temps à s'exécuter, une simultanéité élevée des nœuds de calcul peut entraîner la non-saturation de la file d'attente et la prise en charge de toutes les tâches par un seul nœud de calcul, ce qui entraîne des problèmes de performances. Dans ces situations, il est recommandé d'augmenter le nombre maximal de nœuds de calcul et de réduire leur simultanéité.

L'importance du parallélisme

Les programmeurs Airflow contrôlent la planification des exécutions DAG et les tâches individuelles des DAG. L'option de configuration Airflow [core]parallelism contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.

Le parallélisme est un mécanisme de protection d'Airflow qui détermine le nombre de tâches pouvant être exécutées en même temps par chaque planificateur, quel que soit le nombre de nœuds de calcul. La valeur de parallélisme, multipliée par le nombre de planificateurs de votre cluster, correspond au nombre maximal d'instances de tâche que votre environnement peut mettre en file d'attente.

En règle générale, [core]parallelism est défini comme un produit du nombre maximal de nœuds de calcul et de [celery]worker_concurrency. Il est également affecté par le pool. Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Pour en savoir plus sur l'ajustement des configurations Airflow liées à la mise à l'échelle, consultez la section Mettre à l'échelle la configuration Airflow.

Trouver des configurations d'environnement optimales

Pour résoudre les problèmes de planification, nous vous recommandons de regrouper les petites tâches en tâches plus importantes et de les répartir de manière plus uniforme au fil du temps. En plus d'optimiser le code DAG, vous pouvez également optimiser les configurations d'environnement pour disposer d'une capacité suffisante pour exécuter plusieurs tâches simultanément.

Par exemple, supposons que vous consolidiez autant que possible les tâches dans votre DAG, mais que limiter les tâches actives pour les répartir plus uniformément dans le temps ne soit pas la solution privilégiée pour votre cas d'utilisation spécifique.

Vous pouvez ajuster les paramètres de parallélisme, du nombre de nœuds de calcul et de la concurrence des nœuds de calcul pour exécuter le DAG dag_10_tasks_20_seconds_10 sans limiter les tâches actives. Dans cet exemple, le DAG s'exécute 10 fois, et chaque exécution contient 20 petites tâches. Si vous souhaitez les exécuter simultanément:

  • Vous aurez besoin d'une taille d'environnement plus importante, car elle contrôle les paramètres de performances de l'infrastructure Cloud Composer gérée de votre environnement.

  • Les nœuds de calcul Airflow doivent pouvoir exécuter 20 tâches simultanément, ce qui signifie que vous devez définir la concurrence des nœuds de calcul sur 20.

  • Les nœuds de calcul ont besoin de suffisamment de processeur et de mémoire pour gérer toutes les tâches. La simultanéité des nœuds de calcul est affectée par le processeur et la mémoire des nœuds de calcul. Vous aurez donc besoin d'au moins worker_concurrency / 12 en CPU et least worker_concurrency / 8 en mémoire.

  • Vous devrez augmenter le parallélisme pour qu'il corresponde à la simultanéité des nœuds de calcul plus élevée. Pour que les nœuds de calcul récupèrent 20 tâches de la file d'attente, le planificateur doit d'abord planifier ces 20 tâches.

Ajustez les configurations de votre environnement comme suit:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Configuration de l'environnement.

  4. Accédez à la configuration Ressources > Charges de travail, puis cliquez sur Modifier.

  5. Dans la section Worker (Nœud de calcul), dans le champ Memory (Mémoire), spécifiez la nouvelle limite de mémoire pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez 4 Go.

  6. Dans le champ CPU, spécifiez la nouvelle limite de processeur pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez deux vCPU.

  7. Enregistrez les modifications et attendez quelques minutes pour que vos nœuds de calcul Airflow redémarrent.

Ensuite, remplacez les options de configuration Airflow pour le parallélisme et la simultanéité des nœuds de calcul:

  1. Accédez à l'onglet Remplacements de configuration Airflow.

  2. Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.

  3. Remplacez la configuration de parallélisme:

    Section Clé Valeur
    core parallelism 20
  4. Cliquez sur Add Airflow Configuration Override (Ajouter un remplacement de configuration Airflow) et remplacez la configuration de la concurrence des workers:

    Section Clé Valeur
    celery worker_concurrency 20
  5. Cliquez sur Enregistrer et attendez que l'environnement mette à jour sa configuration.

Déclenchez à nouveau l'exemple de DAG avec les configurations ajustées:

  1. Dans l'interface utilisateur d'Airflow, accédez à la page DAG.

  2. Recherchez le DAG dag_10_tasks_20_seconds_10, puis supprimez-le.

    Une fois le DAG supprimé, Airflow vérifie le dossier des DAG dans le bucket de votre environnement et exécute automatiquement le DAG à nouveau.

Une fois les exécutions du DAG terminées, examinez à nouveau l'histogramme des journaux. Sur le diagramme, vous pouvez voir que l'exemple dag_10_tasks_20_seconds_10 avec des tâches plus consolidées n'a généré aucune erreur ni avertissement lors de l'exécution avec la configuration d'environnement ajustée. Comparez les résultats aux données précédentes du diagramme, où le même exemple a généré des erreurs et des avertissements lors de l'exécution avec la configuration d'environnement par défaut.

L'histogramme des journaux des nœuds de calcul Airflow contenant des erreurs et des avertissements n'affiche aucune erreur ni avertissement après avoir ajusté la configuration de l'environnement.
Figure 12 Histogramme des journaux des nœuds de calcul Airflow après l'ajustement de la configuration de l'environnement (cliquez pour agrandir)

Les configurations d'environnement et les configurations Airflow jouent un rôle crucial dans la planification des tâches. Toutefois, il n'est pas possible d'augmenter les configurations au-delà de certaines limites.

Nous vous recommandons d'optimiser le code DAG, de regrouper les tâches et d'utiliser la planification pour optimiser les performances et l'efficacité.

Exemple: erreurs d'analyse et latence du DAG en raison d'un code DAG complexe

Dans cet exemple, vous allez examiner la latence d'analyse d'un exemple de DAG qui imite un excès de variables Airflow.

Créer une variable Airflow

Avant d'importer l'exemple de code, créez une variable Airflow.

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.

  3. Accédez à Administration > Variables > Ajouter un enregistrement.

  4. Définissez les valeurs suivantes :

    • key: example_var
    • val: test_airflow_variable

Importer l'exemple de DAG dans votre environnement

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé lors des étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable.

Ce DAG contient une boucle for qui s'exécute 1 000 fois et imite un excès de variables Airflow. Chaque itération lit la variable example_var et génère une tâche. Chaque tâche contient une commande qui affiche la valeur de la variable.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnostiquer les problèmes d'analyse

Le temps d'analyse du DAG correspond au temps nécessaire au planificateur Airflow pour lire et analyser un fichier DAG. Avant de pouvoir planifier une tâche d'un DAG, le programmeur Airflow doit analyser le fichier DAG pour découvrir la structure du DAG et les tâches définies.

Si l'analyse d'un DAG prend beaucoup de temps, cela consomme les capacités du planificateur et peut réduire les performances des exécutions DAG.

Pour surveiller la durée d'analyse des DAG:

  1. Exécutez la commande de CLI Airflow dags report dans la CLI gcloud pour afficher le temps d'analyse de tous vos DAG:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Remplacez les éléments suivants :

    • ENVIRONMENT_NAME : nom de votre environnement
    • LOCATION: région où se trouve l'environnement.
  2. Dans le résultat de la commande, recherchez la valeur de durée du DAG dag_for_loop_airflow_variables. Une valeur élevée peut indiquer que ce DAG n'est pas mis en œuvre de manière optimale. Si vous disposez de plusieurs DAG, la table de sortie vous permet d'identifier ceux dont la durée d'analyse est longue.

    Exemple :

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Inspecter les durées d'analyse des DAG dans la console Google Cloud:

    1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  4. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  5. Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de traitement des DAG.

  6. Examinez les journaux dag-processor-manager et identifiez les problèmes potentiels.

    Une entrée de journal pour l'exemple de DAG indique que la durée d'analyse du DAG est de 46,3 secondes.
    Figure 13 Les journaux du gestionnaire de processeur DAG affichent les durées d'analyse des DAG (cliquez pour agrandir)

Si la durée d'analyse totale des DAG dépasse environ 10 secondes, vos planificateurs peuvent être surchargés par l'analyse des DAG et ne pas pouvoir exécuter efficacement les DAG.

Optimiser le code du DAG

Nous vous recommandons d'éviter le code Python "de niveau supérieur" inutile dans vos DAG. Les DAG contenant de nombreuses importations, variables et fonctions en dehors du DAG entraînent des temps d'analyse plus longs pour le planificateur Airflow. Cela réduit les performances et l'évolutivité de Cloud Composer et d'Airflow. Un excès de lecture de variables Airflow entraîne une longue durée d'analyse et une charge de base de données élevée. Si ce code se trouve dans un fichier DAG, ces fonctions s'exécutent à chaque battement de cœur du planificateur, ce qui peut être lent.

Les champs de modèle d'Airflow vous permettent d'intégrer les valeurs des variables Airflow et des modèles Jinja dans vos DAG. Cela évite l'exécution inutile de fonctions pendant les battements de cœur du planificateur.

Pour implémenter l'exemple de DAG de manière plus efficace, évitez d'utiliser des variables Airflow au niveau du code Python de premier niveau des DAG. Transmettez plutôt des variables Airflow aux opérateurs existants via un modèle Jinja, ce qui retardera la lecture de la valeur jusqu'à l'exécution de la tâche.

Importez la nouvelle version de l'exemple de DAG dans votre environnement. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Examinez la nouvelle durée d'analyse du DAG:

  1. Attendez la fin de l'exécution du DAG.

  2. Exécutez à nouveau la commande dags report pour afficher le temps d'analyse de tous vos DAG:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Réexaminez les journaux dag-processor-manager et analysez la durée d'analyse.

    Une entrée de journal pour l'exemple de DAG indique que la durée d'analyse du DAG est de 4,21 secondes.
    Figure 14 Les journaux du gestionnaire de processeur DAG indiquent les durées d'analyse des DAG après l'optimisation du code DAG (cliquez pour agrandir)

En remplaçant les variables d'environnement par des modèles Airflow, vous avez simplifié le code DAG et réduit la latence d'analyse d'environ dix fois.

Optimiser les configurations d'environnement Airflow

Le planificateur Airflow tente constamment de déclencher de nouvelles tâches et d'analyser tous les DAG dans le bucket de votre environnement. Si l'analyse de vos DAG prend beaucoup de temps et que le planificateur consomme beaucoup de ressources, vous pouvez optimiser les configurations du planificateur Airflow afin qu'il utilise les ressources plus efficacement.

Dans ce tutoriel, l'analyse des fichiers DAG prend beaucoup de temps, et les cycles d'analyse commencent à se chevaucher, ce qui épuise la capacité du planificateur. Dans notre exemple, l'analyse du premier DAG prend plus de cinq secondes. Vous allez donc configurer le planificateur pour qu'il s'exécute moins fréquemment afin d'utiliser les ressources plus efficacement. Vous allez remplacer l'option de configuration Airflow scheduler_heartbeat_sec. Cette configuration définit la fréquence d'exécution du planificateur (en secondes). Par défaut, cette valeur est définie sur cinq secondes. Vous pouvez modifier cette option de configuration Airflow en la remplaçant.

Remplacez l'option de configuration Airflow scheduler_heartbeat_sec:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Remplacements de configuration Airflow.

  4. Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.

  5. Remplacez l'option de configuration Airflow:

    Section Clé Valeur
    scheduler scheduler_heartbeat_sec 10
  6. Cliquez sur Enregistrer et attendez que l'environnement mette à jour sa configuration.

Vérifiez les métriques du planificateur:

  1. Accédez à l'onglet Surveillance, puis sélectionnez Planificateurs.

  2. Dans le graphique Heartbeat du planificateur, cliquez sur le bouton Plus d'options (trois points), puis sur Afficher dans l'explorateur de métriques.

Le graphique de la pulsation du planificateur montre que la pulsation se produit moins fréquemment
Figure 15. Graphique de pulsation du planificateur (cliquez pour agrandir)

Sur le graphique, vous pouvez voir que le planificateur s'exécute deux fois moins fréquemment après avoir modifié la configuration par défaut de 5 secondes à 10 secondes. En réduisant la fréquence des battements de cœur, vous vous assurez que le planificateur ne commence pas à s'exécuter pendant le cycle d'analyse précédent et que la capacité de ressources du planificateur n'est pas épuisée.

Attribuer plus de ressources au planificateur

Dans Cloud Composer 2, vous pouvez allouer davantage de ressources de processeur et de mémoire au planificateur. Vous pouvez ainsi améliorer les performances de votre planificateur et accélérer le temps d'analyse de votre DAG.

Attribuez des ressources de processeur et de mémoire supplémentaires au planificateur:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Configuration de l'environnement.

  4. Accédez à la configuration Ressources > Charges de travail, puis cliquez sur Modifier.

  5. Dans la section Planificateur, dans le champ Mémoire, spécifiez la nouvelle limite de mémoire. Dans ce tutoriel, utilisez 4 Go.

  6. Dans le champ CPU (Processeur), spécifiez la nouvelle limite de processeur. Dans ce tutoriel, utilisez deux vCPU.

  7. Enregistrez les modifications et attendez quelques minutes pour que vos planificateurs Airflow redémarrent.

  8. Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de traitement des DAG.

  9. Examinez les journaux dag-processor-manager et comparez la durée d'analyse des exemples de DAG:

    Une entrée de journal pour l'exemple de DAG indique que la durée d'analyse du DAG optimisé est de 1,5 seconde. Pour le DAG non optimisé, la durée d'analyse est de 28,71 secondes.
    Figure 16 Les journaux du gestionnaire de processeur DAG indiquent les durées d'analyse des DAG après l'attribution de plus de ressources au planificateur (cliquez pour agrandir)

En attribuant plus de ressources au planificateur, vous avez augmenté sa capacité et réduit considérablement la latence d'analyse par rapport aux configurations d'environnement par défaut. Avec plus de ressources, le planificateur peut analyser les DAG plus rapidement. Toutefois, les coûts associés aux ressources Cloud Composer augmenteront également. De plus, il est impossible d'augmenter les ressources au-delà d'une certaine limite.

Nous vous recommandons de n'allouer des ressources qu'après avoir implémenté les optimisations possibles du code DAG et de la configuration Airflow.

Effectuer un nettoyage

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud , supprimez le projet contenant les ressources ou conservez le projet et supprimez les ressources individuelles.

Supprimer le projet

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Supprimer des ressources individuelles

Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.

Supprimez l'environnement Cloud Composer. Vous devez également supprimer le bucket de l'environnement au cours de cette procédure.

Étape suivante