Regrouper des tâches dans des DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Cette page explique comment regrouper des tâches dans vos pipelines Airflow à l'aide des modèles de conception suivants :

  • Regrouper des tâches dans le graphique DAG.
  • Déclencher des DAG enfants à partir d'un DAG parent.
  • Regrouper des tâches avec l'opérateur TaskGroup

Regrouper des tâches dans le graphique DAG

Pour regrouper des tâches dans certaines phases de votre pipeline, vous pouvez utiliser des relations entre les tâches de votre fichier de DAG.

Prenons l'exemple suivant :

Graphique des tâches Airflow montrant les tâches de ramification
Figure 1 Les tâches peuvent être regroupées dans un DAG Airflow (cliquez pour agrandir)

Dans ce workflow, les tâches op-1 et op-2 s'exécutent ensemble après la tâche initiale start. Pour ce faire, regroupez des tâches avec l'instruction start >> [task_1, task_2].

L'exemple suivant fournit une implémentation complète de ce DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Déclencher des DAG enfants à partir d'un DAG parent

Vous pouvez déclencher un DAG à partir d'un autre DAG à l'aide de l'opérateur TriggerDagRunOperator.

Prenons l'exemple suivant :

Graphique des tâches Airflow montrant les DAG enfants déclenchés dans le cadre d'un graphique DAG
Figure 2 : Les DAG peuvent être déclenchés à partir d'un DAG avec l'opérateur TriggerDagRunOperator (cliquez pour agrandir)

Dans ce workflow, les blocs dag_1 et dag_2 représentent une série de tâches regroupées dans un DAG distinct dans l'environnement Cloud Composer.

La mise en œuvre de ce workflow nécessite deux fichiers DAG distincts. Le fichier de DAG de contrôle ressemble à ceci :

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

La mise en œuvre du DAG enfant, déclenché par le DAG contrôlant, se présente comme suit :

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

Vous devez importer les deux fichiers DAG dans votre environnement Cloud Composer pour que le DAG fonctionne.

Regrouper des tâches avec l'opérateur TaskGroup

Vous pouvez utiliser l'opérateur TaskGroup pour regrouper des tâches dans votre DAG. Les tâches définies dans un bloc TaskGroup font toujours partie du DAG principal.

Prenons l'exemple suivant :

Graphique des tâches Airflow affichant deux groupes de tâches
Figure 3 Les tâches peuvent être regroupées visuellement dans l'interface utilisateur avec l'opérateur TaskGroup (cliquez pour agrandir)

Les tâches op-1 et op-2 sont regroupées dans un bloc doté de l'ID taskgroup_1. Une mise en œuvre de ce workflow ressemble au code suivant :

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end

Étape suivante