Regrouper des tâches dans des DAG

Cloud Composer 1 | Cloud Composer 2

Cette page explique comment regrouper des tâches dans vos pipelines Airflow à l'aide des modèles de conception suivants :

  • Regrouper des tâches dans le graphique DAG.
  • Déclencher des DAG enfants à partir d'un DAG parent.
  • Regrouper des tâches avec l'opérateur TaskGroup (uniquement avec Airflow 2).

Regrouper des tâches dans le graphique DAG

Pour regrouper des tâches dans certaines phases de votre pipeline, vous pouvez utiliser des relations entre les tâches de votre fichier de DAG.

Prenons l'exemple suivant :

Les tâches peuvent être regroupées dans un DAG Airflow.

Dans ce workflow, les tâches op-1 et op-2 s'exécutent ensemble après la tâche initiale start. Pour ce faire, regroupez des tâches avec l'instruction start >> [task_1, task_2].

Le code suivant fournit une mise en œuvre complète du DAG ci-dessus :

Airflow 2

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Airflow 1


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Déclencher des DAG enfants à partir d'un DAG parent

Vous pouvez déclencher un DAG à partir d'un autre DAG à l'aide de la commande TriggerDagRunOperator suivante : Selon votre version d'Airflow, vous trouverez l'opérateur TriggerDagRunOperator dans un autre module :

Prenons l'exemple suivant :

Les DAG peuvent être déclenchés à partir d'un DAG avec l'opérateur TriggerDagRunOperator.

Dans ce workflow, les blocs dag_1 et dag_2 représentent une série de tâches regroupées dans un DAG distinct dans l'environnement Cloud Composer.

La mise en œuvre de ce workflow nécessite deux fichiers DAG distincts. Le fichier de DAG de contrôle ressemble à ceci :

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

Airflow 1

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

La mise en œuvre du DAG enfant, déclenché par le DAG contrôlant, se présente comme suit :

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    dag_task = DummyOperator(
        task_id='dag-task'
    )

Airflow 1

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(
        task_id='dag-task'
    )

Vous devez importer les deux fichiers DAG dans votre environnement Cloud Composer pour que le workflow fonctionne.

Regrouper des tâches avec l'opérateur TaskGroup

Dans Airflow 2, vous pouvez utiliser l'opérateur TaskGroup pour regrouper des tâches dans votre DAG. Les tâches définies dans un bloc TaskGroup font toujours partie du DAG principal.

Prenons l'exemple suivant :

Les tâches peuvent être regroupées visuellement dans l'interface utilisateur avec l'opérateur TaskGroup dans Airflow 2.

Les tâches op-1 et op-2 sont regroupées dans un bloc doté de l'ID taskgroup_1. Une mise en œuvre de ce workflow ressemble au code suivant :

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=':')
        task_2 = BashOperator(task_id="op-2", bash_command=':')

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=':')
        task_4 = BashOperator(task_id="op-4", bash_command=':')

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(task_id='end')

    start >> section_1 >> some_other_task >> section_2 >> end