Group tasks inside DAGs

Cloud Composer 1 | Cloud Composer 2

This page describes how you can group tasks in your Airflow pipelines using the following design patterns:

  • Grouping tasks in the DAG graph.
  • Triggering children DAGs from a parent DAG.
  • Grouping tasks with the TaskGroup operator (only with Airflow 2).

Grouping tasks in the DAG graph

To group tasks in certain phases of your pipeline, you can use relationships between the tasks in your DAG file.

Consider the following example:

Tasks can be grouped together in an Airflow DAG

In this workflow, tasks op-1 and op-2 run together after the initial task start. You can achieve this by grouping tasks together with the statement start >> [task_1, task_2].

The following code provides a complete implementation of the DAG above:

Airflow 2

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Airflow 1


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Triggering children DAGs from a parent DAG

You can trigger one DAG from another DAG with the TriggerDagRunOperator. Depending on your Airflow version, you can find the TriggerDagRunOperator operator in a different module:

Consider the following example:

DAGs can be triggered from within a DAG with the TriggerDagRunOperator

In this workflow, the blocks dag_1 and dag_2 represent a series of tasks that are grouped together in a separate DAG in the Cloud Composer environment.

The implementation of this workflow requires two separate DAG files. The controlling DAG file looks like the following:

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

Airflow 1

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

The implementation of the child DAG, which is triggered by the controlling DAG, looks like the following:

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    dag_task = DummyOperator(
        task_id='dag-task'
    )

Airflow 1

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago


DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(
        task_id='dag-task'
    )

You must upload both DAG files in your Cloud Composer environment for the workflow to work.

Grouping tasks with the TaskGroup operator

In Airflow 2, you can use the TaskGroup operator to group tasks together in your DAG. Tasks defined within a TaskGroup block are still part of the main DAG.

Consider the following example:

Tasks can be visually grouped together in the UI with the TaskGroup operator in Airflow 2

The tasks op-1 and op-2 are grouped together in a block with ID taskgroup_1. An implementation of this workflow looks like the following code:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=':')
        task_2 = BashOperator(task_id="op-2", bash_command=':')

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=':')
        task_4 = BashOperator(task_id="op-4", bash_command=':')

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(task_id='end')

    start >> section_1 >> some_other_task >> section_2 >> end