Planifier et déclencher des DAG Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique le fonctionnement de la planification et du déclenchement de DAG dans Airflow, comment définir une planification pour un DAG, et comment déclencher un DAG manuellement ou le mettre en pause.

À propos des DAG Airflow dans Cloud Composer

Dans Cloud Composer, les DAG Airflow sont exécutés un ou plusieurs environnements Cloud Composer dans votre projet. Vous importez les fichiers sources de vos DAG Airflow dans un bucket Cloud Storage associé à un environnement. L'instance d'Airflow de l'environnement analyse ensuite ces fichiers et planifie les exécutions de DAG, comme défini par la planification de chaque DAG. Lors de l'exécution d'un DAG, Airflow planifie et exécute les tâches individuelles qui composent un DAG dans une séquence définie par le DAG.

Pour en savoir plus sur les concepts fondamentaux d'Airflow, tels que les DAG Airflow, les exécutions de DAG, les tâches ou les opérateurs, consultez la page Concepts fondamentaux de la documentation Airflow.

À propos de la planification des DAG dans Airflow

Airflow propose les concepts suivants pour son mécanisme de planification:

Date logique

Représente la date à laquelle une exécution de DAG particulière est effectuée.

Il ne s'agit pas de la date réelle à laquelle Airflow exécute un DAG, mais d'une période de temps qu'une exécution de DAG spécifique doit traiter. Par exemple, pour un DAG programmé pour s'exécuter tous les jours à 12h00, la date logique sera également 12h00 pour un jour spécifique. Puisqu'il s'exécute deux fois par jour, la durée pendant laquelle sur les 12 dernières heures. Dans le même temps, la logique définie dans le Le DAG lui-même peut ne pas utiliser du tout la date logique ou l'intervalle de temps. Par exemple, un DAG peut exécuter le même script une fois par jour sans utiliser le de la date logique.

Dans les versions d'Airflow antérieures à la version 2.2, cette date est appelée date d'exécution.

Date d'exécution

Représente la date à laquelle une exécution de DAG spécifique est exécutée.

Par exemple, pour un DAG programmé pour s'exécuter tous les jours à 12h, l'exécution réelle du DAG peut avoir lieu à 12h05, quelque temps après une date logique passe.

Intervalle de planification

Représente quand et à quelle fréquence un DAG doit être exécuté, en termes de logique dates.

Par exemple, une programmation quotidienne signifie qu'un DAG est exécuté une fois par jour, Les dates logiques d'exécution des DAG ont des intervalles de 24 heures.

Date de début

Indique à quel moment vous souhaitez qu'Airflow commence à planifier votre DAG.

Les tâches de votre DAG peuvent avoir des dates de début individuelles, ou vous pouvez spécifier une une seule date de début pour toutes les tâches. En fonction de la date de début minimale des tâches de votre DAG et de l'intervalle de planification, Airflow planifie l'exécution du DAG.

Mise à jour, remplissage et nouvelles tentatives

Mécanismes d'exécution des exécutions de DAG pour des dates passées

Le rattrapage permet d'exécuter les exécutions de DAG qui n'ont pas encore été exécutées. Par exemple : si le DAG a été mis en pause pendant une longue période, puis réactivé. Vous pouvez utiliser le remplissage pour exécuter des exécutions DAG pour une certaine plage de dates. Nouvelles tentatives spécifier le nombre de tentatives qu'Airflow doit effectuer lors de l'exécution de tâches à partir d'un DAG.

La planification fonctionne comme suit :

  1. Une fois la date de début dépassée, Airflow attend l'occurrence suivante de la valeur intervalle de planification.

  2. Airflow planifie l'exécution de la première exécution du DAG à la fin de cette programmation l'intervalle.

    Par exemple, si un DAG est programmé pour s'exécuter toutes les heures et que la date de début est à 12h aujourd'hui, la première exécution du DAG aura lieu aujourd'hui à 13h00.

La section Planifier un DAG Airflow de ce document explique comment configurer la planification de vos DAG à l'aide de ces concepts. Pour plus sur les exécutions et la planification des DAG, consultez Consultez la section Exécutions DAG de la documentation Airflow.

Méthodes de déclenchement d'un DAG

