Déboguer les problèmes de planification des tâches

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Ce tutoriel vous guide dans le diagnostic et le dépannage de la planification des tâches et d'analyse des problèmes qui conduisent à un dysfonctionnement du planificateur, des erreurs d'analyse et la latence et l'échec d'une tâche.

Présentation

Le programmeur Airflow est principalement affecté par deux facteurs: la planification des tâches et Analyse du DAG Les problèmes liés à l'un de ces facteurs peuvent avoir un impact négatif l'état et les performances de l'environnement.

Parfois, trop de tâches sont planifiées simultanément. Dans ce cas, file d'attente est pleine et les tâches restent dans l'état « planifié ». état ou devenir reprogrammés après avoir été mis en file d'attente, ce qui peut entraîner un échec de la tâche et des performances la latence.

Un autre problème courant concerne l'analyse de la latence et des erreurs causées par la complexité un code DAG. Par exemple, un code DAG contenant des variables Airflow au début du code peut entraîner des retards d'analyse, une surcharge de la base de données, et les délais avant expiration des DAG.

Dans ce tutoriel, vous allez diagnostiquer les exemples de DAG et apprendre à résoudre les problèmes de planification et d'analyse, à améliorer la planification des DAG et à optimiser le code et les configurations d'environnement de vos DAG pour améliorer les performances.

Objectifs

Cette section liste les objectifs des exemples de ce tutoriel.

Exemple : Dysfonctionnement du planificateur et latence causés par une forte concurrence de tâches

  • Importez l'exemple de DAG qui s'exécute plusieurs fois simultanément et diagnostiquez le dysfonctionnement du planificateur et les problèmes de latence avec Cloud Monitoring.

  • Optimisez le code de votre DAG en consolidant les tâches et en évaluant impact sur les performances.

  • Répartissez les tâches de manière plus uniforme au fil du temps et évaluez l'impact sur les performances.

  • optimiser vos configurations Airflow et votre environnement, évaluer l'impact.

Exemple : erreurs d'analyse DAG et latence causées par un code complexe

  • Importer l'exemple de DAG contenant les variables Airflow et diagnostiquer l'analyse les problèmes liés à Cloud Monitoring.

  • Optimiser le code du DAG en évitant les variables Airflow au niveau supérieur le code et évaluer son impact sur le temps d'analyse.

  • Optimisez les configurations Airflow et les configurations d'environnement, puis évaluez l'impact sur le temps d'analyse.

Coûts

Ce tutoriel utilise les composants facturables suivants de Google Cloud :

Une fois que vous avez terminé ce tutoriel, évitez de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez Effectuer un nettoyage.

Avant de commencer

Cette section décrit les actions requises avant de commencer le tutoriel.

Créer et configurer un projet

Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet comme suit :

  1. Dans la console Google Cloud, sélectionnez ou créez un projet:

    Accéder au sélecteur de projet

  2. Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.

  3. Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires :

    • Administrateur de l'environnement et des objets Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrateur de Compute (roles/compute.admin)

Activer les API pour votre projet.

Enable the Cloud Composer API.

Enable the API

Créer votre environnement Cloud Composer

Créez un environnement Cloud Composer 2.

Pour créer l'environnement, vous accordez l'extension d'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) à l'agent de service Composer de service. Cloud Composer utilise ce compte pour effectuer des opérations dans votre projet Google Cloud.

Exemple : Défaillance du planificateur et échec de la tâche en raison de problèmes de planification

Cet exemple illustre le dysfonctionnement du programmeur et la latence causés par une simultanéité élevée des tâches.

Importer l'exemple de DAG dans votre environnement

Importez l'exemple de DAG suivant dans l'environnement que vous avez créées aux étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_200_seconds_1

Ce DAG comporte 200 tâches. Chaque tâche attend une seconde et affiche "Terminé". Le DAG se déclenche automatiquement une fois l'importation terminée. Cloud Composer exécute ce DAG 10 fois, et toutes les exécutions du DAG ont lieu en parallèle.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnostiquer les problèmes de dysfonctionnement du planificateur et d'échec des tâches

