Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Cette page explique comment gérer les DAG dans Cloud Composer environnement.
Cloud Composer utilise un bucket Cloud Storage pour stocker les DAG de votre environnement Cloud Composer. Votre environnement synchronise les DAG de ce bucket avec les composants Airflow tels que les nœuds de calcul et les programmeurs Airflow.
Avant de commencer
- Étant donné qu'Apache Airflow n'offre pas une forte isolation DAG, nous vous recommandons de maintenir des environnements de production et de test distincts afin d'éviter les interférences au niveau des DAG. Pour en savoir plus, reportez-vous à la section Tester les DAG.
- Assurez-vous que votre compte dispose d'autorisations suffisantes pour gérer les DAG.
- Les modifications apportées au DAG sont propagées vers Airflow dans un délai de 3 à 5 minutes. Vous pouvez voir qu'il s'agit dans l'interface Web Airflow.
Accéder au bucket de votre environnement
Pour accéder au bucket associé à votre environnement :
Console
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, recherchez une ligne avec le nom de votre environnement. Dans la colonne Dossier des DAG, cliquez sur le lien DAG. La page Informations sur le bucket s'ouvre. Il affiche le contenu du dossier
/dags
dans le bucket de votre environnement.
gcloud
La gcloud CLI propose des commandes distinctes ajouter et supprimer des DAG dans le bucket de votre environnement.
Si vous souhaitez interagir avec le bucket de votre environnement, vous pouvez également utiliser la Google Cloud CLI. Pour obtenir l'adresse du bucket de votre environnement, exécutez la commande suivante de la CLI gcloud :
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.LOCATION
par la région dans laquelle se trouve l'environnement.
Exemple :
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Rédigez une requête API environments.get
. Dans la ressource Environment, dans la ressource EnvironmentConfig, dans la ressource dagGcsPrefix
, vous trouverez l'adresse du bucket de votre environnement.
Exemple :
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Utilisez la bibliothèque google-auth pour obtenir des identifiants et la bibliothèque requests pour appeler l'API REST.
Ajouter ou mettre à jour un DAG
Pour ajouter ou mettre à jour un DAG, déplacez le fichier Python .py
du DAG vers
dans le dossier /dags
du bucket de l'environnement.
Console
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, recherchez une ligne portant le nom de votre environnement, puis dans la colonne Dossier des DAG, cliquez sur le lien DAG. La page Informations sur le bucket s'ouvre. Il affiche le contenu du dossier
/dags
dans le bucket de votre environnement.Cliquez sur Importer des fichiers. Sélectionnez ensuite le fichier Python
.py
pour le DAG à l'aide de la boîte de dialogue du navigateur, puis confirmez.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.LOCATION
par la région dans laquelle se trouve l'environnement.LOCAL_FILE_TO_UPLOAD
est le fichier.py
Python du DAG.
Exemple :
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Mettre à jour un DAG contenant des exécutions de DAG actives
Si vous mettez à jour un DAG contenant des exécutions DAG actives :
- Toutes les tâches en cours d'exécution finissent par utiliser le fichier DAG d'origine.
- Toutes les tâches planifiées, mais qui ne sont pas en cours d'exécution, utiliseront le fichier DAG mis à jour.
- Toutes les tâches qui ne figurent plus dans le fichier DAG mis à jour sont marquées comme supprimés.
Mettre à jour les DAG qui s'exécutent fréquemment
Après l'importation d'un fichier DAG, Airflow met un certain temps à le charger et mettre à jour le DAG. Si votre DAG s'exécute fréquemment, vous pouvez pour s'assurer que le DAG utilise la version mise à jour du fichier DAG. Pour ce faire :
Suspendez le DAG dans l'interface utilisateur d'Airflow.
Importez un fichier DAG mis à jour.
Attendez que les mises à jour s'affichent dans l'interface utilisateur d'Airflow. Cela signifie que le DAG a été correctement analysé par le programmeur et mis à jour dans la base de données Airflow.
Si l'interface utilisateur d'Airflow affiche les DAG mis à jour, cela ne garantit pas qu'ils utilisent bien la version mise à jour du fichier DAG. Ce comportement est dû au fait que les fichiers DAG sont synchronisés indépendamment pour les programmeurs et les nœuds de calcul.
Vous pouvez prolonger le temps d'attente pour vous assurer que le fichier DAG est synchronisé avec tous les nœuds de calcul de votre environnement. La synchronisation a lieu plusieurs fois par minute. Dans un environnement qui fonctionne normalement, il suffit d'attendre 20 à 30 secondes pour que tous les nœuds de calcul soient synchronisés.
(Facultatif) Pour être certain que tous les nœuds de calcul disposent de la nouvelle version du fichier DAG, consultez les journaux de chaque nœud de calcul. Pour ce faire :
Ouvrez l'onglet Journaux de votre environnement dans la console Google Cloud.
Accédez à Journaux Composer > Infrastructure > de synchronisation Cloud Storage, et inspectez les journaux de chaque nœud de calcul dans votre environnement. Recherchez le journal
Syncing dags directory
le plus récent avec un code temporel après l'importation du nouveau fichier DAG. SiFinished syncing
s'affiche après l'élément de journal, les DAG sont correctement synchronisés sur ce nœud de calcul.
Relancez le DAG.
Supprimer un DAG dans votre environnement
Pour supprimer un DAG, supprimez le fichier Python .py
du DAG du
/dags
dans le bucket de votre environnement.
Console
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, recherchez une ligne avec le nom de votre environnement. Dans la colonne Dossier des DAG, cliquez sur le lien DAG. La La page Informations sur le bucket s'ouvre. Il affiche le contenu du dossier
/dags
dans le bucket de votre environnement.Sélectionnez le fichier DAG, cliquez sur Supprimer, puis confirmez l'opération.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.LOCATION
par la région dans laquelle se trouve l'environnement.DAG_FILE
par le fichier Python.py
pour le DAG ;
Exemple :
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Supprimer un DAG de l'interface utilisateur d'Airflow
Pour supprimer les métadonnées d'un DAG à partir de l'interface Web Airflow, procédez comme suit:
Interface utilisateur d'Airflow
- Accédez à l'interface utilisateur d'Airflow pour votre environnement.
- Pour le DAG, cliquez sur Supprimer le DAG.
gcloud
Exécutez la commande suivante dans la gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.LOCATION
par la région dans laquelle se trouve l'environnement.DAG_NAME
est le nom du DAG à supprimer.
Étape suivante
- Écrire des DAG
- Tester, synchroniser et déployer vos DAG à partir de GitHub
- Découvrez comment résoudre les problèmes liés aux DAG.