Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Ce tutoriel vous guide dans le diagnostic et le dépannage de la planification des tâches et d'analyse des problèmes qui conduisent à un dysfonctionnement du planificateur, des erreurs d'analyse et la latence et l'échec d'une tâche.
Présentation
Le programmeur Airflow est principalement affecté par deux facteurs: la planification des tâches et Analyse du DAG Les problèmes liés à l'un de ces facteurs peuvent avoir un impact négatif l'état et les performances de l'environnement.
Parfois, trop de tâches sont planifiées simultanément. Dans ce cas, file d'attente est pleine et les tâches restent dans l'état « planifié ». état ou devenir reprogrammés après avoir été mis en file d'attente, ce qui peut entraîner un échec de la tâche et des performances la latence.
Un autre problème courant concerne l'analyse de la latence et des erreurs causées par la complexité un code DAG. Par exemple, un code DAG contenant des variables Airflow au début du code peut entraîner des retards d'analyse, une surcharge de la base de données, et les délais avant expiration des DAG.
Dans ce tutoriel, vous allez diagnostiquer les exemples de DAG et apprendre à résoudre les problèmes de planification et d'analyse, à améliorer la planification des DAG et à optimiser le code et les configurations d'environnement de vos DAG pour améliorer les performances.
Objectifs
Cette section liste les objectifs des exemples de ce tutoriel.
Exemple : Dysfonctionnement du planificateur et latence causés par une forte concurrence de tâches
Importez l'exemple de DAG qui s'exécute plusieurs fois simultanément et diagnostiquez le dysfonctionnement du planificateur et les problèmes de latence avec Cloud Monitoring.
Optimisez le code de votre DAG en consolidant les tâches et en évaluant impact sur les performances.
Répartissez les tâches de manière plus uniforme au fil du temps et évaluez l'impact sur les performances.
optimiser vos configurations Airflow et votre environnement, évaluer l'impact.
Exemple : erreurs d'analyse DAG et latence causées par un code complexe
Importer l'exemple de DAG contenant les variables Airflow et diagnostiquer l'analyse les problèmes liés à Cloud Monitoring.
Optimiser le code du DAG en évitant les variables Airflow au niveau supérieur le code et évaluer son impact sur le temps d'analyse.
Optimisez les configurations Airflow et les configurations d'environnement, puis évaluez l'impact sur le temps d'analyse.
Coûts
Ce tutoriel utilise les composants facturables suivants de Google Cloud :
- Cloud Composer (voir les frais supplémentaires)
- Cloud Monitoring
Une fois que vous avez terminé ce tutoriel, évitez de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez Effectuer un nettoyage.
Avant de commencer
Cette section décrit les actions requises avant de commencer le tutoriel.
Créer et configurer un projet
Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet comme suit :
Dans la console Google Cloud, sélectionnez ou créez un projet:
Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.
Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires :
- Administrateur de l'environnement et des objets Storage (
roles/composer.environmentAndStorageObjectAdmin
) - Administrateur de Compute (
roles/compute.admin
)
- Administrateur de l'environnement et des objets Storage (
Activer les API pour votre projet.
Enable the Cloud Composer API.
Créer votre environnement Cloud Composer
Créez un environnement Cloud Composer 2.
Pour créer l'environnement,
vous accordez l'extension d'agent de service de l'API Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) à l'agent de service Composer
de service. Cloud Composer utilise ce compte pour effectuer des opérations
dans votre projet Google Cloud.
Exemple : Défaillance du planificateur et échec de la tâche en raison de problèmes de planification
Cet exemple illustre le dysfonctionnement du programmeur et la latence causés par une simultanéité élevée des tâches.
Importer l'exemple de DAG dans votre environnement
Importez l'exemple de DAG suivant dans l'environnement
que vous avez créées
aux étapes précédentes. Dans ce tutoriel, ce DAG est nommé
dag_10_tasks_200_seconds_1
Ce DAG comporte 200 tâches. Chaque tâche attend une seconde et affiche "Terminé". Le DAG se déclenche automatiquement une fois l'importation terminée. Cloud Composer exécute ce DAG 10 fois, et toutes les exécutions du DAG ont lieu en parallèle.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnostiquer les problèmes de dysfonctionnement du planificateur et d'échec des tâches
Une fois l'exécution du DAG terminée, ouvrez l'interface utilisateur d'Airflow, puis cliquez sur le bouton
dag_10_tasks_200_seconds_1
DAG. Vous verrez que 10 exécutions de DAG au total ont été
réussies, et chacune comporte 200 tâches réussies.
Consultez les journaux des tâches Airflow :
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.
L'histogramme des journaux affiche les erreurs et les avertissements en rouge et orange:
L'exemple de DAG a généré environ 130 avertissements et 60 erreurs. Cliquez n'importe où une colonne qui contient des barres jaunes et rouges. Certaines des avertissements et erreurs suivants s'affichent dans les journaux :
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Ces journaux peuvent indiquer que l'utilisation des ressources a dépassé les limites le nœud de calcul a redémarré lui-même.
Si une tâche Airflow est conservée dans la file d'attente trop longtemps, le planificateur marque en échec et en up_for_retry, et va replanifier l'opération pour l'exécution. Pour observer les symptômes de cette situation, vous pouvez consulter le graphique du nombre de tâches en file d'attente. Si les pics de ce graphique ne diminuent pas au bout d'environ 10 minutes, il est probable que des tâches échouent (sans journaux).
Examinez les informations de surveillance :
Accédez à l'onglet Monitoring (Surveillance), puis sélectionnez Overview (Aperçu).
Examinez le graphique Tâches Airflow.
Dans le graphique des tâches Airflow, il existe un pic de tâches en file d'attente qui dure plus de 10 minutes, ce qui peut signifier qu'il n'y a pas suffisamment de ressources dans votre environnement pour traiter toutes les tâches planifiées.
Examinez le graphique Nœuds de calcul actifs :
Le graphique Nœuds de calcul actifs indique que le DAG a déclenché l'autoscaling à la limite maximale autorisée de trois nœuds de calcul pendant l'exécution du DAG.
Les graphiques d'utilisation des ressources peuvent indiquer le manque de capacité des nœuds de calcul Airflow. pour exécuter des tâches en file d'attente. Dans l'onglet Surveillance, sélectionnez Nœuds de calcul, puis examinez les graphiques Utilisation totale du processeur des nœuds de calcul et Utilisation totale de la mémoire des nœuds de calcul.
<ph type="x-smartling-placeholder">Les graphiques indiquent que l'exécution simultanée d'un trop grand nombre de tâches a permis d'atteindre la limite du processeur. Les ressources avaient été utilisées minutes, ce qui est encore plus long que la durée totale de 200 tâches en 10 Le DAG s'exécute une par une.
Voici les indicateurs de remplissage de la file d'attente et d'un manque de ressources pour traiter toutes les tâches planifiées.
Consolider vos tâches
Le code actuel crée de nombreux DAG et tâches sans ressources suffisantes pour traiter toutes les tâches en parallèle, ce qui entraîne le remplissage de la file d'attente. Laisser les tâches dans la file d'attente trop longtemps peut entraîner leur reprogrammation ou leur échec. Dans ce cas, nous vous conseillons de limiter le nombre tâches.
L'exemple de DAG suivant modifie le nombre de tâches de l'exemple initial de 200 à 20 et augmente le temps d'attente de 1 à 10 secondes pour imiter des tâches plus consolidées qui effectuent la même quantité de travail.
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Évaluez l'impact de tâches plus consolidées sur les processus de planification:
Attendez que les exécutions du DAG soient terminées.
Dans l'interface utilisateur d'Airflow, sur la page DAG, cliquez sur le DAG
dag_10_tasks_20_seconds_10
. Vous verrez 10 exécutions de DAG, chacune comptant 20 exécutions des tâches qui ont réussi.Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.
Le deuxième exemple avec des tâches plus consolidées a généré environ 10 avertissements et 7 erreurs. Sur l'histogramme, vous pouvez comparer le nombre erreurs et avertissements dans l'exemple initial (valeurs précédentes), et dans la deuxième exemple (valeurs suivantes).
Lorsque vous comparez le premier exemple avec le plus consolidé, vous pouvez le nombre d'erreurs et d'avertissements au cours de la deuxième à titre d'exemple. Toutefois, les mêmes erreurs liées à un arrêt tiède apparaissent toujours les journaux en raison d'une surcharge de ressources.
Dans l'onglet Surveillance, sélectionnez Nœuds de calcul, puis examinez les graphiques.
Lorsque vous comparez le graphique Tâches Airflow du premier exemple (précédemment ) avec le graphique du deuxième exemple présentant des tâches plus consolidées, vous pouvez constater que le pic de tâches en file d'attente a duré plus longtemps moment où les tâches étaient plus consolidées. Cependant, il durait près de 10 minutes ce qui n'est pas optimal.
Sur le graphique "Nœuds de calcul actifs", vous pouvez voir que le premier exemple (à gauche du graphique) a utilisé des ressources pendant une période beaucoup plus longue que le second, même si les deux exemples imitent la même quantité de travail.
Examinez les graphiques de consommation des ressources des nœuds de calcul. Même si la différence entre les ressources utilisées dans l'exemple avec des tâches plus consolidées et l'exemple initial est assez importante, l'utilisation du processeur atteint toujours 70 % de la limite.
Répartir les tâches de manière plus uniforme au fil du temps
Trop de tâches simultanées entraînent le remplissage de la file d'attente, ce qui conduit à tâches bloquées dans la file d'attente ou reprogrammées. Au cours des étapes précédentes, vous avez a réduit le nombre de tâches en les regroupant, mais les résultats les journaux et la surveillance indiquent que le nombre de tâches simultanées est toujours sous-optimale.
Vous pouvez contrôler le nombre d'exécutions de tâches simultanées en implémentant une planification ou en définissant des limites pour le nombre de tâches pouvant être exécutées simultanément.
Dans ce tutoriel, vous allez répartir les tâches de manière plus uniforme au fil du temps en ajoutant des paramètres au niveau du DAG dans le DAG dag_10_tasks_20_seconds_10
:
Ajout de l'argument
max_active_runs=1
au gestionnaire de contexte du DAG. Cet argument définit la limite d'une seule instance d'une exécution du DAG à un moment donné.Ajoutez l'argument
max_active_tasks=5
au gestionnaire de contexte DAG. Cet argument contrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG.
Importez l'exemple de DAG suivant dans l'environnement que vous avez créé. Dans ce tutoriel, ce DAG est nommé
dag_10_tasks_20_seconds_10_scheduled.py
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Évaluez l'impact de la distribution des tâches au fil du temps sur les processus de planification :
Attendez que les exécutions du DAG soient terminées.
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Journaux Airflow > Nœuds de calcul > Afficher dans l'explorateur de journaux.
Sur l'histogramme, vous pouvez voir que le troisième DAG, qui comporte un nombre limité de tâches et d'exécutions actives, n'a généré aucune alerte ni aucune erreur, et que la distribution des journaux semble plus uniforme par rapport aux valeurs précédentes.
Les tâches de l'exemple dag_10_tasks_20_seconds_10_scheduled
qui ont un
un nombre limité de tâches et d'exécutions actives n'a pas entraîné de pression sur les ressources, car
les tâches étaient
mises en file d’attente uniformément.
Après avoir suivi les étapes décrites, vous avez optimisé l'utilisation des ressources en regroupant les petites tâches et en les répartissant plus uniformément au fil du temps.
Optimiser les configurations d'environnement
Vous pouvez ajuster les configurations de votre environnement pour vous assurer que les nœuds de calcul Airflow disposent toujours de la capacité nécessaire pour exécuter les tâches en file d'attente.
Nombre de nœuds de calcul et simultanéité des nœuds de calcul
Vous pouvez ajuster le nombre maximal de nœuds de calcul pour que Cloud Composer effectue un scaling automatique de votre environnement dans les limites définies.
Le paramètre [celery]worker_concurrency
définit le nombre maximal de tâches qu'un seul nœud de calcul peut récupérer dans la file d'attente de tâches. La modification de ce paramètre ajuste le nombre de tâches qu'un seul nœud de calcul peut exécuter simultanément.
Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Par défaut, la simultanéité des nœuds de calcul est définie sur un minimum de 32, 12 * worker_CPU, 8 * worker_memory
, ce qui signifie qu'elle dépend des limites de ressources des nœuds de calcul. Pour en savoir plus sur les valeurs de simultanéité par défaut des workers, consultez la section Optimiser les environnements.
Le nombre de nœuds de calcul et la concurrence entre eux sont combinés, et les performances de votre environnement dépendent fortement de ces deux paramètres. Pour choisir la bonne combinaison, vous pouvez tenir compte des éléments suivants:
Exécution de plusieurs tâches rapides en parallèle Vous pouvez augmenter le nombre lorsque des tâches sont en attente dans la file d'attente et que vos nœuds de calcul utilisent un faible pourcentage de leurs processeurs et de la mémoire en même temps. Toutefois, dans certaines circonstances, la file d'attente peut ne jamais se remplir, ce qui empêche l'autoscaling de se déclencher. Si les petites tâches sont terminées au moment où les nouveaux nœuds de calcul sont prêts, un nœud de calcul existant peut prendre en charge les tâches restantes, et il n'y aura aucune tâche pour les nœuds de calcul nouvellement créés.
Dans ces situations, il est recommandé augmenter le nombre minimal de nœuds de calcul et augmenter leur simultanéité ; pour éviter un scaling excessif.
Plusieurs longues tâches s'exécutant en parallèle. La simultanéité élevée des nœuds de calcul empêche le système de faire évoluer le nombre de nœuds de calcul. Si plusieurs tâches sont gourmandes en ressources et prennent beaucoup de temps à s'exécuter, une simultanéité élevée des nœuds de calcul peut entraîner une file d'attente qui ne se remplit jamais et toutes les tâches sont traitées par un seul nœud de calcul, ce qui entraîne des problèmes de performances. Dans ces situations, il est recommandé de augmenter le nombre maximal de nœuds de calcul et réduire la simultanéité des nœuds de calcul.
L'importance du parallélisme
Les programmeurs Airflow contrôlent la planification des exécutions de DAG et des tâches individuelles
DAG. L'option de configuration Airflow [core]parallelism
contrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.
Le parallélisme est un mécanisme de protection d'Airflow qui détermine le nombre de tâches pouvant être exécutées en même temps par chaque planificateur, quel que soit le nombre de nœuds de calcul. La valeur de parallélisme, multipliée par le nombre de planificateurs de votre cluster, correspond au nombre maximal d'instances de tâche que votre environnement peut mettre en file d'attente.
Généralement, [core]parallelism
est défini comme le produit d'un nombre maximal de nœuds de calcul
et [celery]worker_concurrency
. Elle est également affectée par
pool.
Vous pouvez modifier cette option de configuration Airflow en la remplaçant. Pour en savoir plus sur l'ajustement des configurations Airflow liées à la mise à l'échelle, consultez la section Mettre à l'échelle la configuration Airflow.
Trouver les configurations d'environnement optimales
La méthode recommandée pour résoudre les problèmes de planification consiste à regrouper les petites tâches dans les tâches plus importantes et de répartir les tâches plus uniformément au fil du temps. En plus d'optimiser le code DAG, vous pouvez également optimiser les configurations d'environnement afin de disposer d'une capacité suffisante pour exécuter plusieurs tâches simultanément.
Par exemple, supposons que vous regroupiez des tâches dans votre DAG. autant que possible, mais en limitant les tâches actives pour les répartir plus uniformément sur n'est pas une solution à privilégier pour votre cas d'utilisation spécifique.
Vous pouvez ajuster le parallélisme, le nombre de nœuds de calcul et la simultanéité des nœuds de calcul
pour exécuter le DAG dag_10_tasks_20_seconds_10
sans limiter les paramètres
tâches. Dans cet exemple, le DAG s'exécute 10 fois, et chaque exécution contient 20 petites tâches.
Si vous souhaitez les exécuter simultanément :
Vous aurez besoin d'un environnement de plus grande taille, car il permet de contrôler les performances de l'infrastructure Cloud Composer gérée de votre environnement.
Les nœuds de calcul Airflow doivent pouvoir exécuter 20 tâches simultanément, ce qui signifie vous devez définir la simultanéité des nœuds de calcul sur 20.
Les nœuds de calcul ont besoin de suffisamment de processeurs et de mémoire pour gérer toutes les tâches. Nœud de calcul dépend du processeur et de la mémoire des nœuds de calcul. Par conséquent, vous aurez besoin au moins
worker_concurrency / 12
au processeur et àleast worker_concurrency / 8
en mémoire.Vous devrez augmenter le parallélisme pour correspondre à la simultanéité des nœuds de calcul plus élevée. Pour que les nœuds de calcul récupèrent 20 tâches de la file d'attente, le planificateur devez d’abord planifier ces 20 tâches.
Ajustez les configurations de votre environnement comme suit :
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Configuration de l'environnement.
Recherchez la section Resources (Ressources) > Workloads (Charges de travail). cliquez sur Modifier.
Dans la section Worker (Nœud de calcul), dans le champ Memory (Mémoire), spécifiez la nouvelle limite de mémoire pour les nœuds de calcul Airflow. Dans ce tutoriel, utilisez 4 Go.
Dans le champ CPU, spécifiez la nouvelle limite de processeur pour les nœuds de calcul Airflow. Dans ce utilisez deux vCPU.
Enregistrez les modifications et attendez quelques minutes pour que vos nœuds de calcul Airflow redémarrent.
Ensuite, remplacez les options de configuration Airflow pour le parallélisme et la simultanéité des nœuds de calcul :
Accédez à l'onglet Remplacements de configuration Airflow.
Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.
Remplacez la configuration de parralélisme:
Section Clé Valeur core
parallelism
20
Cliquez sur Add Airflow Configuration Override (Ajouter un remplacement de configuration Airflow) et remplacez la configuration de la concurrence des workers :
Section Clé Valeur celery
worker_concurrency
20
Cliquez sur Enregistrer et attendez que l'environnement mette à jour sa configuration.
Déclenchez à nouveau l'exemple de DAG avec les configurations ajustées :
Dans l'interface utilisateur d'Airflow, accédez à la page DAG.
Recherchez le DAG
dag_10_tasks_20_seconds_10
, puis supprimez-le.Une fois le DAG supprimé, Airflow vérifie le dossier des DAG dans le bucket de votre environnement et exécute automatiquement le DAG à nouveau.
Une fois les exécutions du DAG terminées, examinez à nouveau l'histogramme des journaux. Sur le diagramme,
vous pouvez voir que l'exemple dag_10_tasks_20_seconds_10
avec plus
les tâches consolidées n'ont pas généré d'erreurs ni d'avertissements lors de l'exécution avec
la configuration de l'environnement ajustée. Comparer les résultats aux données précédentes
sur le diagramme, où le même exemple a généré des erreurs et des avertissements
avec la configuration d'environnement tge par défaut.
Les configurations d'environnement et les configurations Airflow jouent un rôle crucial dans la planification des tâches. Toutefois, il n'est pas possible d'augmenter les configurations au-delà de certaines limites.
Nous vous recommandons d'optimiser le code du DAG, de consolider les tâches et d'utiliser la planification pour des performances et une efficacité optimales.
Exemple : erreurs d'analyse et latence du DAG en raison d'un code DAG complexe
Dans cet exemple, vous allez examiner la latence d'analyse d'un exemple de DAG qui imite un excès de variables Airflow.
Créer une variable Airflow
Avant d'importer l'exemple de code, créez une variable Airflow.
Dans la console Google Cloud, accédez à la page Environnements.
Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.
Accédez à Admin > Variables > Ajouter un enregistrement
Définissez les valeurs suivantes :
- clé:
example_var
- val :
test_airflow_variable
- clé:
Importer l'exemple de DAG dans votre environnement
Importez l'exemple de DAG suivant dans l'environnement
que vous avez créées
aux étapes précédentes. Dans ce tutoriel, ce DAG est nommé
dag_for_loop_airflow_variable
Ce DAG contient une boucle for qui s'exécute 1 000 fois et imite un excès de variables Airflow. Chaque itération lit la variable example_var
et génère une tâche. Chaque tâche contient une commande qui imprime l'ID de la variable
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnostiquer les problèmes d'analyse
Le temps d'analyse du DAG correspond au temps nécessaire au planificateur Airflow pour lire et analyser un fichier DAG. Avant que le programmeur Airflow puisse planifier une tâche à partir d'un DAG, le programmeur doit analyser le fichier DAG pour découvrir la structure le DAG et les tâches définies.
Si l'analyse d'un DAG prend beaucoup de temps, elle consomme la capacité de l'ordonnanceur et peut réduire les performances des exécutions de DAG.
Pour surveiller la durée d'analyse des DAG :
Exécutez la commande CLI Airflow
dags report
dans gcloud CLI pour afficher le temps d'analyse de tous vos DAG:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Remplacez les éléments suivants :
ENVIRONMENT_NAME
: nom de votre environnementLOCATION
: région dans laquelle se trouve l'environnement.
Dans le résultat de la commande, recherchez la valeur de durée de la
dag_for_loop_airflow_variables
DAG. Une valeur élevée peut indiquer qu'un ce DAG n'est pas implémenté de manière optimale. Si vous avez plusieurs DAG, de la table de sortie, vous pouvez identifier les DAG dont le temps d'analyse est long.Exemple :
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Inspectez les temps d'analyse des DAG dans la console Google Cloud:
- Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de processeur DAG.
Examinez les journaux
dag-processor-manager
et identifiez les problèmes éventuels.
Si la durée d'analyse totale des DAG dépasse environ 10 secondes, vos planificateurs peuvent être surchargés par l'analyse des DAG et ne pas pouvoir exécuter efficacement les DAG.
Optimiser le code du DAG
Nous vous recommandons d'éviter le code Python "de niveau supérieur" inutile dans vos DAG. Les DAG contenant de nombreuses importations, variables et fonctions en dehors du DAG entraînent des temps d'analyse plus longs pour le planificateur Airflow. Cela réduit les performances et l'évolutivité Cloud Composer et Airflow. Un excès de lecture de variables Airflow entraîne une longue durée d'analyse et une charge de base de données élevée. Si ce code se trouve dans un DAG , ces fonctions s'exécutent à chaque pulsation du planificateur, ce qui peut être lent.
Les champs de modèle d'Airflow vous permettent d'intégrer les valeurs des variables Airflow et des modèles Jinja dans vos DAG. Cela évite l'exécution inutile de fonctions pendant les battements de cœur du planificateur.
Pour améliorer la mise en œuvre de l'exemple de DAG, évitez d'utiliser des variables Airflow dans le code Python de premier niveau des DAG. Transmettez plutôt des variables Airflow aux opérateurs existants via un modèle Jinja, ce qui retardera la lecture de la valeur jusqu'à l'exécution de la tâche.
Importez la nouvelle version de l'exemple de DAG dans votre
environnement. Dans ce tutoriel, ce DAG est nommé dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Inspectez la durée d'analyse du nouveau DAG:
Attendez la fin de l'exécution du DAG.
Exécutez à nouveau la commande
dags report
pour afficher le temps d'analyse de tous vos DAG :file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Réexaminez les journaux
dag-processor-manager
et analysez la durée d'analyse.
En remplaçant les variables d'environnement par des modèles Airflow, vous avez simplifié le code DAG et a réduit la latence d'analyse d'environ 10 fois.
Optimiser les configurations d'environnement Airflow
Le planificateur Airflow tente constamment de déclencher de nouvelles tâches et d'analyser tous les DAG dans le bucket de votre environnement. Si la durée d'analyse de vos DAG est longue et que le consomme beaucoup de ressources, vous pouvez l'optimiser configurations afin que le planificateur utilise les ressources plus efficacement.
Dans ce tutoriel, l'analyse des fichiers DAG prend beaucoup de temps, et les cycles d'analyse commencent à se chevaucher, ce qui épuise la capacité du planificateur. Dans notre exemple,
l'analyse du premier exemple de DAG prend plus de 5 secondes. Vous allez donc configurer
le programmeur de s'exécuter moins fréquemment
pour utiliser les ressources plus efficacement. Vous allez remplacer l'option de configuration Airflow scheduler_heartbeat_sec
. Cette configuration définit la fréquence d'exécution du planificateur (en secondes). Par défaut, la valeur est définie sur 5 secondes.
Vous pouvez modifier cette option de configuration Airflow en la remplaçant.
Remplacez l'option de configuration Airflow scheduler_heartbeat_sec
:
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Remplacements de configuration Airflow.
Cliquez sur Modifier, puis sur Ajouter un remplacement de configuration Airflow.
Remplacez l'option de configuration Airflow :
Section Clé Valeur scheduler
scheduler_heartbeat_sec
10
Cliquez sur Enregistrer et attendez que l'environnement mette à jour sa configuration.
Vérifiez les métriques du planificateur :
Accédez à l'onglet Surveillance, puis sélectionnez Planificateurs.
Dans le graphique Heartbeat du planificateur, cliquez sur le bouton Plus d'options (trois points), puis sur Afficher dans l'explorateur de métriques.
Sur le graphique, vous pouvez voir que le planificateur s'exécute deux fois moins fréquemment après avoir modifié la configuration par défaut de 5 secondes à 10 secondes. En réduisant la fréquence des battements de cœur, vous vous assurez que le planificateur ne commence pas à s'exécuter pendant le cycle d'analyse précédent et que la capacité de ressources du planificateur n'est pas épuisée.
Attribuer davantage de ressources au planificateur
Dans Cloud Composer 2, vous pouvez allouer davantage de ressources de processeur et de mémoire au scheduler. Vous pouvez ainsi améliorer les performances de votre planificateur et accélérer le temps d'analyse de votre DAG.
Allouez du processeur et de la mémoire supplémentaires au programmeur:
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Configuration de l'environnement.
Accédez à la configuration Ressources > Charges de travail, puis cliquez sur Modifier.
Dans la section Scheduler (Planificateur), dans le champ Memory (Mémoire), spécifiez la nouvelle limite de mémoire. Dans ce tutoriel, utilisez 4 Go.
Dans le champ Processeur, spécifiez la nouvelle limite de processeur. Dans ce utilisez deux vCPU.
Enregistrez les modifications et attendez quelques minutes pour que vos programmeurs Airflow puissent redémarrer.
Accédez à l'onglet Journaux, puis à Tous les journaux > Gestionnaire de processeur DAG.
Examinez les journaux
dag-processor-manager
et comparez la durée d'analyse pour le Exemples de DAG:
En attribuant plus de ressources au planificateur, vous avez augmenté sa capacité et réduit considérablement la latence d'analyse par rapport aux configurations d'environnement par défaut. Avec plus de ressources, le planificateur peut analyser les DAG plus rapidement. Toutefois, les coûts associés aux ressources Cloud Composer augmenteront également. De plus, il n'est pas possible d'augmenter la valeur au-delà d'une certaine limite.
Nous vous recommandons de n'allouer des ressources qu'après le code DAG possible et Des optimisations de configuration Airflow ont été mises en œuvre.
Effectuer un nettoyage
Pour éviter que les ressources soient facturées sur votre compte Google Cloud, utilisées dans ce tutoriel, supprimez le projet qui contient les ressources ou conserver le projet et supprimer les ressources individuelles.
Supprimer le projet
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Supprimer des ressources individuelles
Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.
Supprimez l'environnement Cloud Composer. Vous devez également supprimer le bucket de l'environnement au cours de cette procédure.
Étape suivante
- Optimiser les performances et les coûts de votre environnement
- Faire évoluer des environnements
- Découvrez comment résoudre les problèmes liés aux DAG.