Une fois l'exécution du DAG terminée, ouvrez l'interface utilisateur d'Airflow, puis cliquez sur le bouton dag_10_tasks_200_seconds_1 DAG. Vous verrez que 10 exécutions de DAG au total ont été réussies, et chacune comporte 200 tâches réussies.

Consultez les journaux des tâches Airflow :

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.

L'histogramme des journaux affiche les erreurs et les avertissements en rouge et orange:

Histogramme des journaux de nœuds de calcul Airflow avec des erreurs et des avertissements
    indiqués par des couleurs rouge et orange
Figure 1 : Histogramme des journaux des nœuds de calcul Airflow (cliquez pour agrandir)

L'exemple de DAG a généré environ 130 avertissements et 60 erreurs. Cliquez n'importe où une colonne qui contient des barres jaunes et rouges. Certaines des avertissements et erreurs suivants s'affichent dans les journaux :

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Ces journaux peuvent indiquer que l'utilisation des ressources a dépassé les limites le nœud de calcul a redémarré lui-même.

Si une tâche Airflow est conservée dans la file d'attente trop longtemps, le planificateur marque en échec et en up_for_retry, et va replanifier l'opération pour l'exécution. Pour observer les symptômes de cette situation, vous pouvez consulter le graphique du nombre de tâches en file d'attente. Si les pics de ce graphique ne diminuent pas au bout d'environ 10 minutes, il est probable que des tâches échouent (sans journaux).

Examinez les informations de surveillance :

  1. Accédez à l'onglet Monitoring (Surveillance), puis sélectionnez Overview (Aperçu).

  2. Examinez le graphique Tâches Airflow.

    Graphique des tâches Airflow au fil du temps, montrant un pic du nombre de tâches en file d'attente
    Figure 2 : Graphique des tâches Airflow (cliquez pour agrandir)

    Dans le graphique des tâches Airflow, il existe un pic de tâches en file d'attente qui dure plus de 10 minutes, ce qui peut signifier qu'il n'y a pas suffisamment de ressources dans votre environnement pour traiter toutes les tâches planifiées.

  3. Examinez le graphique Nœuds de calcul actifs :

    Le graphique des nœuds de calcul Airflow actifs au fil du temps montre que le nombre de nœuds de calcul actifs a été augmenté jusqu'à la limite maximale.
    Figure 3 : Graphique des nœuds de calcul actifs (cliquez pour agrandir)

    Le graphique Nœuds de calcul actifs indique que le DAG a déclenché l'autoscaling à la limite maximale autorisée de trois nœuds de calcul pendant l'exécution du DAG.

  4. Les graphiques d'utilisation des ressources peuvent indiquer le manque de capacité des nœuds de calcul Airflow. pour exécuter des tâches en file d'attente. Dans l'onglet Surveillance, sélectionnez Nœuds de calcul, puis examinez les graphiques Utilisation totale du processeur des nœuds de calcul et Utilisation totale de la mémoire des nœuds de calcul.

    Graphique représentant l'utilisation du processeur par les nœuds de calcul Airflow
    augmentant jusqu'à la limite maximale
    Figure 4 : Graphique de l'utilisation totale du processeur par les nœuds de calcul (cliquez pour agrandir)
    <ph type="x-smartling-placeholder">
    </ph> Le graphique de l&#39;utilisation de la mémoire par les nœuds de calcul Airflow montre que l&#39;utilisation de la mémoire augmente, mais qu&#39;elle n&#39;atteint pas la limite maximale.
    Figure 5 : Graphique de l'utilisation totale de la mémoire des nœuds de calcul (cliquez pour agrandir)

    Les graphiques indiquent que l'exécution simultanée d'un trop grand nombre de tâches a permis d'atteindre la limite du processeur. Les ressources avaient été utilisées minutes, ce qui est encore plus long que la durée totale de 200 tâches en 10 Le DAG s'exécute une par une.

Voici les indicateurs de remplissage de la file d'attente et d'un manque de ressources pour traiter toutes les tâches planifiées.

Consolider vos tâches