Airflow propose les méthodes suivantes pour déclencher un DAG :

  • Déclenchement planifié. Airflow déclenche le DAG automatiquement en fonction de la planification spécifiée dans le fichier DAG.

  • Déclenchement manuel Vous pouvez déclencher un DAG manuellement à partir de la console Google Cloud, de l'UI d'Airflow ou en exécutant une commande de CLI Airflow à partir de la CLI Google Cloud.

  • Déclencher en réponse à des événements Le moyen standard de déclencher un DAG en réponse à des événements consiste à utiliser un capteur.

Autres méthodes pour déclencher des DAG :

Avant de commencer

  • Assurez-vous que votre compte dispose d'un rôle permettant de gérer les objets dans les buckets d'environnement, et d'afficher et de déclencher des DAG. Pour plus d'informations, consultez la section Contrôle des accès.

Planifier un DAG Airflow

Vous définissez la programmation d'un DAG dans le fichier du DAG. Modifiez la définition du DAG comme suit :

  1. Recherchez et modifiez le fichier DAG sur votre ordinateur. Si vous ne disposez pas du fichier DAG, vous pouvez télécharger sa copie à partir du bucket de l'environnement. Pour un nouveau DAG, vous pouvez définir tous les paramètres lorsque vous créez le fichier DAG.

  2. Dans le paramètre schedule_interval, définissez la planification. Vous pouvez utiliser une expression CRON, telle que 0 0 * * *, ou un préréglage, tel que @daily. Pour en savoir plus, consultez la section Cron et intervalles de temps dans la documentation Airflow.

    Airflow détermine les dates logiques d'exécution des DAG en fonction de la programmation que vous avez défini.

  3. Dans le paramètre start_date, définissez la date de début.

    Airflow détermine la date logique de la première exécution du DAG à l'aide de ce .

  4. (Facultatif) Dans le paramètre catchup, indiquez si Airflow doit exécuter toutes les exécutions précédentes de ce DAG de la date de début à la date actuelle qui n'ont pas encore été exécutées.

    Les exécutions DAG exécutées pendant la récupération auront une date logique dans le passé et leur date d'exécution reflétera l'heure à laquelle l'exécution DAG a été effectivement exécutée.

  5. (Facultatif) Dans le paramètre retries, définissez le nombre de fois où Airflow doit réessayer les tâches ayant échoué (chaque DAG se compose d'une ou de plusieurs tâches individuelles). Par défaut, les tâches Cloud Composer sont relancées deux fois fois.

  6. Importez la nouvelle version du DAG dans le bucket de l'environnement.

  7. Attendez qu'Airflow analyse le DAG. Par exemple, vous pouvez consulter la liste des DAG de votre environnement dans la console Google Cloud ou dans l'interface utilisateur d'Airflow.

L'exemple de définition de DAG suivant s'exécute deux fois par jour à 00h00 et 12h00. Son La date de début est définie sur le 1er janvier 2024, mais Airflow ne l'exécute pas pour des dates passées après l'avoir mise en ligne ou mise en pause, car le rattrapage est désactivé.

Le DAG contient une tâche nommée insert_query_job, qui insère une ligne dans une avec l'opérateur BigQueryInsertJobOperator. Cet opérateur fait partie des opérateurs Google Cloud BigQuery, que vous pouvez utiliser pour gérer des ensembles de données et des tables, exécuter des requêtes et valider des données. Si une exécution particulière de cette tâche échoue, Airflow la relance quatre fois avec l'intervalle de nouvelle tentative par défaut. La date logique de ces nouvelles tentatives reste la même.

La requête SQL de cette ligne utilise des modèles Airflow pour écrire la date et le nom logiques du DAG dans la ligne.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

Pour tester ce DAG, vous pouvez le déclencher manuellement, puis afficher les journaux d'exécution des tâches.

Autres exemples de paramètres de planification

Les exemples de paramètres de planification suivants illustrent le fonctionnement de la planification avec différentes combinaisons de paramètres :

  • Si start_date est défini sur datetime(2024, 4, 4, 16, 25) et que schedule_interval est défini sur 30 16 * * *, la première exécution du DAG aura lieu le 5 avril 2024 à 16h30.

  • Si le paramètre start_date est défini sur datetime(2024, 4, 4, 16, 35) et que le paramètre schedule_interval est défini sur 30 16 * * *, la première exécution du DAG se produit à 16h30 le 6 avril 2024. Étant donné que la date de début est postérieure à l'intervalle de planification le 4 avril 2024, l'exécution du DAG n'a pas lieu le 5 avril 2024. Au lieu de cela, le calendrier se termine à 16h35 le 5 avril 2024, la prochaine exécution du DAG est donc planifiée à 16h30 le lendemain.

  • Si le paramètre start_date est défini sur datetime(2024, 4, 4) et que le paramètre schedule_interval est défini sur @daily, la première exécution du DAG est planifiée pour le 5 avril 2024 à 00h00.

  • Si start_date est défini sur datetime(2024, 4, 4, 16, 30), et schedule_interval est 0 * * * *, la première exécution du DAG est prévu pour le 4 avril 2024 à 18h. Une fois la date et l'heure spécifiées écoulées, Airflow planifie l'exécution d'un DAG à la minute zéro de chaque heure. Le moment le plus proche lorsque cela se produit est 17h00. Pour le moment, Airflow planifie l'exécution d'un DAG à la fin de l'intervalle de planification, c'est-à-dire à 18h00.

Déclencher un DAG manuellement

Lorsque vous déclenchez manuellement un DAG Airflow, Airflow exécute le DAG une seule fois, indépendamment de la planification spécifiée dans le fichier DAG.

Console

L'UI DAG est compatible avec Cloud Composer 1.17.8 et versions ultérieures.

Pour déclencher un DAG depuis la console Google Cloud:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Sélectionnez un environnement pour afficher ses détails.

  3. Sur la page Détails de l'environnement, accédez à l'onglet DAG.

  4. Cliquez sur le nom d'un DAG.

  5. Sur la page Détails du DAG, cliquez sur Déclencher le DAG. Une nouvelle exécution du DAG créé.

Interface utilisateur d'Airflow

Pour déclencher un DAG à partir de l'interface utilisateur d'Airflow :

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.

  3. Connectez-vous avec le compte Google disposant des autorisations appropriées.

  4. Dans l'interface Web Airflow, sur la page DAG, dans la colonne Liens de votre DAG, cliquez sur le bouton Déclencher le DAG.

  5. (Facultatif) Spécifiez la configuration de l'exécution du DAG.

  6. Cliquez sur Déclencher.

gcloud

Dans Airflow 1.10.12 ou version antérieure, exécutez la commande trigger_dag de la CLI Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

Dans Airflow 1.10.14 ou version ultérieure, y compris Airflow 2, exécutez la commande dags trigger Commande CLI Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION: région dans laquelle se trouve l'environnement.
  • DAG_ID: nom du DAG.

Pour en savoir plus sur l'exécution de commandes de CLI Airflow dans les environnements Cloud Composer, consultez la page Exécuter des commandes de CLI Airflow.

Pour en savoir plus sur les commandes de CLI Airflow disponibles, consultez la documentation de référence sur les commandes gcloud composer environments run.

Afficher les journaux et les détails d'exécution du DAG

Dans la console Google Cloud, vous pouvez :

De plus, Cloud Composer permet d'accéder à l'interface utilisateur d'Airflow, qui est la propre interface Web d'Airflow.

Suspendre un DAG

Console

L'UI du DAG est compatible avec Cloud Composer 1.17.8 et versions ultérieures.

Pour suspendre un DAG à partir de la console Google Cloud :

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Sélectionnez un environnement pour afficher ses détails.

  3. Sur la page Détails de l'environnement, accédez à l'onglet DAG.

  4. Cliquez sur le nom d'un DAG.

  5. Sur la page Détails du DAG, cliquez sur Suspendre le DAG.

Interface utilisateur d'Airflow

Pour suspendre un DAG à partir de l'interface utilisateur d'Airflow, procédez comme suit:

  1. Dans la console Google Cloud, accédez à la page Environnements.

Accéder à la page Environnements

  1. Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.

  2. Connectez-vous avec le compte Google disposant des autorisations appropriées.

  3. Dans l'interface Web Airflow, sur la page DAG, cliquez sur le bouton bascule à côté du nom du DAG.

gcloud

Dans Airflow 1.10.12 ou version antérieure, exécutez la commande de CLI Airflow pause :

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    pause -- DAG_ID

Dans Airflow 1.10.14 ou version ultérieure, y compris Airflow 2, exécutez la commande de CLI Airflow dags pause :

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION: région dans laquelle se trouve l'environnement.
  • DAG_ID: nom du DAG.

Pour en savoir plus sur l'exécution de commandes de CLI Airflow dans les environnements Cloud Composer, consultez la page Exécuter des commandes de CLI Airflow.

Pour en savoir plus sur les commandes de CLI Airflow disponibles, consultez la documentation de référence sur les commandes gcloud composer environments run.

Étape suivante