Agrupar tareas dentro de los DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se describe cómo puedes agrupar tareas en tus canalizaciones de Airflow con los siguientes patrones de diseño:

  • Agrupación de tareas en el grafo de DAG.
  • Activación de DAG secundarios desde un DAG superior.
  • Agrupación de tareas con el operador TaskGroup

Agrupación de tareas en el grafo de DAG

Para agrupar tareas en ciertas fases de tu canalización, puedes usar relaciones entre las tareas de tu archivo de DAG.

Considera el siguiente ejemplo:

Gráfico de tareas de Airflow que muestra tareas de ramificación
Figura 1: Las tareas se pueden agrupar en un DAG de Airflow (haz clic para ampliar)

En este flujo de trabajo, las tareas op-1 y op-2 se ejecutan en conjunto después de la tarea inicial start. Para lograrlo, agrupa las tareas junto con la declaración start >> [task_1, task_2].

En el siguiente ejemplo, se proporciona una implementación completa de este DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Activación de DAG secundarios desde un DAG superior

Puedes activar un DAG desde otro DAG con el operador TriggerDagRunOperator.

Considera el siguiente ejemplo:

El gráfico de tareas de Airflow que muestra los DAG secundarios activados como parte de un gráfico de DAG
Figura 2: Los DAG se pueden activar desde un DAG con TriggerDagRunOperator (haz clic para ampliar).

En este flujo de trabajo, los bloques dag_1 y dag_2 representan una serie de tareas que se agrupan en un DAG independiente en el entorno de Cloud Composer.

La implementación de este flujo de trabajo requiere dos archivos DAG separados. El archivo de DAG de control se ve de la siguiente manera:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

La implementación del DAG secundario, que se activa mediante el DAG de control, se ve de la siguiente manera:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

Debes subir ambos archivos de DAG en tu entorno de Cloud Composer para que el DAG funcione.

Agrupación de tareas con el operador TaskGroup

Puedes usar el operador TaskGroup para agrupar tareas en tu DAG. Las tareas definidas dentro de un bloque TaskGroup siguen siendo parte del DAG principal.

Considera el siguiente ejemplo:

El gráfico de tareas de Airflow que muestra dos grupos de tareas
Figura 3: Las tareas se pueden agrupar visualmente en la IU con el operador TaskGroup (haz clic para ampliar)

Las tareas op-1 y op-2 se agrupan en un bloque con el ID taskgroup_1. Una implementación de este flujo de trabajo es similar al siguiente código:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end

¿Qué sigue?