Le code actuel crée de nombreux DAG et tâches sans ressources suffisantes pour traiter toutes les tâches en parallèle, ce qui entraîne le remplissage de la file d'attente. Laisser les tâches dans la file d'attente trop longtemps peut entraîner leur reprogrammation ou leur échec. Dans ce cas, nous vous conseillons de limiter le nombre tâches.

L'exemple de DAG suivant modifie le nombre de tâches de l'exemple initial de 200 à 20 et augmente le temps d'attente de 1 à 10 secondes pour imiter des tâches plus consolidées qui effectuent la même quantité de travail.

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Évaluez l'impact de tâches plus consolidées sur les processus de planification:

  1. Attendez que les exécutions du DAG soient terminées.

  2. Dans l'interface utilisateur d'Airflow, sur la page DAG, cliquez sur le DAG dag_10_tasks_20_seconds_10. Vous verrez 10 exécutions de DAG, chacune comptant 20 exécutions des tâches qui ont réussi.

  3. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  4. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  5. Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.

    Le deuxième exemple avec des tâches plus consolidées a généré environ 10 avertissements et 7 erreurs. Sur l'histogramme, vous pouvez comparer le nombre erreurs et avertissements dans l'exemple initial (valeurs précédentes), et dans la deuxième exemple (valeurs suivantes).

    Histogramme des journaux de nœuds de calcul Airflow avec des erreurs et des avertissements
    indique la diminution du nombre d&#39;erreurs et d&#39;avertissements
    consolidé
    Figure 6 : Le nœud de calcul Airflow consigne l'histogramme tâches consolidées (cliquez pour agrandir)

    Lorsque vous comparez le premier exemple avec le plus consolidé, vous pouvez le nombre d'erreurs et d'avertissements au cours de la deuxième à titre d'exemple. Toutefois, les mêmes erreurs liées à un arrêt tiède apparaissent toujours les journaux en raison d'une surcharge de ressources.

  6. Dans l'onglet Surveillance, sélectionnez Nœuds de calcul, puis examinez les graphiques.

    Lorsque vous comparez le graphique Tâches Airflow du premier exemple (précédemment ) avec le graphique du deuxième exemple présentant des tâches plus consolidées, vous pouvez constater que le pic de tâches en file d'attente a duré plus longtemps moment où les tâches étaient plus consolidées. Cependant, il durait près de 10 minutes ce qui n'est pas optimal.

    Le graphique des tâches Airflow au fil du temps montre que le pic de
    Les tâches Airflow ont duré moins longtemps qu&#39;avant.
    Figure 7 : Graphique des tâches Airflow après la consolidation des tâches (cliquez pour agrandir)

    Sur le graphique "Nœuds de calcul actifs", vous pouvez voir que le premier exemple (à gauche du graphique) a utilisé des ressources pendant une période beaucoup plus longue que le second, même si les deux exemples imitent la même quantité de travail.

    Le graphique des nœuds de calcul Airflow actifs au fil du temps montre que
    le nombre de nœuds de calcul actifs a augmenté sur une plus courte période
    qu&#39;avant.
    Figure 8 : Graphique des nœuds de calcul actifs après la consolidation des tâches (cliquez pour agrandir)

    Examinez les graphiques de consommation des ressources des nœuds de calcul. Même si la différence entre les ressources utilisées dans l'exemple avec des tâches plus consolidées et l'exemple initial est assez importante, l'utilisation du processeur atteint toujours 70 % de la limite.

    Le graphique de l&#39;utilisation du processeur par les nœuds de calcul Airflow montre que l&#39;utilisation du processeur augmente jusqu&#39;à 70 % de la limite maximale.
    Figure 9 Graphique de l'utilisation totale du processeur pour les nœuds de calcul après la consolidation des tâches (cliquez pour agrandir)
    Le graphique illustrant l&#39;utilisation de la mémoire par les nœuds de calcul Airflow indique que l&#39;utilisation de la mémoire augmente, mais n&#39;atteint pas la limite maximale.
    Figure 10 Graphique d'utilisation totale de la mémoire des nœuds de calcul après les tâches ont été consolidées (cliquez pour agrandir)

