Agrupar tarefas dentro de DAGs

Cloud Composer 1 | Cloud Composer 2

Nesta página, descrevemos como agrupar tarefas em pipelines do Airflow usando os seguintes padrões de design:

  • Agrupar tarefas no gráfico do DAG.
  • Como acionar DAGs filhos de um DAG pai.
  • Agrupar tarefas com o operador TaskGroup (somente com o Airflow 2).

Agrupar tarefas no gráfico do DAG

Para agrupar tarefas em determinadas fases do pipeline, use as relações entre as tarefas no arquivo DAG.

Veja o exemplo a seguir.

As tarefas podem ser agrupadas em um DAG do Airflow.

Neste fluxo de trabalho, as tarefas op-1 e op-2 são executadas juntas após a tarefa inicial start. Para isso, agrupe as tarefas com a instrução start >> [task_1, task_2].

O código a seguir fornece uma implementação completa do DAG acima:

Airflow 2

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Airflow 1


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Como acionar DAGs filhos de um DAG pai

É possível acionar um DAG de outro com o TriggerDagRunOperator. Dependendo da versão do Airflow, o operador TriggerDagRunOperator está em um módulo diferente:

Veja o exemplo a seguir.

É possível acionar DAGs a partir de um DAG com o TriggerDagRunOperator.

Neste fluxo de trabalho, os blocos dag_1 e dag_2 representam uma série de tarefas agrupadas em um DAG separado no ambiente do Cloud Composer.

A implementação desse fluxo de trabalho requer dois arquivos DAG separados. O arquivo DAG de controle tem a seguinte aparência:

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

Airflow 1

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

A implementação do DAG filho, que é acionada pelo DAG de controle, tem a seguinte aparência:

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    dag_task = DummyOperator(
        task_id='dag-task'
    )

Airflow 1

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(
        task_id='dag-task'
    )

É preciso fazer upload dos dois arquivos DAG no ambiente do Cloud Composer para que o fluxo de trabalho funcione.

Como agrupar tarefas com o operador TaskGroup

No Airflow 2, é possível usar o operador TaskGroup para agrupar tarefas no DAG. As tarefas definidas em um bloco TaskGroup ainda fazem parte do DAG principal.

Veja o exemplo a seguir.

As tarefas podem ser agrupadas visualmente na IU com o operador TaskGroup no Airflow 2

As tarefas op-1 e op-2 são agrupadas em um bloco com o ID taskgroup_1. Uma implementação desse fluxo de trabalho se parece com o seguinte código:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=':')
        task_2 = BashOperator(task_id="op-2", bash_command=':')

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=':')
        task_4 = BashOperator(task_id="op-4", bash_command=':')

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(task_id='end')

    start >> section_1 >> some_other_task >> section_2 >> end