Regrouper des tâches dans les DAG

Cette page explique comment regrouper des tâches dans vos pipelines Airflow à l'aide des modèles de conception suivants:

  • Regrouper des tâches dans le graphique du DAG
  • Déclencher des DAG d'enfants à partir d'un DAG parent
  • Regroupez les tâches avec l'opérateur TaskGroup (uniquement avec Airflow 2).

Regrouper des tâches dans le graphique du DAG

Pour regrouper des tâches dans certaines phases de votre pipeline, vous pouvez utiliser des relations entre les tâches de votre fichier DAG.

Prenons l'exemple suivant :

Les tâches peuvent être regroupées dans un DAG Airflow

Dans ce workflow, les tâches op-1 et op-2 s'exécutent après la tâche start initiale. Pour ce faire, regroupez les tâches et l'instruction start >> [task_1, task_2].

Le code suivant fournit une implémentation complète du DAG ci-dessus:

Airflow 2

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Airflow 1


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Déclencher des DAG d'enfants à partir d'un DAG parent

Vous pouvez déclencher un DAG à partir d'un autre DAG à l'aide de l'TriggerDagRunOperator. Selon la version d'Airflow, vous pouvez trouver l'opérateur TriggerDagRunOperator dans un autre module:

Prenons l'exemple suivant :

Les DAG peuvent être déclenchés à partir d'un DAG à l'aide de TriggerDagRunOperator.

Dans ce workflow, les blocs dag_1 et dag_2 représentent une série de tâches regroupées dans un DAG distinct dans l'environnement Cloud Composer.

La mise en œuvre de ce workflow nécessite deux fichiers DAG distincts. Le fichier DAG de contrôle se présente comme suit:

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

Airflow 1

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

La mise en œuvre du DAG enfant, déclenchée par le DAG de contrôle, ressemble à ceci:

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    dag_task = DummyOperator(
        task_id='dag-task'
    )

Airflow 1

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(
        task_id='dag-task'
    )

Vous devez importer les deux fichiers DAG dans votre environnement Cloud Composer pour que le workflow fonctionne.

Regrouper des tâches avec l'opérateur TaskGroup

Dans Airflow 2, vous pouvez utiliser l'opérateur TaskGroup pour regrouper les tâches dans votre DAG. Les tâches définies dans un bloc TaskGroup font toujours partie du DAG principal.

Prenons l'exemple suivant :

Les tâches peuvent être regroupées sous l'interface utilisateur d'Airflow 2 dans l'interface utilisateur.

Les tâches op-1 et op-2 sont regroupées dans un bloc avec l'ID taskgroup_1. La mise en œuvre de ce workflow se présente comme suit:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=':')
        task_2 = BashOperator(task_id="op-2", bash_command=':')

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=':')
        task_4 = BashOperator(task_id="op-4", bash_command=':')

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(task_id='end')

    start >> section_1 >> some_other_task >> section_2 >> end