Répartir les tâches de manière plus uniforme au fil du temps

Trop de tâches simultanées entraînent le remplissage de la file d'attente, ce qui conduit à tâches bloquées dans la file d'attente ou reprogrammées. Au cours des étapes précédentes, vous avez a réduit le nombre de tâches en les regroupant, mais les résultats les journaux et la surveillance indiquent que le nombre de tâches simultanées est toujours sous-optimale.

Vous pouvez contrôler le nombre d'exécutions de tâches simultanées en implémentant une planification ou en définissant des limites pour le nombre de tâches pouvant être exécutées simultanément.

Dans ce tutoriel, vous allez répartir les tâches de manière plus uniforme au fil du temps en ajoutant des paramètres au niveau du DAG dans le DAG dag_10_tasks_20_seconds_10 :

  1. Ajout de l'argument max_active_runs=1 au gestionnaire de contexte du DAG. Cet argument définit la limite d'une seule instance d'une exécution du DAG à un moment donné.

  2. Ajoutez l'argument max_active_tasks=5 au gestionnaire de contexte DAG. Cet argument contrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG.

Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10_scheduled.py

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Évaluez l'impact de la distribution des tâches au fil du temps sur les processus de planification :

  1. Attendez que les exécutions du DAG soient terminées.

  2. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  3. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  4. Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.

  5. Sur l'histogramme, vous pouvez voir que le troisième DAG, qui comporte un nombre limité de tâches et d'exécutions actives, n'a généré aucune alerte ni aucune erreur, et que la distribution des journaux semble plus uniforme par rapport aux valeurs précédentes.

    L&#39;histogramme des journaux des nœuds de calcul Airflow contenant des erreurs et des avertissements n&#39;affiche aucune erreur ni avertissement une fois les tâches consolidées et réparties au fil du temps.
    Figure 11 Le nœud de calcul Airflow consigne l'histogramme tâches consolidées et distribuées au fil du temps (cliquez pour agrandir)

Les tâches de l'exemple dag_10_tasks_20_seconds_10_scheduled qui ont un un nombre limité de tâches et d'exécutions actives n'a pas entraîné de pression sur les ressources, car les tâches étaient mises en file d’attente uniformément.

Après avoir suivi les étapes décrites, vous avez optimisé l'utilisation des ressources en regroupant les petites tâches et en les répartissant plus uniformément au fil du temps.

Optimiser les configurations d'environnement

Vous pouvez ajuster les configurations de votre environnement pour vous assurer que les nœuds de calcul Airflow disposent toujours de la capacité nécessaire pour exécuter les tâches en file d'attente.

Nombre de nœuds de calcul et simultanéité des nœuds de calcul

Vous pouvez ajuster le nombre maximal de nœuds de calcul pour que Cloud Composer effectue un scaling automatique de votre environnement dans les limites définies.

Le paramètre [celery]worker_concurrency définit le nombre maximal de tâches qu'un seul nœud de calcul peut récupérer dans la file d'attente de tâches. La modification de ce paramètre ajuste le nombre de tâches qu'un seul nœud de calcul peut exécuter simultanément. Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Par défaut, la simultanéité des nœuds de calcul est définie sur un minimum de 32, 12 * worker_CPU, 8 * worker_memory, ce qui signifie qu'elle dépend des limites de ressources des nœuds de calcul. Pour en savoir plus sur les valeurs de simultanéité par défaut des workers, consultez la section Optimiser les environnements.

