Écrire des DAG (workflows)

Ce guide explique comment écrire un DAG (graphe orienté acyclique) Apache Airflow exécuté dans un environnement Cloud Composer.

Structurer un DAG

Un DAG Airflow est défini dans un fichier Python et comprend les composants suivants : une définition du DAG, des opérateurs et des relations entre opérateurs. Les extraits de code suivants illustrent des exemples de chaque composant hors contexte :

  1. La définition du DAG.

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
  2. Des opérateurs pour décrire la tâche à effectuer. L'instanciation d'un opérateur s'appelle une tâche.

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')
  3. Les relations entre opérateurs pour décrire l'ordre dans lequel la tâche doit être effectuée.

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Le workflow suivant est un exemple fonctionnel complet et se compose de deux tâches, hello_python et goodbye_bash :

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Consultez le tutoriel Airflow et les concepts Airflow pour en savoir plus sur la définition des DAG Airflow.

Opérateurs

Les exemples suivants présentent quelques opérateurs Airflow connus. Pour obtenir une documentation de référence primaire sur les opérateurs Airflow, consultez la documentation de référence sur l'API Apache Airflow ou parcourez le code source des opérateurs principaux et de contribution.

BashOperator

Utilisez BashOperator pour exécuter des programmes de ligne de commande.

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer exécute les commandes fournies dans un script bash sur un nœud de calcul. Le nœud de calcul est un conteneur Docker basé sur Debian et comprend plusieurs packages.

PythonOperator

Utilisez PythonOperator pour exécuter le code Python de votre choix. Cloud Composer exécute le code Python dans un conteneur qui inclut les packages suivants :

Pour installer des packages Python supplémentaires, reportez-vous à la section Installer des dépendances Python.

Opérateurs Google Cloud

Utilisez les opérateurs Airflow pour Google Cloud afin d'exécuter des tâches qui utilisent des produits Google Cloud. Cloud Composer configure automatiquement une connexion Airflow sur le projet de l'environnement.

EmailOperator

Utilisez EmailOperator pour envoyer un e-mail à partir d'un DAG. Pour envoyer un e-mail à partir d'un environnement Cloud Composer, vous devez configurer votre environnement pour qu'il utilise SendGrid.

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Notifications

Définissez email_on_failure sur True pour envoyer une notification par e-mail lorsqu'un opérateur du DAG échoue. Pour envoyer des notifications par e-mail à partir d'un environnement Cloud Composer, vous devez configurer votre environnement pour qu'il utilise SendGrid.

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Consignes

  1. Placez toutes les bibliothèques Python personnalisées dans l'archive ZIP des DAG, dans un répertoire imbriqué. Ne placez pas de bibliothèques au niveau supérieur du répertoire des DAG.

    Lorsque Airflow analyse le dossier dags/, il ne recherche que les DAG des modules Python situés au niveau supérieur du dossier des DAG et au niveau supérieur d'une archive ZIP également située au niveau supérieur du dossier dags/. Si Airflow rencontre un module Python dans une archive ZIP ne contenant pas les deux sous-chaînes airflow et DAG, il cesse de traiter l'archive ZIP. Airflow ne renvoie que les DAG trouvés jusqu'à ce point.

  2. Pour la tolérance aux pannes, ne définissez pas plusieurs objets DAG dans le même module Python.

  3. Ne définissez pas les subDAG en tant qu'objets de niveau supérieur.

    En règle générale, Airflow sélectionne les objets DAG dans l'espace de noms global d'un module situé dans le répertoire dags/ en tant que DAG de niveau supérieur. Tous les subDAG définis en tant qu'objets de niveau supérieur s'exécutent selon leur propre planification, en plus des planifications d'autres DAG intégrant les subDAG.

  4. Placez les fichiers requis au moment de l'analyse du DAG dans le répertoire dags/, et non dans le répertoire data/. Le répertoire data/ n'est pas installé sur le serveur Web.

FAQ

Créer des DAG

Utiliser des opérateurs

Étape suivante