Déclencher des DAG

Cloud Composer 1 | Cloud Composer 2

Cette page décrit différentes manières de déclencher des DAG dans les environnements Cloud Composer.

Airflow propose les méthodes suivantes pour déclencher un DAG :

  • Déclenchement planifié. Lorsque vous créez un DAG, vous spécifiez son exécution. Airflow déclenche le DAG automatiquement en fonction des paramètres de planification spécifiés.

  • Déclenchement manuel. Vous pouvez déclencher un DAG manuellement à partir de l'UI d'Airflow ou en exécutant une commande de CLI Airflow dans gcloud.

  • Déclenchement en réponse à des événements. Le moyen standard de déclencher un DAG en réponse à des événements consiste à utiliser un capteur.

Autres méthodes pour déclencher des DAG :

Déclencher un DAG selon un calendrier

Pour déclencher un DAG selon un calendrier :

  1. Spécifiez les paramètres start_date et schedule_interval dans le fichier DAG, comme décrit plus loin dans cette section.
  2. Importez le fichier DAG dans votre environnement.

Spécifier les paramètres de planification

Lorsque vous définissez un DAG, dans le paramètre schedule_interval, vous spécifiez la fréquence à laquelle vous souhaitez exécuter le DAG. Dans le paramètre start_date, vous indiquez à quel moment Airflow doit commencer à planifier votre DAG. Les tâches de votre DAG peuvent avoir des dates de début individuelles, ou vous pouvez spécifier une date de début unique pour toutes les tâches. En fonction de la date de début minimale des tâches de votre DAG et de l'intervalle de planification, Airflow planifie l'exécution du DAG.

La planification fonctionne de la manière suivante : Une fois l'opération start_date réussie, Airflow attend l'occurrence suivante de schedule_interval. Il planifie ensuite la première exécution du DAG à la fin de cet intervalle de planification. Par exemple, si un DAG est exécuté toutes les heures (schedule_interval a comme valeur 1 heure) et que la date de début est à 12h00, la première exécution du DAG a lieu aujourd'hui à 13h00.

L'exemple suivant présente un DAG qui s'exécute toutes les heures à partir de 15h00 le 5 avril 2021. Avec les paramètres utilisés dans l'exemple, Airflow planifie la première exécution du DAG à 16h00 le 5 avril 2021.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id='example_dag_schedule',
    # At 15:00 on 5 April, 2021
    start_date=datetime(2021, 4, 5, 15, 0),
    # At minute 0 of every hour
    schedule_interval='0 * * * *') as dag:

    # Output the current date and time
    t1 = BashOperator(
        task_id='date',
        bash_command='date',
        dag=dag)

    t1

Pour en savoir plus sur les paramètres de planification, consultez la page Exécutions de DAG dans la documentation d'Airflow.

Autres exemples de paramètres de planification

Les exemples suivants de paramètres de planification illustrent le fonctionnement de la planification avec différentes combinaisons de paramètres :

  • Si le paramètre start_date est défini sur datetime(2021, 4, 4, 16, 25) et que le paramètre schedule_interval est défini sur 30 16 * * *, la première exécution du DAG se produit à 16h30 le 5 avril 2021.
  • Si le paramètre start_date est défini sur datetime(2021, 4, 4, 16, 35) et que le paramètre schedule_interval est défini sur 30 16 * * *, la première exécution du DAG se produit à 16h30 le 6 avril 2021. Étant donné que la date de début est postérieure à l'intervalle de planification le 4 avril 2021, l'exécution du DAG n'a pas lieu le 5 avril 2021. Au lieu de cela, l'intervalle de planification se termine le 5 avril 2021 à 16h35. L'exécution du DAG suivante est donc programmée pour 16h30 le lendemain.
  • Si le paramètre start_date est défini sur datetime(2021, 4, 4) et que le paramètre schedule_interval est défini sur @daily, la première exécution du DAG est planifiée pour le 5 avril 2021 à 00h00.
  • Si le paramètre start_date est défini sur datetime(2021, 4, 4, 16, 30) et que le paramètre schedule_interval est défini sur 0 * * * *, la première exécution du DAG est planifiée pour le 4 avril 2021 à 18h00. Une fois la date et l'heure spécifiées écoulées, Airflow planifie l'exécution d'un DAG à la minute zéro de chaque heure. Le moment le plus proche lorsque cela se produit est 17h00. Pour le moment, Airflow planifie l'exécution d'un DAG à la fin de l'intervalle de planification, c'est-à-dire à 18h00.

Déclencher un DAG manuellement

Lorsque vous déclenchez un DAG manuellement, Airflow effectue une exécution du DAG. Par exemple, si vous avez un DAG qui s'exécute déjà selon un calendrier et que vous le déclenchez manuellement, Airflow exécute votre DAG une seule fois, indépendamment de la planification réelle spécifiée pour ce DAG.

Console

L'interface utilisateur du DAG est compatible avec Cloud Composer 1.17.8 et versions ultérieures.

Pour déclencher un DAG depuis la console Google Cloud:

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Sélectionnez un environnement pour afficher ses détails.

  3. Sur la page Détails de l'environnement, accédez à l'onglet DAG.

  4. Cliquez sur le nom d'un DAG.

  5. Sur la page Détails du DAG, cliquez sur Déclencher le DAG. Une exécution du DAG est créée.

Interface utilisateur d'Airflow

Pour déclencher un DAG à partir de l'interface Web d'Airflow, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page Environnements.

Accéder à la page Environnements

  1. Dans la colonne Serveur Web Airflow, cliquez sur le lien Airflow correspondant à votre environnement.

  2. Connectez-vous avec le compte Google disposant des autorisations appropriées.

  3. Dans l'interface Web Airflow, sur la page DAG, dans la colonne Liens de votre DAG, cliquez sur le bouton Déclencher le DAG.

  4. (Facultatif) Spécifiez la configuration de l'exécution du DAG.

  5. Cliquez sur Déclencher.

gcloud

Dans Airflow 1.10.12 ou version antérieure, exécutez la commande CLI Airflow trigger_dag:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

Dans Airflow 1.10.14 ou une version ultérieure, y compris Airflow 2, exécutez la commande CLI Airflow dags trigger:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • DAG_ID par le nom du DAG.

Pour en savoir plus sur l'exécution de commandes de CLI Airflow dans les environnements Cloud Composer, consultez la page Exécuter des commandes de CLI Airflow.

Pour en savoir plus sur les commandes de CLI Airflow disponibles, consultez la documentation de référence sur les commandes gcloud composer environments run.

Étapes suivantes