Le nombre de nœuds de calcul et la concurrence entre eux sont combinés, et les performances de votre environnement dépendent fortement de ces deux paramètres. Pour choisir la bonne combinaison, vous pouvez tenir compte des éléments suivants:

  • Exécution de plusieurs tâches rapides en parallèle Vous pouvez augmenter le nombre lorsque des tâches sont en attente dans la file d'attente et que vos nœuds de calcul utilisent un faible pourcentage de leurs processeurs et de la mémoire en même temps. Toutefois, dans certaines circonstances, la file d'attente peut ne jamais se remplir, ce qui empêche l'autoscaling de se déclencher. Si les petites tâches sont terminées au moment où les nouveaux nœuds de calcul sont prêts, un nœud de calcul existant peut prendre en charge les tâches restantes, et il n'y aura aucune tâche pour les nœuds de calcul nouvellement créés.

    Dans ces situations, il est recommandé augmenter le nombre minimal de nœuds de calcul et augmenter leur simultanéité ; pour éviter un scaling excessif.

  • Plusieurs longues tâches s'exécutant en parallèle. La simultanéité élevée des nœuds de calcul empêche le système de faire évoluer le nombre de nœuds de calcul. Si plusieurs tâches sont gourmandes en ressources et prennent beaucoup de temps à s'exécuter, une simultanéité élevée des nœuds de calcul peut entraîner une file d'attente qui ne se remplit jamais et toutes les tâches sont traitées par un seul nœud de calcul, ce qui entraîne des problèmes de performances. Dans ces situations, il est recommandé de augmenter le nombre maximal de nœuds de calcul et réduire la simultanéité des nœuds de calcul.

L'importance du parallélisme

Les programmeurs Airflow contrôlent la planification des exécutions de DAG et des tâches individuelles DAG. L'option de configuration Airflow [core]parallelism contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.

Le parallélisme est un mécanisme de protection d'Airflow qui détermine le nombre de tâches pouvant être exécutées en même temps par chaque planificateur, quel que soit le nombre de nœuds de calcul. La valeur de parallélisme, multipliée par le nombre de planificateurs de votre cluster, correspond au nombre maximal d'instances de tâche que votre environnement peut mettre en file d'attente.

Généralement, [core]parallelism est défini comme le produit d'un nombre maximal de nœuds de calcul et [celery]worker_concurrency. Elle est également affectée par pool. Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Pour en savoir plus sur l'ajustement des configurations Airflow liées à la mise à l'échelle, consultez la section Mettre à l'échelle la configuration Airflow.

Trouver les configurations d'environnement optimales

La méthode recommandée pour résoudre les problèmes de planification consiste à regrouper les petites tâches dans les tâches plus importantes et de répartir les tâches plus uniformément au fil du temps. En plus d'optimiser le code DAG, vous pouvez également optimiser les configurations d'environnement afin de disposer d'une capacité suffisante pour exécuter plusieurs tâches simultanément.

Par exemple, supposons que vous regroupiez des tâches dans votre DAG. autant que possible, mais en limitant les tâches actives pour les répartir plus uniformément sur n'est pas une solution à privilégier pour votre cas d'utilisation spécifique.

Vous pouvez ajuster le parallélisme, le nombre de nœuds de calcul et la simultanéité des nœuds de calcul pour exécuter le DAG dag_10_tasks_20_seconds_10 sans limiter les paramètres tâches. Dans cet exemple, le DAG s'exécute 10 fois, et chaque exécution contient 20 petites tâches. Si vous souhaitez les exécuter simultanément :

  • Vous aurez besoin d'un environnement de plus grande taille, car il permet de contrôler les performances de l'infrastructure Cloud Composer gérée de votre environnement.

  • Les nœuds de calcul Airflow doivent pouvoir exécuter 20 tâches simultanément, ce qui signifie vous devez définir la simultanéité des nœuds de calcul sur 20.

  • Les nœuds de calcul ont besoin de suffisamment de processeurs et de mémoire pour gérer toutes les tâches. Nœud de calcul dépend du processeur et de la mémoire des nœuds de calcul. Par conséquent, vous aurez besoin au moins worker_concurrency / 12 au processeur et à least worker_concurrency / 8 en mémoire.

  • Vous devrez augmenter le parallélisme pour correspondre à la simultanéité des nœuds de calcul plus élevée. Pour que les nœuds de calcul récupèrent 20 tâches de la file d'attente, le planificateur devez d’abord planifier ces 20 tâches.

