Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Cette page explique comment la planification et le déclenchement de DAG fonctionnent dans Airflow, comment définir une planification pour un DAG, et comment déclencher un DAG manuellement ou le mettre en veille.
À propos des DAG Airflow dans Cloud Composer
Les DAG Airflow dans Cloud Composer sont exécutés dans un ou plusieurs environnements Cloud Composer de votre projet. Vous importez les fichiers sources de vos DAG Airflow dans un bucket Cloud Storage associé à un environnement. L'instance Airflow de l'environnement analyse ensuite ces fichiers et planifie les exécutions de DAG, comme défini par la programmation de chaque DAG. Lors d'une exécution de DAG, Airflow planifie et exécute les tâches individuelles qui composent un DAG dans une séquence définie par le DAG.
Pour en savoir plus sur les concepts fondamentaux d'Airflow, tels que les DAG Airflow, les exécutions de DAG, les tâches ou les opérateurs, consultez la page Concepts fondamentaux de la documentation Airflow.
À propos de la planification des DAG dans Airflow
Airflow propose les concepts suivants pour son mécanisme de planification :
- Date logique
Représente une date pour laquelle une exécution de DAG spécifique est exécutée.
Il ne s'agit pas de la date à laquelle Airflow exécute un DAG, mais d'une période pendant laquelle une exécution de DAG spécifique doit être traitée. Par exemple, pour un DAG qui doit s'exécuter tous les jours à 12h00, la date logique sera également 12h00 un jour spécifique. Comme il s'exécute deux fois par jour, la période qu'il doit traiter correspond aux 12 dernières heures. En même temps, la logique définie dans le DAG lui-même peut ne pas utiliser du tout la date logique ni l'intervalle de temps. Par exemple, un DAG peut exécuter le même script une fois par jour sans utiliser la valeur de la date logique.
Dans les versions d'Airflow antérieures à la version 2.2, cette date est appelée date d'exécution.
- Date d'exécution
Représente une date à laquelle une exécution de DAG spécifique est exécutée.
Par exemple, pour un DAG qui doit s'exécuter tous les jours à 12h00, l'exécution réelle peut avoir lieu à 12h05, soit un peu après la date logique.
- Intervalle de planification
Représente la date et la fréquence d'exécution d'un DAG, en termes de dates logiques.
Par exemple, une planification quotidienne signifie qu'un DAG est exécuté une fois par jour et que les dates logiques de ses exécutions de DAG ont des intervalles de 24 heures.
- Date de début
Spécifie à quel moment Airflow doit commencer à planifier votre DAG.
Les tâches de votre DAG peuvent avoir des dates de début individuelles, ou vous pouvez spécifier une date de début unique pour toutes les tâches. En fonction de la date de début minimale des tâches de votre DAG et de l'intervalle de planification, Airflow planifie l'exécution du DAG.
- Rattrapage, remplissage et nouvelles tentatives
Mécanismes d'exécution des exécutions de DAG pour les dates passées.
La fonctionnalité de rattrapage exécute les exécutions de DAG qui n'ont pas encore eu lieu, par exemple si le DAG a été mis en veille pendant une longue période, puis réactivé. Vous pouvez utiliser le remplissage pour exécuter des DAG pour une certaine plage de dates. Les nouvelles tentatives spécifient le nombre de tentatives qu'Airflow doit effectuer lors de l'exécution des tâches d'un DAG.
La planification fonctionne de la manière suivante :
Une fois la date de début passée, Airflow attend la prochaine occurrence de l'intervalle de planification.
Airflow planifie la première exécution du DAG à la fin de cet intervalle de planification.
Par exemple, si un DAG est exécuté toutes les heures et que la date de début est à 12h00, la première exécution du DAG a lieu aujourd'hui à 13h.
La section Planifier un DAG Airflow de ce document explique comment configurer la planification de vos DAG à l'aide de ces concepts. Pour en savoir plus sur les exécutions et la planification des DAG, consultez Exécutions de DAG dans la documentation Airflow.
À propos des méthodes de déclenchement d'un DAG
Airflow propose les méthodes suivantes pour déclencher un DAG :
Déclenchement planifié. Airflow déclenche le DAG automatiquement en fonction de la planification spécifiée dans le fichier DAG.
Déclenchement manuel Vous pouvez déclencher un DAG manuellement à partir de la consoleGoogle Cloud ou de l'UI Airflow, ou en exécutant une commande de CLI Airflow depuis Google Cloud CLI.
Déclenchement en réponse à des événements. Le moyen standard de déclencher un DAG en réponse à des événements consiste à utiliser un capteur.
Autres méthodes pour déclencher des DAG :
Déclenchement par programmation. Vous pouvez déclencher un DAG à l'aide de l'API REST Airflow. Par exemple, à partir d'un script Python.
Déclenchement par programmation en réponse à des événements. Vous pouvez déclencher des DAG en réponse à des événements à l'aide des fonctions Cloud Run et de l'API REST Airflow.
Avant de commencer
- Assurez-vous que votre compte dispose d'un rôle permettant de gérer les objets dans les buckets d'environnement, et d'afficher et de déclencher des DAG. Pour plus d'informations, consultez la section Contrôle des accès.
Planifier un DAG Airflow
Vous définissez une programmation pour un DAG dans le fichier DAG. Modifiez la définition du DAG comme suit :
Recherchez et modifiez le fichier DAG sur votre ordinateur. Si vous n'avez pas le fichier DAG, vous pouvez en télécharger une copie depuis le bucket de l'environnement. Pour un nouveau DAG, vous pouvez définir tous les paramètres lorsque vous créez le fichier DAG.
Dans le paramètre
schedule_interval
, définissez la programmation. Vous pouvez utiliser une expression Cron, telle que0 0 * * *
, ou un préréglage, tel que@daily
. Pour en savoir plus, consultez Cron and Time Intervals (Cron et intervalles de temps) dans la documentation Airflow.Airflow détermine les dates logiques pour les exécutions de DAG en fonction du calendrier que vous définissez.
Dans le paramètre
start_date
, définissez la date de début.Airflow détermine la date logique de la première exécution du DAG à l'aide de ce paramètre.
(Facultatif) Dans le paramètre
catchup
, définissez si Airflow doit exécuter toutes les exécutions précédentes de ce DAG, de la date de début à la date actuelle, qui n'ont pas encore été exécutées.La date logique des exécutions de DAG effectuées pendant la rattrapage sera dans le passé, et leur date d'exécution reflétera le moment où l'exécution de DAG a réellement eu lieu.
(Facultatif) Dans le paramètre
retries
, définissez le nombre de fois qu'Airflow doit réessayer les tâches ayant échoué (chaque DAG se compose d'une ou plusieurs tâches individuelles). Par défaut, les tâches dans Cloud Composer font l'objet de deux nouvelles tentatives.Importez la nouvelle version du DAG dans le bucket de l'environnement.
Attendez qu'Airflow analyse correctement le DAG. Par exemple, vous pouvez consulter la liste des DAG de votre environnement dans la consoleGoogle Cloud ou dans l'UI Airflow.
La définition de DAG suivante s'exécute deux fois par jour, à 00h00 et à 12h00. Sa date de début est définie sur le 1er janvier 2024, mais Airflow ne l'exécute pas pour les dates passées après que vous l'avez importé ou mis en veille, car la mise à jour est désactivée.
Le DAG contient une tâche nommée insert_query_job
, qui insère une ligne dans un tableau avec l'opérateur BigQueryInsertJobOperator
. Cet opérateur fait partie des opérateurs BigQuery, que vous pouvez utiliser pour gérer les ensembles de données et les tables, exécuter des requêtes et valider des données.Google Cloud
Si une exécution spécifique de cette tâche échoue, Airflow la relance quatre fois de plus avec l'intervalle de relance par défaut. La date logique de ces nouvelles tentatives reste la même.
La requête SQL de cette ligne utilise des modèles Airflow pour écrire la date logique et le nom du DAG dans la ligne.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Pour tester ce DAG, vous pouvez le déclencher manuellement, puis afficher les journaux d'exécution des tâches.
Autres exemples de paramètres de planification
Les exemples suivants de paramètres de planification illustrent le fonctionnement de la planification avec différentes combinaisons de paramètres :
Si le paramètre
start_date
est défini surdatetime(2024, 4, 4, 16, 25)
et que le paramètreschedule_interval
est défini sur30 16 * * *
, la première exécution du DAG se produit à 16h30 le 5 avril 2024.Si le paramètre
start_date
est défini surdatetime(2024, 4, 4, 16, 35)
et que le paramètreschedule_interval
est défini sur30 16 * * *
, la première exécution du DAG se produit à 16h30 le 6 avril 2024. Étant donné que la date de début est postérieure à l'intervalle de planification le 4 avril 2024, l'exécution du DAG n'a pas lieu le 5 avril 2024. Au lieu de cela, l'intervalle de planification se termine le 5 avril 2024 à 16h35. L'exécution du DAG suivante est donc programmée pour 16h30 le lendemain.Si le paramètre
start_date
est défini surdatetime(2024, 4, 4)
et que le paramètreschedule_interval
est défini sur@daily
, la première exécution du DAG est planifiée pour le 5 avril 2024 à 00h00.Si le paramètre
start_date
est défini surdatetime(2024, 4, 4, 16, 30)
et que le paramètreschedule_interval
est défini sur0 * * * *
, la première exécution du DAG est planifiée pour le 4 avril 2024 à 18h00. Une fois la date et l'heure spécifiées écoulées, Airflow planifie l'exécution d'un DAG à la minute zéro de chaque heure. Le moment le plus proche lorsque cela se produit est 17h00. Pour le moment, Airflow planifie l'exécution d'un DAG à la fin de l'intervalle de planification, c'est-à-dire à 18h00.
Déclencher un DAG manuellement
Lorsque vous déclenchez manuellement un DAG Airflow, Airflow exécute le DAG une seule fois, indépendamment de la planification spécifiée dans le fichier DAG.
Console
Pour déclencher un DAG à partir de la console Google Cloud :
Dans la console Google Cloud , accédez à la page Environnements.
Sélectionnez un environnement pour afficher ses détails.
Sur la page Détails de l'environnement, accédez à l'onglet DAG.
Cliquez sur le nom d'un DAG.
Sur la page Détails du DAG, cliquez sur Déclencher le DAG. Une nouvelle exécution du DAG est créée.
Interface utilisateur d'Airflow
Pour déclencher un DAG à partir de l'UI Airflow :
Dans la console Google Cloud , accédez à la page Environnements.
Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.
Connectez-vous avec le compte Google disposant des autorisations appropriées.
Dans l'interface Web Airflow, sur la page DAG, dans la colonne Actions de votre DAG, cliquez sur le bouton Déclencher le DAG.
gcloud
Exécutez la commande de CLI Airflow dags trigger
:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Remplacez les éléments suivants :
ENVIRONMENT_NAME
: nom de votre environnementLOCATION
: région où se trouve l'environnement.DAG_ID
: nom du DAG.
Pour en savoir plus sur l'exécution de commandes de CLI Airflow dans les environnements Cloud Composer, consultez la page Exécuter des commandes de CLI Airflow.
Pour en savoir plus sur les commandes de CLI Airflow disponibles, consultez la documentation de référence sur les commandes gcloud composer environments run
.
Afficher les journaux et les détails d'une exécution de DAG
Dans la console Google Cloud , vous pouvez :
- Affichez l'état des exécutions de DAG passées et les détails des DAG.
- Explorez les journaux détaillés de toutes les exécutions de DAG et de toutes les tâches de ces DAG.
- Affichez les statistiques du DAG.
De plus, Cloud Composer permet d'accéder à l'interface utilisateur Airflow, qui est la propre interface Web d'Airflow.
Suspendre un DAG
Console
Pour mettre en veille un DAG depuis la console Google Cloud :
Dans la console Google Cloud , accédez à la page Environnements.
Sélectionnez un environnement pour afficher ses détails.
Sur la page Détails de l'environnement, accédez à l'onglet DAG.
Cliquez sur le nom d'un DAG.
Sur la page Détails du DAG, cliquez sur Mettre en veille le DAG.
Interface utilisateur d'Airflow
Pour suspendre un DAG à partir de l'interface utilisateur d'Airflow :
- Dans la console Google Cloud , accédez à la page Environnements.
Accéder à la page Environnements
Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.
Connectez-vous avec le compte Google disposant des autorisations appropriées.
Dans l'interface Web Airflow, sur la page DAG, cliquez sur le bouton bascule à côté du nom du DAG.
gcloud
Exécutez la commande de CLI Airflow dags pause
:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Remplacez les éléments suivants :
ENVIRONMENT_NAME
: nom de votre environnementLOCATION
: région où se trouve l'environnement.DAG_ID
: nom du DAG.
Pour en savoir plus sur l'exécution de commandes de CLI Airflow dans les environnements Cloud Composer, consultez la page Exécuter des commandes de CLI Airflow.
Pour en savoir plus sur les commandes de CLI Airflow disponibles, consultez la documentation de référence sur les commandes gcloud composer environments run
.