Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Ce guide explique comment écrire un graphe orienté acyclique (DAG) Apache Airflow exécuté dans un environnement Cloud Composer.
Étant donné qu'Apache Airflow n'offre pas une forte isolation des DAG et des tâches, nous vous recommandons d'utiliser des environnements de production et de test distincts afin d'éviter les interférences au niveau des DAG. Pour en savoir plus, reportez-vous à la section Tester les DAG.
Structurer un DAG Airflow
Un DAG Airflow est défini dans un fichier Python et se compose des éléments suivants : composants:
- Définition du DAG
- Opérateurs Airflow
- Relations avec les opérateurs
Les extraits de code suivants illustrent des exemples de chaque composant hors contexte.
Une définition de DAG
L'exemple suivant illustre une définition de DAG Airflow :
Airflow 2
Airflow 1
Opérateurs et tâches
Les opérateurs Airflow décrivent la tâche à effectuer. Une tâche task est une instance spécifique d'un opérateur.
Airflow 2
Airflow 1
Relations entre les tâches
Les relations entre les tâches décrivent l'ordre dans lequel le travail doit être effectué.
Airflow 2
Airflow 1
Exemple de workflow DAG complet en Python
Le workflow suivant est un modèle de DAG complet et composé des éléments suivants :
Deux tâches: une tâche hello_python
et une tâche goodbye_bash
:
Airflow 2
Airflow 1
Pour en savoir plus sur la définition des DAG Airflow, consultez le tutoriel Airflow et les concepts Airflow.
Opérateurs Airflow
Les exemples suivants présentent quelques opérateurs Airflow connus. Pour obtenir une documentation de référence faisant autorité sur les opérateurs Airflow, consultez la documentation de référence sur les opérateurs et les hooks et l'index des fournisseurs.
BashOperator
Utilisez BashOperator pour exécuter des programmes de ligne de commande.
Airflow 2
Airflow 1
Cloud Composer exécute les commandes fournies dans un script Bash sur une Nœud de calcul Airflow. Le nœud de calcul est un conteneur Docker basé sur Debian qui inclut plusieurs packages.
- Commande
gcloud
, y compris la sous-commandegcloud storage
pour travailler avec les buckets Cloud Storage. - Commande
bq
- Commande
kubectl
PythonOperator
Utilisez PythonOperator pour exécuter du code Python arbitraire.
Cloud Composer exécute le code Python dans un conteneur qui inclut : packages pour la version d'image Cloud Composer utilisés dans votre environnement.
Pour installer des packages Python supplémentaires, reportez-vous à la section Installer des dépendances Python.
Opérateurs Google Cloud
Pour exécuter des tâches qui utilisent des produits Google Cloud, utilisez la Opérateurs Google Cloud Airflow. Par exemple, les opérateurs BigQuery interrogent et traitent les données dans BigQuery.
Il existe de nombreux autres opérateurs Airflow pour Google Cloud, ainsi que des services individuels fournis par Google Cloud. Pour obtenir la liste complète, consultez la page Opérateurs Google Cloud.
Airflow 2
Airflow 1
EmailOperator
Utilisez EmailOperator pour envoyer un e-mail à partir d'un DAG. Pour envoyer depuis un environnement Cloud Composer, configurer votre environnement pour utiliser SendGrid.
Airflow 2
Airflow 1
Notifications en cas d'échec de l'opérateur
Définissez email_on_failure
sur True
pour envoyer une notification par e-mail lorsqu'un opérateur du DAG échoue. Pour envoyer des notifications par e-mail à partir d'un environnement Cloud Composer, vous devez configurer votre environnement pour qu'il utilise SendGrid.
Airflow 2
Airflow 1
Consignes concernant les workflows DAG
Placez toutes les bibliothèques Python personnalisées dans l'archive ZIP des DAG, dans un répertoire imbriqué. Ne placez pas de bibliothèques au niveau supérieur du répertoire des DAG.
Lorsqu'Airflow analyse le dossier
dags/
, il ne recherche que les DAG dans Modules Python situés au premier niveau du dossier des DAG et au sommet d'une archive ZIP également située dans le dossierdags/
de premier niveau. Si Airflow rencontre un module Python dans une archive ZIP ne contenant pas les deux sous-chaînesairflow
etDAG
, il cesse de traiter l'archive ZIP. Airflow ne renvoie que les DAG trouvés jusqu'à ce point.Utilisez Airflow 2 au lieu d'Airflow 1.
La communauté Airflow ne publie pas de nouvelles versions mineures ou de correctifs pour Airflow 1.
Pour la tolérance aux pannes, ne définissez pas plusieurs objets DAG dans le même module Python.
N'utilisez pas de SubDAG. À la place, regrouper des tâches dans des DAG.
Placez les fichiers requis au moment de l'analyse du DAG dans le dossier
dags/
, et non dans le dossierdata/
.Testez les DAG développés ou modifiés comme recommandé des instructions pour tester les DAG.
Vérifier que les DAG développés n'augmentent pas Trop de durées d'analyse du DAG.
Les tâches Airflow peuvent échouer pour plusieurs raisons. Pour éviter les échecs de l'exécution complète d'un DAG, nous vous recommandons d'activer les nouvelles tentatives de tâches. Si vous définissez le nombre maximal de nouvelles tentatives sur
0
, aucune nouvelle tentative n'est effectuée.Nous vous recommandons d'ignorer l'option
default_task_retries
en définissant une valeur pour les nouvelles tentatives de la tâche autre que0
. De plus, vous pouvez définir Paramètreretries
au niveau de la tâche.Si vous souhaitez utiliser un GPU dans vos tâches Airflow, créez une couche Cluster GKE basé sur des nœuds utilisant des machines avec des GPU. Utilisez GKEStartPodOperator pour exécuter vos tâches.
Évitez d'exécuter des tâches gourmandes en processeur et en mémoire dans le pool de nœuds du cluster où d'autres composants Airflow (programmeurs, nœuds de calcul, serveurs Web) s'exécutent. Utilisez plutôt KubernetesPodOperator ou GKEStartPodOperator à la place.
Lorsque vous déployez des DAG dans un environnement, n'importez dans le dossier
/dags
que les fichiers absolument nécessaires à l'interprétation et à l'exécution des DAG.Limitez le nombre de fichiers DAG dans le dossier
/dags
.Airflow analyse en continu les DAG dans le dossier
/dags
. L'analyse est une qui passe en boucle dans le dossier des DAG et indique le nombre de fichiers (avec leurs dépendances) affecte les performances de l'analyse des DAG et de la planification des tâches. Il est beaucoup plus efficace d'utiliser 100 fichiers avec 100 DAG chacun que 10 000 fichiers avec un DAG chacun. Par conséquent, une telle optimisation est recommandée. Cette optimisation est un équilibre entre le temps d'analyse et l'efficacité de la création et de la gestion des DAG.Par exemple, vous pouvez aussi envisager de déployer 10 000 fichiers DAG créer 100 fichiers ZIP contenant chacun 100 fichiers DAG.
Outre les conseils ci-dessus, si vous avez plus de 10 000 fichiers DAG, générer des DAG de manière programmatique peut être une bonne option. Par exemple : vous pouvez implémenter un seul fichier DAG Python, qui génère un certain nombre Objets DAG (par exemple, 20 ou 100 objets DAG)
Éviter d'utiliser des opérateurs Airflow obsolètes
Les opérateurs listés dans le tableau suivant sont obsolètes. Certains de ces opérateurs étaient compatibles avec les premières versions de Cloud Composer 1. Évitez de les utiliser dans vos DAG. Utilisez plutôt les alternatives à jour fournies.
Opérateur obsolète | Opérateur à utiliser |
---|---|
BigQueryExecuteQueryOperator | BigQueryInsertJobOperator |
BigQueryPatchDatasetOperator | BigQueryUpdateTableOperator |
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator |
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator |
DataprocScaleClusterOperator | DataprocUpdateClusterOperator |
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator |
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator |
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator |
MLEngineManageModelOperator | MLEngineCreateModelOperator, MLEngineGetModelOperator |
MLEngineManageVersionOperator | MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion |
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor |
Questions fréquentes sur l'écriture des DAG
Comment réduire la répétition du code lorsque j'exécute des tâches identiques ou semblables dans plusieurs DAG ?
Nous vous suggérons de définir des bibliothèques et des wrappers pour pour minimiser la répétition du code.
Comment réutiliser le code à travers plusieurs fichiers DAG ?
Placez vos fonctions utilitaires dans un
bibliothèque Python locale
et importer les fonctions. Vous pouvez référencer les fonctions de n'importe quel DAG situé dans le dossier dags/
du bucket de votre environnement.
Comment minimiser le risque d'apparition de définitions différentes ?
Imaginons que deux équipes souhaitent agréger des données brutes en métriques de revenus. Les équipes rédigent deux tâches légèrement différentes pour effectuer la même opération. Définissez les bibliothèques pour qu'elles fonctionnent avec les données sur les revenus. Ainsi, les développeurs des DAG doivent clarifier la définition des revenus en cours d'agrégation.
Comment définir des dépendances entre les DAG ?
Tout dépend de la manière dont vous souhaitez définir la dépendance.
Si vous avez deux DAG (DAG A et DAG B) et que vous souhaitez que le DAG B se déclenche après le DAG
A, vous pouvez placer
TriggerDagRunOperator
à la fin du DAG A.
Si le DAG B ne dépend que d'un artefact généré par le DAG A, tel qu'un message Pub/Sub, il est possible qu'un capteur fonctionne mieux.
Si le DAG B est étroitement intégré au DAG A, vous pourrez peut-être fusionner les deux DAG en un seul.
Comment transférer des ID d'exécution uniques à un DAG et à ses tâches ?
Imaginons que vous souhaitiez transférer les noms de cluster Dataproc et les chemins des fichiers.
Vous pouvez générer un ID aléatoire unique en renvoyant str(uuid.uuid4())
dans un opérateur PythonOperator
. Cela place l'ID dans
XComs
afin de pouvoir faire référence à l'ID dans d'autres opérateurs
à l'aide de champs modélisés.
Avant de générer un uuid
, essayez de déterminer si un ID propre à DagRun serait plus utile. Vous pouvez également référencer ces ID dans les substitutions Jinja à l'aide de macros.
Comment séparer les tâches dans un DAG ?
Chaque tâche doit constituer une unité de travail idempotente. Par conséquent, évitez d'encapsuler un workflow en plusieurs étapes au sein d'une tâche unique, comme un programme complexe exécuté dans un opérateur PythonOperator
.
Faut-il définir plusieurs tâches dans un même DAG pour agréger des données provenant de plusieurs sources ?
Par exemple, vous avez plusieurs tables avec des données brutes et vous souhaitez créer des agrégats quotidiens pour chaque table. Les tâches ne dépendent pas les unes des autres. Faut-il créer une tâche et un DAG pour chaque table ou créer un DAG général ?
Si vous acceptez que chaque tâche partage les mêmes propriétés au niveau du DAG, telles que schedule_interval
, il est judicieux de définir plusieurs tâches dans un même DAG. Sinon, pour réduire la répétition du code, plusieurs DAG peuvent être générés à partir d'un seul module Python en les plaçant dans les globals()
du module.
Comment limiter le nombre de tâches simultanées en cours d'exécution dans un DAG ?
Par exemple, vous souhaitez éviter de dépasser les limites/quotas d'utilisation de l'API ou éviter d'exécuter trop de processus simultanés.
Vous pouvez définir des pools Airflow dans l'interface utilisateur Web d'Airflow et associer des tâches à des pools existants dans vos DAG.
Questions fréquentes sur l'utilisation des opérateurs
Faut-il utiliser l'opérateur DockerOperator
?
Nous vous déconseillons d'utiliser
le DockerOperator
, sauf s'il est utilisé pour lancer
des conteneurs sur une installation Docker distante (et non dans la couche
cluster). Dans un environnement Cloud Composer, l'opérateur n'a pas accès aux daemons Docker.
Utilisez plutôt KubernetesPodOperator
ou
GKEStartPodOperator
Ces opérateurs lancent les pods Kubernetes dans
Kubernetes ou les clusters GKE. Notez que nous ne
recommandent de lancer des pods dans le cluster d'un environnement, car cela peut entraîner
à la concurrence des ressources.
Faut-il utiliser l'opérateur SubDagOperator
?
Nous vous déconseillons d'utiliser l'opérateur SubDagOperator
.
Utilisez des alternatives, comme suggéré dans la section Regrouper des tâches.
Faut-il exécuter du code Python uniquement dans des opérateurs PythonOperators
pour séparer complètement les opérateurs Python ?
En fonction de votre objectif, plusieurs options s'offrent à vous.
Si votre seule préoccupation est de conserver des dépendances Python distinctes, vous pouvez utiliser l'opérateur PythonVirtualenvOperator
.
Envisagez d'utiliser KubernetesPodOperator
. Cet opérateur vous permet
pour définir des pods Kubernetes et les exécuter dans d'autres clusters.
Comment ajouter des packages binaires personnalisés ou non PyPI ?
Vous pouvez installer des packages hébergés dans des dépôts de packages privés.
Comment transmettre uniformément des arguments à un DAG et à ses tâches ?
La compatibilité avec Airflow intégrée à la modélisation Jinja permet de transmettre des arguments pouvant être utilisés dans des champs modélisés.
Quand se produit la substitution de modèle ?
La substitution de modèle se produit sur les nœuds de calcul Airflow juste avant l'appel de la fonction pre_execute
d'un opérateur. En pratique, cela signifie que les modèles ne sont remplacés que juste avant l'exécution d'une tâche.
Comment connaître les arguments d'opérateur compatibles avec la substitution de modèle ?
Les arguments d'opérateur qui acceptent la substitution de modèle Jinja2 sont explicitement marqué comme tel.
Recherchez le champ template_fields
dans la définition de l'opérateur, qui contient la liste des noms d'arguments qui subiront une substitution de modèle.
Par exemple, consultez
le BashOperator
, qui est compatible avec la création de modèles
les arguments bash_command
et env
.
Étape suivante
- Découvrez comment résoudre les problèmes liés aux DAG.
- Dépannage du programmeur
- Opérateurs Google
- Opérateurs Google Cloud
- Tutoriel Apache Airflow