Ajustez les configurations de votre environnement comme suit :

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Configuration de l'environnement.

  4. Recherchez la section Resources (Ressources) &gt; Workloads (Charges de travail). cliquez sur Modifier.

  5. Dans la section Worker (Nœud de calcul), dans le champ Memory (Mémoire), spécifiez la nouvelle limite de mémoire pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez 4 Go.

  6. Dans le champ CPU, spécifiez la nouvelle limite de processeur pour les nœuds de calcul Airflow. Dans ce utilisez deux vCPU.

  7. Enregistrez les modifications et attendez quelques minutes pour que vos nœuds de calcul Airflow redémarrent.

Ensuite, remplacez les options de configuration Airflow pour le parallélisme et la simultanéité des nœuds de calcul :

  1. Accédez à l'onglet Remplacements de configuration Airflow.

  2. Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.

  3. Remplacez la configuration de parralélisme:

    Section Clé Valeur
    core parallelism 20
  4. Cliquez sur Add Airflow Configuration Override (Ajouter un remplacement de configuration Airflow) et remplacez la configuration de la concurrence des workers :

    Section Clé Valeur
    celery worker_concurrency 20
  5. Cliquez sur Enregistrer et attendez que l'environnement mette à jour sa configuration.

Déclenchez à nouveau l'exemple de DAG avec les configurations ajustées :

  1. Dans l'interface utilisateur d'Airflow, accédez à la page DAG.

  2. Recherchez le DAG dag_10_tasks_20_seconds_10, puis supprimez-le.

    Une fois le DAG supprimé, Airflow vérifie le dossier des DAG dans le bucket de votre environnement et exécute automatiquement le DAG à nouveau.

Une fois les exécutions du DAG terminées, examinez à nouveau l'histogramme des journaux. Sur le diagramme, vous pouvez voir que l'exemple dag_10_tasks_20_seconds_10 avec plus les tâches consolidées n'ont pas généré d'erreurs ni d'avertissements lors de l'exécution avec la configuration de l'environnement ajustée. Comparer les résultats aux données précédentes sur le diagramme, où le même exemple a généré des erreurs et des avertissements avec la configuration d'environnement tge par défaut.

L&#39;histogramme des journaux des nœuds de calcul Airflow contenant des erreurs et des avertissements n&#39;affiche aucune erreur ni avertissement après avoir ajusté la configuration de l&#39;environnement.
Figure 12 Histogramme des journaux des nœuds de calcul Airflow après l'ajustement de la configuration de l'environnement (cliquez pour agrandir)

Les configurations d'environnement et les configurations Airflow jouent un rôle crucial dans la planification des tâches. Toutefois, il n'est pas possible d'augmenter les configurations au-delà de certaines limites.

Nous vous recommandons d'optimiser le code du DAG, de consolider les tâches et d'utiliser la planification pour des performances et une efficacité optimales.

Exemple : erreurs d'analyse et latence du DAG en raison d'un code DAG complexe

Dans cet exemple, vous allez examiner la latence d'analyse d'un exemple de DAG qui imite un excès de variables Airflow.

Créer une variable Airflow

Avant d'importer l'exemple de code, créez une variable Airflow.

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.

  3. Accédez à Admin &gt; Variables &gt; Ajouter un enregistrement

  4. Définissez les valeurs suivantes :

    • clé: example_var
    • val : test_airflow_variable

Importer l'exemple de DAG dans votre environnement

Importez l'exemple de DAG suivant dans l'environnement que vous avez créées aux étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable

Ce DAG contient une boucle for qui s'exécute 1 000 fois et imite un excès de variables Airflow. Chaque itération lit la variable example_var et génère une tâche. Chaque tâche contient une commande qui imprime l'ID de la variable .

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnostiquer les problèmes d'analyse

Le temps d'analyse du DAG correspond au temps nécessaire au planificateur Airflow pour lire et analyser un fichier DAG. Avant que le programmeur Airflow puisse planifier une tâche à partir d'un DAG, le programmeur doit analyser le fichier DAG pour découvrir la structure le DAG et les tâches définies.

Si l'analyse d'un DAG prend beaucoup de temps, elle consomme la capacité de l'ordonnanceur et peut réduire les performances des exécutions de DAG.

