Agrupar tarefas dentro de DAGs

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Nesta página, descrevemos como agrupar tarefas em pipelines do Airflow usando os seguintes padrões de design:

  • Agrupar tarefas no gráfico do DAG.
  • Como acionar DAGs filhos de um DAG pai.
  • Agrupar tarefas com o operador TaskGroup.

Agrupar tarefas no gráfico do DAG

Para agrupar tarefas em determinadas fases do pipeline, use as relações entre as tarefas no arquivo DAG.

Veja o exemplo a seguir.

Gráfico de tarefas do Airflow mostrando tarefas de ramificação
Figura 1. As tarefas podem ser agrupadas em um DAG do Airflow (clique para ampliar)

Neste fluxo de trabalho, as tarefas op-1 e op-2 são executadas juntas após a tarefa inicial start. Para isso, agrupe as tarefas com a instrução start >> [task_1, task_2].

O exemplo a seguir fornece uma implementação completa deste DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Acionar DAGs filhos de um DAG pai

É possível acionar um DAG de outro com o operador TriggerDagRunOperator.

Veja o exemplo a seguir.

Gráfico das tarefas do Airflow mostrando DAGs filhos acionados como parte de um gráfico de DAG
Figura 2. É possível acionar DAGs a partir de um DAG com o TriggerDagRunOperator (clique para ampliar)

Neste fluxo de trabalho, os blocos dag_1 e dag_2 representam uma série de tarefas agrupadas em um DAG separado no ambiente do Cloud Composer.

A implementação desse fluxo de trabalho requer dois arquivos DAG separados. O arquivo DAG de controle tem a seguinte aparência:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

A implementação do DAG filho, que é acionada pelo DAG de controle, tem a seguinte aparência:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

É preciso fazer upload dos dois arquivos DAG no ambiente do Cloud Composer para que o DAG funcione.

Como agrupar tarefas com o operador TaskGroup

É possível usar o operador TaskGroup para agrupar tarefas no DAG. As tarefas definidas em um bloco TaskGroup ainda fazem parte do DAG principal.

Veja o exemplo a seguir.

Gráfico de tarefas do Airflow mostrando dois grupos de tarefas
Figura 3. As tarefas podem ser agrupadas visualmente na IU com o operador TaskGroup (clique para ampliar)

As tarefas op-1 e op-2 são agrupadas em um bloco com o ID taskgroup_1. Uma implementação desse fluxo de trabalho se parece com o seguinte código:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end

A seguir