Agrupe tarefas em DAGs

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página descreve como pode agrupar tarefas nos seus pipelines do Airflow usando os seguintes padrões de design:

  • Agrupar tarefas no gráfico DAG.
  • Acionar DAGs secundários a partir de um DAG principal.
  • Agrupar tarefas com o operador TaskGroup.

Agrupe tarefas no gráfico DAG

Para agrupar tarefas em determinadas fases do seu pipeline, pode usar relações entre as tarefas no ficheiro DAG.

Considere o seguinte exemplo:

O gráfico de tarefas do Airflow que mostra tarefas de ramificação
Figura 1. As tarefas podem ser agrupadas num DAG do Airflow (clique para aumentar)

Neste fluxo de trabalho, as tarefas op-1 e op-2 são executadas em conjunto após a tarefa inicial start. Pode fazê-lo agrupando tarefas com a declaração start >> [task_1, task_2].

O exemplo seguinte apresenta uma implementação completa deste DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Acione DAGs filhos a partir de um DAG principal

Pode acionar um DAG a partir de outro DAG com o operador TriggerDagRunOperator.

Considere o seguinte exemplo:

O gráfico de tarefas do Airflow que mostra DAGs secundários acionados como parte de um gráfico DAG
Figura 2. Os DAGs podem ser acionados a partir de um DAG com o TriggerDagRunOperator (clique para aumentar)

Neste fluxo de trabalho, os blocos dag_1 e dag_2 representam uma série de tarefas agrupadas num DAG separado no ambiente do Cloud Composer.

A implementação deste fluxo de trabalho requer dois ficheiros DAG separados. O ficheiro DAG de controlo tem o seguinte aspeto:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

A implementação do DAG filho, que é acionado pelo DAG de controlo, tem o seguinte aspeto:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

Tem de carregar ambos os ficheiros DAG no seu ambiente do Cloud Composer para que o DAG funcione.

Agrupar tarefas com o operador TaskGroup

Pode usar o operador TaskGroup para agrupar tarefas no seu DAG. As tarefas definidas num bloco TaskGroup continuam a fazer parte do DAG principal.

Considere o seguinte exemplo:

O gráfico de tarefas do Airflow que mostra dois grupos de tarefas
Figura 3. As tarefas podem ser agrupadas visualmente na IU com o operador TaskGroup (clique para aumentar)

As tarefas op-1 e op-2 estão agrupadas num bloco com o ID taskgroup_1. Uma implementação deste fluxo de trabalho tem o seguinte aspeto:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end

O que se segue?