Pour surveiller la durée d'analyse des DAG :

  1. Exécutez la commande CLI Airflow dags report dans gcloud CLI pour afficher le temps d'analyse de tous vos DAG:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Remplacez les éléments suivants :

    • ENVIRONMENT_NAME : nom de votre environnement
    • LOCATION: région dans laquelle se trouve l'environnement.
  2. Dans le résultat de la commande, recherchez la valeur de durée de la dag_for_loop_airflow_variables DAG. Une valeur élevée peut indiquer qu'un ce DAG n'est pas implémenté de manière optimale. Si vous avez plusieurs DAG, de la table de sortie, vous pouvez identifier les DAG dont le temps d'analyse est long.

    Exemple :

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Inspectez les temps d'analyse des DAG dans la console Google Cloud:

    1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  4. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  5. Accédez à l'onglet Journaux, puis à Tous les journaux &gt; Gestionnaire de processeur DAG.

  6. Examinez les journaux dag-processor-manager et identifiez les problèmes éventuels.

    Une entrée de journal pour l&#39;exemple de DAG indique que la durée d&#39;analyse du DAG est de 46,3 secondes.
    Figure 13 : Les journaux du gestionnaire de processeur DAG affichent le DAG temps d'analyse (cliquez pour agrandir)

Si la durée d'analyse totale des DAG dépasse environ 10 secondes, vos planificateurs peuvent être surchargés par l'analyse des DAG et ne pas pouvoir exécuter efficacement les DAG.

Optimiser le code du DAG

Nous vous recommandons d'éviter le code Python "de niveau supérieur" inutile dans vos DAG. Les DAG contenant de nombreuses importations, variables et fonctions en dehors du DAG entraînent des temps d'analyse plus longs pour le planificateur Airflow. Cela réduit les performances et l'évolutivité Cloud Composer et Airflow. Un excès de lecture de variables Airflow entraîne une longue durée d'analyse et une charge de base de données élevée. Si ce code se trouve dans un DAG , ces fonctions s'exécutent à chaque pulsation du planificateur, ce qui peut être lent.

Les champs de modèle d'Airflow vous permettent d'intégrer les valeurs des variables Airflow et des modèles Jinja dans vos DAG. Cela évite l'exécution inutile de fonctions pendant les battements de cœur du planificateur.

Pour améliorer la mise en œuvre de l'exemple de DAG, évitez d'utiliser des variables Airflow dans le code Python de premier niveau des DAG. Transmettez plutôt des variables Airflow aux opérateurs existants via un modèle Jinja, ce qui retardera la lecture de la valeur jusqu'à l'exécution de la tâche.

Importez la nouvelle version de l'exemple de DAG dans votre environnement. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Inspectez la durée d'analyse du nouveau DAG:

  1. Attendez la fin de l'exécution du DAG.

  2. Exécutez à nouveau la commande dags report pour afficher le temps d'analyse de tous vos DAG :

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Réexaminez les journaux dag-processor-manager et analysez la durée d'analyse.

    Une entrée de journal pour l&#39;exemple de DAG indique que la durée d&#39;analyse du DAG est de 4,21 secondes.
    Figure 14 Les journaux du gestionnaire de processeur DAG indiquent les durées d'analyse des DAG après l'optimisation du code DAG (cliquez pour agrandir)

En remplaçant les variables d'environnement par des modèles Airflow, vous avez simplifié le code DAG et a réduit la latence d'analyse d'environ 10 fois.

Optimiser les configurations d'environnement Airflow

Le planificateur Airflow tente constamment de déclencher de nouvelles tâches et d'analyser tous les DAG dans le bucket de votre environnement. Si la durée d'analyse de vos DAG est longue et que le consomme beaucoup de ressources, vous pouvez l'optimiser configurations afin que le planificateur utilise les ressources plus efficacement.

Dans ce tutoriel, l'analyse des fichiers DAG prend beaucoup de temps, et les cycles d'analyse commencent à se chevaucher, ce qui épuise la capacité du planificateur. Dans notre exemple, l'analyse du premier exemple de DAG prend plus de 5 secondes. Vous allez donc configurer le programmeur de s'exécuter moins fréquemment pour utiliser les ressources plus efficacement. Vous allez remplacer l'option de configuration Airflow scheduler_heartbeat_sec. Cette configuration définit la fréquence d'exécution du planificateur (en secondes). Par défaut, la valeur est définie sur 5 secondes. Vous pouvez modifier cette option de configuration Airflow en la remplaçant.

