Cloud Composer 1 | Cloud Composer 2
Ce tutoriel vous guide dans le diagnostic et la résolution des problèmes de planification et d'analyse des tâches qui entraînent un dysfonctionnement du programmeur, des erreurs d'analyse et de la latence, ainsi que des échecs de tâches.
Introduction
Le programmeur Airflow est principalement affecté par deux facteurs: la planification des tâches et l'analyse du DAG. Les problèmes liés à l'un de ces facteurs peuvent avoir un impact négatif sur la santé et les performances de l'environnement.
Parfois, trop de tâches sont planifiées simultanément. Dans cette situation, la file d'attente est remplie et les tâches restent à l'état "planifiée" ou sont reprogrammées après avoir été mises en file d'attente, ce qui peut entraîner l'échec des tâches et une latence des performances.
Un autre problème courant concerne l'analyse de la latence et des erreurs causées par la complexité du code d'un DAG. Par exemple, un code DAG contenant des variables Airflow au niveau supérieur peut entraîner des délais d'analyse, une surcharge de la base de données, des échecs de planification et des délais d'inactivité du DAG.
Dans ce tutoriel, vous allez diagnostiquer les exemples de DAG et apprendre à résoudre les problèmes de planification et d'analyse, à améliorer la planification des DAG, et à optimiser le code des DAG et les configurations d'environnement pour améliorer les performances.
Objectifs
Cette section liste les objectifs des exemples de ce tutoriel.
Exemple: dysfonctionnement du programmeur et latence causés par un nombre élevé de simultanéités de tâches
Importer l'exemple de DAG qui s'exécute plusieurs fois simultanément, et diagnostiquer les dysfonctionnements du programmeur et les problèmes de latence avec Cloud Monitoring
Optimisez le code de votre DAG en regroupant les tâches et en évaluant l'impact sur les performances.
Répartissez les tâches plus régulièrement dans le temps et évaluez l'impact sur les performances.
Optimisez vos configurations Airflow et les configurations d'environnement, et évaluez l'impact.
Exemple: erreurs d'analyse du DAG et latence causées par un code complexe
Importer l'exemple de DAG avec des variables Airflow et diagnostiquer les problèmes d'analyse avec Cloud Monitoring
Optimiser le code du DAG en évitant les variables Airflow au niveau supérieur du code et évaluer l'impact sur la durée d'analyse.
Optimiser les configurations Airflow et l'environnement, et évaluer l'impact sur la durée d'analyse
Coûts
Ce tutoriel utilise les composants facturables suivants de Google Cloud :
- Cloud Composer (consultez la section sur les coûts supplémentaires)
- Cloud Monitoring
Une fois que vous avez terminé ce tutoriel, évitez de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.
Avant de commencer
Cette section décrit les actions requises avant de démarrer le tutoriel.
Créer et configurer un projet
Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet de la manière suivante:
Dans la console Google Cloud, sélectionnez ou créez un projet:
Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.
Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires:
- Administrateur de l'environnement et des objets Storage (
roles/composer.environmentAndStorageObjectAdmin
) - Administrateur de Compute (
roles/compute.admin
)
- Administrateur de l'environnement et des objets Storage (
Activer les API pour votre projet.
Activez l'API Cloud Composer
Créer l'environnement Cloud Composer
Créez un environnement Cloud Composer 2.
Dans le cadre de la création de l'environnement, vous attribuez le rôle Extension d'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext
) au compte d'agent de service Composer. Cloud Composer utilise ce compte pour effectuer des opérations dans votre projet Google Cloud.
Exemple: Dysfonctionnement du planificateur et échec des tâches en raison de problèmes de planification des tâches
Cet exemple illustre le débogage des dysfonctionnements et de la latence du programmeur causés par un haut niveau de simultanéité des tâches.
Importer l'exemple de DAG dans votre environnement
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé aux étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_200_seconds_1
.
Ce DAG comporte 200 tâches. Chaque tâche attend 1 seconde et affiche le message "Complete!" (Terminé). Le DAG se déclenche automatiquement une fois importé. Cloud Composer exécute ce DAG 10 fois, et toutes les exécutions du DAG ont lieu en parallèle.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnostiquer les dysfonctionnements du planificateur et des tâches
Une fois le DAG exécuté, ouvrez l'interface utilisateur d'Airflow, puis cliquez sur le DAG dag_10_tasks_200_seconds_1
. Vous constaterez que 10 exécutions du DAG au total ont abouti, et que chacune contient 200 tâches ayant abouti.
Consultez les journaux des tâches Airflow:
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.
Dans l'histogramme des journaux, les erreurs et les avertissements sont indiqués en rouge et en orange:
L'exemple de DAG a généré environ 130 avertissements et 60 erreurs. Cliquez sur n'importe quelle colonne contenant des barres jaunes et rouges. Certains des avertissements et erreurs suivants s'afficheront dans les journaux:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Ces journaux peuvent indiquer que l'utilisation des ressources a dépassé les limites et que le nœud de calcul a redémarré lui-même.
Si une tâche Airflow est conservée dans la file d'attente trop longtemps, le programmeur la marque comme ayant échoué et up_for_retry, puis la replanifiera pour son exécution. Une façon d'observer les symptômes de cette situation consiste à examiner le graphique avec le nombre de tâches en file d'attente. Si les pics de ce graphique ne baissent pas au bout de 10 minutes environ, il y aura probablement des échecs de tâches (sans journaux).
Examinez les informations de surveillance:
Accédez à l'onglet Surveillance, puis sélectionnez Aperçu.
Examinez le graphique Tâches Airflow.
Dans le graphique des tâches Airflow, il y a un pic de tâches en file d'attente qui dure plus de 10 minutes, ce qui peut signifier que votre environnement n'a pas suffisamment de ressources pour traiter toutes les tâches planifiées.
Examinez le graphique Active workers (Nœuds de calcul actifs) :
Le graphique Nœuds de calcul actifs indique que le DAG a déclenché l'autoscaling jusqu'à la limite maximale autorisée de trois nœuds de calcul lors de l'exécution du DAG.
Les graphiques d'utilisation des ressources peuvent indiquer le manque de capacité des nœuds de calcul Airflow à exécuter les tâches en file d'attente. Dans l'onglet Surveillance, sélectionnez Nœuds de calcul, puis consultez les graphiques Utilisation totale du processeur par les nœuds de calcul et Utilisation totale de la mémoire des nœuds de calcul.
Les graphiques indiquent que l'exécution simultanée d'un trop grand nombre de tâches a entraîné l'atteinte de la limite du processeur. Les ressources avaient été utilisées pendant plus de 30 minutes, ce qui est encore plus que la durée totale de 200 tâches dans 10 exécutions de DAG l'une après l'autre.
Ce sont des indicateurs indiquant que la file d'attente est remplie et que les ressources sont insuffisantes pour traiter toutes les tâches planifiées.
Consolider vos tâches
Le code actuel crée de nombreux DAG et tâches sans ressources suffisantes pour traiter toutes les tâches en parallèle, ce qui entraîne le remplissage de la file d'attente. Conserver des tâches dans la file d'attente trop longtemps peut entraîner la replanification ou l'échec des tâches. Dans ce cas, opter pour un plus petit nombre de tâches plus consolidées.
L'exemple de DAG suivant fait passer le nombre de tâches de l'exemple initial de 200 à 20 et augmente le temps d'attente de 1 à 10 secondes afin d'imiter des tâches plus consolidées effectuant le même volume de travail.
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Évaluez l'impact de tâches plus consolidées sur les processus de planification:
Attendez la fin de l'exécution du DAG.
Dans l'interface utilisateur d'Airflow, sur la page DAG, cliquez sur le DAG
dag_10_tasks_20_seconds_10
. 10 exécutions du DAG s'affichent, chacune comportant 20 tâches réussies.Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.
Le deuxième exemple, qui comportait des tâches plus consolidées, a généré environ 10 avertissements et 7 erreurs. Sur l'histogramme, vous pouvez comparer le nombre d'erreurs et d'avertissements de l'exemple initial (valeurs précédentes) et du deuxième exemple (valeurs suivantes).
En comparant le premier exemple au premier exemple, vous pouvez constater que le deuxième exemple comporte beaucoup moins d'erreurs et d'avertissements. Toutefois, les mêmes erreurs liées à l'arrêt à chaud apparaissent toujours dans les journaux en raison de la surcharge des ressources.
Dans l'onglet Surveillance, sélectionnez Nœuds de calcul et examinez les graphiques.
Lorsque vous comparez le graphique Tâches Airflow du premier exemple (valeurs précédentes) au graphique du deuxième exemple avec des tâches plus consolidées, vous pouvez constater que le pic des tâches en file d'attente a duré moins longtemps, lorsque les tâches ont été plus consolidées. Cependant, elle a duré près de 10 minutes, ce qui n'est toujours pas optimal.
Sur le graphique "Nœuds de calcul actifs", vous pouvez voir que le premier exemple (à gauche du graphique) a utilisé des ressources pendant une période beaucoup plus longue que le second, même si les deux exemples imitent le même volume de travail.
Examinez les graphiques de consommation des ressources de nœud de calcul. Bien que la différence entre les ressources utilisées dans l'exemple avec des tâches plus consolidées et l'exemple initial soit assez importante, l'utilisation du processeur atteint toujours 70% de la limite.
Répartir les tâches plus uniformément dans le temps
Un trop grand nombre de tâches simultanées entraîne le remplissage de la file d'attente, ce qui entraîne le blocage ou la replanification des tâches. Au cours des étapes précédentes, vous avez réduit le nombre de tâches en regroupant ces tâches. Toutefois, les journaux de sortie et la surveillance ont indiqué que le nombre de tâches simultanées n'était toujours pas optimal.
Vous pouvez contrôler le nombre d'exécutions simultanées de tâches en implémentant une planification ou en définissant des limites pour le nombre de tâches pouvant être exécutées simultanément.
Dans ce tutoriel, vous allez répartir les tâches de manière plus uniforme dans le temps en ajoutant des paramètres au niveau du DAG dans le DAG dag_10_tasks_20_seconds_10
:
Ajout de l'argument
max_active_runs=1
au gestionnaire de contexte des DAG. Cet argument définit une limite d'une seule instance d'exécution du DAG à un moment donné.Ajout de l'argument
max_active_tasks=5
au gestionnaire de contexte des DAG. Cet argument contrôle le nombre maximal d'instances de tâche pouvant être exécutées simultanément dans chaque DAG.
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Évaluez l'impact de la répartition des tâches au fil du temps sur les processus de planification:
Attendez la fin de l'exécution du DAG.
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.
Sur l'histogramme, vous pouvez constater que le troisième DAG avec un nombre limité de tâches et d'exécutions actives n'a généré aucun avertissement ni erreur, et que la distribution des journaux est plus homogène par rapport aux valeurs précédentes.
Les tâches de l'exemple dag_10_tasks_20_seconds_10_scheduled
, qui comportent un nombre limité de tâches actives et d'exécutions, n'entraînent pas de pression sur les ressources, car elles étaient mises en file d'attente de manière uniforme.
Après avoir suivi les étapes décrites, vous avez optimisé l'utilisation des ressources en consolidant les petites tâches et en les répartissant de manière plus uniforme dans le temps.
Configurations d'environnement Optimize
Vous pouvez ajuster les configurations de votre environnement pour vous assurer que les nœuds de calcul Airflow ont toujours la capacité d'exécuter des tâches en file d'attente.
Nombre de nœuds de calcul et simultanéité de nœuds de calcul
Vous pouvez ajuster le nombre maximal de nœuds de calcul pour que Cloud Composer effectue automatiquement le scaling de votre environnement dans les limites définies.
Le paramètre [celery]worker_concurrency
définit le nombre maximal de tâches qu'un seul nœud de calcul peut récupérer dans la file d'attente de tâches. La modification de ce paramètre ajuste le nombre de tâches qu'un seul nœud de calcul peut exécuter en même temps.
Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Par défaut, la simultanéité des nœuds de calcul est définie sur au minimum l'une des valeurs suivantes: 32, 12 * worker_CPU, 8 * worker_memory
, ce qui signifie qu'elle dépend des limites des ressources des nœuds de calcul. Pour en savoir plus sur les valeurs de simultanéité des nœuds de calcul par défaut, consultez la section Optimiser les environnements.
Le nombre de nœuds de calcul et leur simultanéité fonctionnent conjointement, et les performances de votre environnement dépendent fortement des deux paramètres. Vous pouvez prendre en compte les éléments suivants pour choisir la bonne combinaison:
Plusieurs tâches rapides exécutées en parallèle. Vous pouvez augmenter la simultanéité des nœuds de calcul lorsque des tâches sont en attente dans la file d'attente et qu'ils utilisent un faible pourcentage de processeurs et de mémoire en même temps. Toutefois, dans certaines circonstances, la file d'attente peut ne jamais se remplir, ce qui empêche l'autoscaling de se déclencher. Si l'exécution des petites tâches se termine avant que les nouveaux nœuds de calcul soient prêts, un nœud de calcul existant peut récupérer les tâches restantes, et il n'y aura plus aucune tâche pour les nouveaux nœuds de calcul.
Dans ces situations, il est recommandé d'augmenter le nombre minimal de nœuds de calcul et d'augmenter la simultanéité des nœuds de calcul pour éviter un scaling excessif.
Plusieurs tâches longues exécutées en parallèle. La simultanéité élevée des nœuds de calcul empêche le système de faire évoluer le nombre de nœuds de calcul. Si plusieurs tâches consomment beaucoup de ressources et prennent beaucoup de temps, une simultanéité de nœud de calcul élevée peut empêcher le remplissage de la file d'attente et empêcher la récupération de toutes les tâches par un seul nœud de calcul, ce qui entraîne des problèmes de performances. Dans ces situations, il est recommandé d'augmenter le nombre maximal de nœuds de calcul et de réduire leur simultanéité.
L'importance du parallélisme
Les programmeurs Airflow contrôlent la planification des exécutions de DAG et des tâches individuelles à partir des DAG. L'option de configuration Airflow [core]parallelism
contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont remplies.
Le parallélisme est un mécanisme de protection d'Airflow qui détermine le nombre de tâches pouvant être exécutées simultanément pour chaque programmeur, quel que soit le nombre de nœuds de calcul. La valeur de parallélisme, multipliée par le nombre de programmeurs dans votre cluster, correspond au nombre maximal d'instances de tâche que votre environnement peut mettre en file d'attente.
En général, [core]parallelism
est défini comme le produit d'un nombre maximal de nœuds de calcul et de [celery]worker_concurrency
. Il est également affecté par le pool.
Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Pour en savoir plus sur l'ajustement des configurations Airflow liées au scaling, consultez la page Effectuer le scaling de la configuration Airflow.
Trouver les configurations d'environnement optimales
La méthode recommandée pour résoudre les problèmes de planification consiste à regrouper les petites tâches en tâches plus importantes et à les répartir de manière plus uniforme au fil du temps. En plus d'optimiser le code du DAG, vous pouvez également optimiser les configurations d'environnement afin de disposer d'une capacité suffisante pour exécuter plusieurs tâches simultanément.
Par exemple, supposons que vous consolidez les tâches dans votre DAG autant que possible, mais que limiter les tâches actives pour les répartir plus uniformément au fil du temps n'est pas la solution préférée pour votre cas d'utilisation spécifique.
Vous pouvez ajuster les paramètres de parallélisme, du nombre de nœuds de calcul et de simultanéité des nœuds de calcul pour exécuter le DAG dag_10_tasks_20_seconds_10
sans limiter les tâches actives. Dans cet exemple, le DAG s'exécute 10 fois, et chaque exécution contient 20 petites tâches.
Si vous souhaitez les exécuter simultanément, procédez comme suit:
Vous aurez besoin d'une taille d'environnement plus importante, car elle contrôle les paramètres de performance de l'infrastructure Cloud Composer gérée de votre environnement.
Les nœuds de calcul Airflow doivent pouvoir exécuter 20 tâches simultanément, ce qui signifie que vous devez définir la simultanéité des nœuds de calcul sur 20.
Les nœuds de calcul ont besoin de suffisamment de processeurs et de mémoire pour gérer toutes les tâches. La simultanéité des nœuds de calcul est affectée par le processeur et la mémoire des nœuds de calcul. Vous aurez donc besoin d'au moins
worker_concurrency / 12
en processeur et deleast worker_concurrency / 8
en mémoire.Vous devez augmenter le parallélisme pour correspondre à la simultanéité supérieure des nœuds de calcul. Pour que les nœuds de calcul puissent récupérer 20 tâches dans la file d'attente, le programmeur doit d'abord les planifier.
Ajustez les configurations de votre environnement de la manière suivante:
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Configuration de l'environnement.
Recherchez la configuration Ressources > Charges de travail, puis cliquez sur Modifier.
Dans le champ Mémoire de la section Nœud de calcul, spécifiez la nouvelle limite de mémoire pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez 4 Go.
Dans le champ Processeur, spécifiez la nouvelle limite de processeur pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez deux processeurs virtuels.
Enregistrez les modifications et patientez quelques minutes pour que vos nœuds de calcul Airflow redémarrent.
Ensuite, remplacez les options de configuration Airflow de parallélisme et de simultanéité des nœuds de calcul:
Accédez à l'onglet Remplacements de configuration Airflow.
Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.
Ignorer la configuration du parralélisme:
Section Clé Value (Valeur) core
parallelism
20
Cliquez sur Ajouter un remplacement de configuration Airflow et remplacez la configuration de simultanéité des nœuds de calcul:
Section Clé Value (Valeur) celery
worker_concurrency
20
Cliquez sur Enregistrer et attendez que la configuration de l'environnement soit mise à jour.
Déclenchez à nouveau le même exemple de DAG avec les configurations ajustées:
Dans l'interface utilisateur d'Airflow, accédez à la page DAG.
Recherchez le DAG
dag_10_tasks_20_seconds_10
, puis supprimez-le.Une fois le DAG supprimé, Airflow vérifie le dossier des DAG dans le bucket de votre environnement, puis réexécute automatiquement le DAG.
Une fois les exécutions du DAG terminées, examinez à nouveau l'histogramme des journaux. Sur le schéma, vous pouvez voir que l'exemple dag_10_tasks_20_seconds_10
avec des tâches plus consolidées n'a généré aucune erreur ni aucun avertissement lors de l'exécution avec la configuration de l'environnement ajustée. Comparez les résultats aux données précédentes du schéma, où le même exemple a généré des erreurs et des avertissements lors de l'exécution avec la configuration de l'environnement par défaut.
Les configurations d'environnement et Airflow jouent un rôle crucial dans la planification des tâches. Toutefois, il n'est pas possible d'augmenter le nombre de configurations au-delà de certaines limites.
Nous vous recommandons d'optimiser le code du DAG, de consolider les tâches et d'utiliser la planification pour optimiser les performances et l'efficacité.
Exemple: erreurs d'analyse du DAG et latence en raison du code complexe du DAG
Dans cet exemple, vous allez examiner la latence d'analyse d'un exemple de DAG imitant un excès de variables Airflow.
Créer une variable Airflow
Avant d'importer l'exemple de code, créez une variable Airflow.
Dans la console Google Cloud, accédez à la page Environnements.
Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.
Accédez à Admin > Variables > Ajouter un nouvel enregistrement.
Définissez les valeurs suivantes :
- clé:
example_var
- val:
test_airflow_variable
- clé:
Importer l'exemple de DAG dans votre environnement
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé aux étapes précédentes. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable
.
Ce DAG contient une boucle For qui s'exécute 1 000 fois et imite un excès de variables Airflow. Chaque itération lit la variable example_var
et génère une tâche. Chaque tâche contient une commande qui affiche la valeur de la variable.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnostiquer les problèmes d'analyse
La durée d'analyse du DAG correspond au temps nécessaire au programmeur Airflow pour lire un fichier DAG et l'analyser. Avant que le programmeur Airflow puisse programmer une tâche à partir d'un DAG, il doit analyser le fichier DAG pour découvrir la structure de ce DAG et les tâches définies.
Si l'analyse d'un DAG prend beaucoup de temps, cela consomme la capacité du programmeur et peut réduire les performances d'exécution du DAG.
Pour surveiller la durée d'analyse du DAG, procédez comme suit:
Exécutez la commande CLI Airflow
dags report
dans gcloud CLI pour afficher la durée d'analyse de tous vos DAG:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Remplacez les éléments suivants :
ENVIRONMENT_NAME
: nom de votre environnementLOCATION
: région dans laquelle se trouve l'environnement.
Dans le résultat de la commande, recherchez la valeur de durée du DAG
dag_for_loop_airflow_variables
. Une valeur élevée peut indiquer que ce DAG n'est pas implémenté de manière optimale. Si vous avez plusieurs DAG, vous pouvez identifier ceux qui ont une longue durée d'analyse à partir de la table de sortie.Exemple :
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Inspectez les durées d'analyse des DAG dans la console Google Cloud:
- Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de processeur DAG.
Examinez les journaux
dag-processor-manager
et identifiez les problèmes potentiels.
Si la durée totale d'analyse des DAG dépasse environ 10 secondes, vos programmeurs peuvent être surchargés d'analyse des DAG et ne peuvent pas exécuter les DAG efficacement.
Optimiser le code du DAG
Il est recommandé d'éviter l'utilisation inutile de code Python de "niveau supérieur" dans vos DAG. Les DAG comportant de nombreuses importations, variables et fonctions en dehors du DAG augmentent les temps d'analyse du programmeur Airflow. Cela réduit les performances et l'évolutivité de Cloud Composer et Airflow. Si les variables Airflow sont lues trop, la durée d'analyse est longue et la charge de la base de données est élevée. Si ce code se trouve dans un fichier DAG, ces fonctions s'exécutent à chaque pulsation du programmeur, ce qui peut être lent.
Les champs de modèle Airflow vous permettent d'incorporer les valeurs des variables Airflow et des modèles Jinja dans vos DAG. Cela évite l'exécution inutile de fonctions pendant les pulsations du programmeur.
Pour mieux implémenter l'exemple de DAG, évitez d'utiliser des variables Airflow au niveau du code Python de premier niveau des DAG. Transmettez plutôt des variables Airflow aux opérateurs existants via un modèle Jinja, ce qui retardera la lecture de la valeur jusqu'à l'exécution de la tâche.
Importez la nouvelle version de l'exemple de DAG dans votre environnement. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Inspectez la durée d'analyse du nouveau DAG:
Attendez la fin de l'exécution du DAG.
Exécutez à nouveau la commande
dags report
pour afficher la durée d'analyse de tous vos DAG:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Examinez à nouveau les journaux
dag-processor-manager
et analysez la durée d'analyse.
En remplaçant les variables d'environnement par des modèles Airflow, vous avez simplifié le code du DAG et réduit la latence d'analyse d'environ 10 fois.
Optimiser les configurations de l'environnement Airflow
Le programmeur Airflow tente en permanence de déclencher de nouvelles tâches et analyse tous les DAG du bucket d'environnement. Si vos DAG ont une longue durée d'analyse et que le programmeur consomme beaucoup de ressources, vous pouvez optimiser les configurations du programmeur Airflow afin que celui-ci utilise les ressources plus efficacement.
Dans ce tutoriel, l'analyse des fichiers DAG prend beaucoup de temps. Les cycles d'analyse commencent à se chevaucher, ce qui épuise la capacité du programmeur. Dans notre exemple, l'analyse du premier exemple de DAG prend plus de cinq secondes. Vous allez donc configurer le programmeur pour qu'il s'exécute moins fréquemment afin d'utiliser les ressources plus efficacement. Vous allez ignorer l'option de configuration Airflow scheduler_heartbeat_sec
. Cette configuration définit la fréquence d'exécution du programmeur (en secondes). Par défaut, la valeur est définie sur 5 secondes.
Vous pouvez modifier cette option de configuration Airflow en la remplaçant.
Ignorez l'option de configuration Airflow scheduler_heartbeat_sec
:
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Remplacements de configuration Airflow.
Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.
Ignorez l'option de configuration Airflow:
Section Clé Value (Valeur) scheduler
scheduler_heartbeat_sec
10
Cliquez sur Enregistrer et attendez que la configuration de l'environnement soit mise à jour.
Vérifiez les métriques du programmeur:
Accédez à l'onglet Surveillance, puis sélectionnez Programmeurs.
Dans le graphique Fréquence cardiaque du programmeur, cliquez sur le bouton Plus d'options (trois points), puis sur Afficher dans l'explorateur de métriques.
Sur le graphique, vous constaterez que le planificateur s'exécute deux fois moins fréquemment après que vous avez modifié la configuration par défaut de 5 secondes à 10 secondes. En réduisant la fréquence des pulsations, vous vous assurez que le programmeur ne démarre pas tant que le cycle d'analyse précédent est en cours et que sa capacité de ressources n'est pas épuisée.
Attribuer plus de ressources au programmeur
Dans Cloud Composer 2, vous pouvez allouer davantage de ressources de processeur et de mémoire au programmeur. De cette manière, vous pouvez augmenter les performances de votre programmeur et réduire le temps d'analyse de votre DAG.
Allouez du processeur et de la mémoire supplémentaires au programmeur:
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Configuration de l'environnement.
Recherchez la configuration Ressources > Charges de travail, puis cliquez sur Modifier.
Dans le champ Memory (Mémoire) de la section Scheduler (Programmeur), spécifiez la nouvelle limite de mémoire. Dans ce tutoriel, utilisez 4 Go.
Dans le champ CPU (Processeur), spécifiez la nouvelle limite de processeur. Dans ce tutoriel, utilisez deux processeurs virtuels.
Enregistrez les modifications et patientez quelques minutes pour que vos programmeurs Airflow puissent redémarrer.
Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de processeur DAG.
Consultez les journaux
dag-processor-manager
et comparez la durée d'analyse des exemples de DAG:
En attribuant davantage de ressources au programmeur, vous avez augmenté sa capacité et réduit la latence d'analyse de manière significative par rapport aux configurations de l'environnement par défaut. Avec davantage de ressources, le programmeur peut analyser les DAG plus rapidement, mais les coûts associés aux ressources Cloud Composer augmentent également. De plus, il n'est pas possible d'augmenter les ressources au-delà d'une certaine limite.
Nous vous recommandons de n'allouer des ressources qu'après avoir implémenté le code du DAG et les optimisations de configuration Airflow possibles.
Effectuer un nettoyage
Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.
Supprimer le projet
- Dans la console Google Cloud, accédez à la page Gérer les ressources.
- Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
- Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.
Supprimer des ressources individuelles
Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.
Supprimez l'environnement Cloud Composer. Vous allez également supprimer le bucket de l'environnement au cours de cette procédure.
Étapes suivantes
- Optimiser les performances et les coûts de votre environnement
- Faire évoluer des environnements
- Découvrez comment résoudre les problèmes liés aux DAG.