Remplacez l'option de configuration Airflow scheduler_heartbeat_sec :

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Remplacements de configuration Airflow.

  4. Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.

  5. Remplacez l'option de configuration Airflow :

    Section Clé Valeur
    scheduler scheduler_heartbeat_sec 10
  6. Cliquez sur Enregistrer et attendez que l'environnement mette à jour sa configuration.

Vérifiez les métriques du planificateur :

  1. Accédez à l'onglet Surveillance, puis sélectionnez Planificateurs.

  2. Dans le graphique Heartbeat du planificateur, cliquez sur le bouton Plus d'options (trois points), puis sur Afficher dans l'explorateur de métriques.

<ph type="x-smartling-placeholder"></ph> Le graphique des pulsations du programmeur montre que les pulsations sont moins fréquentes
Figure 15. Graphique de pulsation du planificateur (cliquez pour agrandir)

Sur le graphique, vous pouvez voir que le planificateur s'exécute deux fois moins fréquemment après avoir modifié la configuration par défaut de 5 secondes à 10 secondes. En réduisant la fréquence des battements de cœur, vous vous assurez que le planificateur ne commence pas à s'exécuter pendant le cycle d'analyse précédent et que la capacité de ressources du planificateur n'est pas épuisée.

Attribuer davantage de ressources au planificateur

Dans Cloud Composer 2, vous pouvez allouer davantage de ressources de processeur et de mémoire au scheduler. Vous pouvez ainsi améliorer les performances de votre planificateur et accélérer le temps d'analyse de votre DAG.

Allouez du processeur et de la mémoire supplémentaires au programmeur:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

  3. Accédez à l'onglet Configuration de l'environnement.

  4. Accédez à la configuration Ressources > Charges de travail, puis cliquez sur Modifier.

  5. Dans la section Scheduler (Planificateur), dans le champ Memory (Mémoire), spécifiez la nouvelle limite de mémoire. Dans ce tutoriel, utilisez 4 Go.

  6. Dans le champ Processeur, spécifiez la nouvelle limite de processeur. Dans ce utilisez deux vCPU.

  7. Enregistrez les modifications et attendez quelques minutes pour que vos programmeurs Airflow puissent redémarrer.

  8. Accédez à l'onglet Journaux, puis à Tous les journaux &gt; Gestionnaire de processeur DAG.

  9. Examinez les journaux dag-processor-manager et comparez la durée d'analyse pour le Exemples de DAG:

    Une entrée de journal pour l&#39;exemple de DAG indique que le temps d&#39;analyse du DAG optimisé est de 1,5 seconde. Pour le DAG non optimisé, le temps d&#39;analyse est de 28,71 secondes
    Figure 16 : Les journaux du gestionnaire de processeur DAG indiquent les durées d'analyse des DAG après l'attribution de plus de ressources au planificateur (cliquez pour agrandir)

En attribuant plus de ressources au planificateur, vous avez augmenté sa capacité et réduit considérablement la latence d'analyse par rapport aux configurations d'environnement par défaut. Avec plus de ressources, le planificateur peut analyser les DAG plus rapidement. Toutefois, les coûts associés aux ressources Cloud Composer augmenteront également. De plus, il n'est pas possible d'augmenter la valeur au-delà d'une certaine limite.

Nous vous recommandons de n'allouer des ressources qu'après le code DAG possible et Des optimisations de configuration Airflow ont été mises en œuvre.

Effectuer un nettoyage

Pour éviter que les ressources soient facturées sur votre compte Google Cloud, utilisées dans ce tutoriel, supprimez le projet qui contient les ressources ou conserver le projet et supprimer les ressources individuelles.

Supprimer le projet

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Supprimer des ressources individuelles

Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.

Supprimez l'environnement Cloud Composer. Vous devez également supprimer le bucket de l'environnement au cours de cette procédure.

Étape suivante