Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
En esta página, se describe cómo puedes agrupar tareas en tus canalizaciones de Airflow con los siguientes patrones de diseño:
- Agrupación de tareas en el grafo de DAG.
- Activación de DAG secundarios desde un DAG superior.
- Agrupación de tareas con el operador
TaskGroup
(solo con Airflow 2).
Agrupación de tareas en el grafo de DAG
Para agrupar tareas en ciertas fases de tu canalización, puedes usar relaciones entre las tareas de tu archivo de DAG.
Considera el siguiente ejemplo:
En este flujo de trabajo, las tareas op-1
y op-2
se ejecutan en conjunto después de la tarea inicial start
.
Para lograrlo, agrupa las tareas junto con la declaración start >> [task_1, task_2]
.
En el código siguiente, se proporciona una implementación completa del DAG anterior:
Airflow 2
Airflow 1
Activación de DAG secundarios desde un DAG superior
Puedes activar un DAG desde otro DAG con el TriggerDagRunOperator
.
Según la versión de Airflow, puedes encontrar el operador TriggerDagRunOperator
en un módulo diferente:
- Para Airflow 1.10.*, usa el
airflow.operators.dagrun_operator
módulo. - Para Airflow 2, usa el
airflow.operators.trigger_dagrun
módulo.
Considera el siguiente ejemplo:
En este flujo de trabajo, los bloques dag_1
y dag_2
representan una serie de tareas que se agrupan en un DAG independiente en el entorno de Cloud Composer.
La implementación de este flujo de trabajo requiere dos archivos DAG separados. El archivo de DAG de control se ve de la siguiente manera:
Airflow 2
Airflow 1
La implementación del DAG secundario, que se activa mediante el DAG de control, se ve de la siguiente manera:
Airflow 2
Airflow 1
Debes subir ambos archivos de DAG en tu entorno de Cloud Composer para que el flujo de trabajo funcione.
Agrupa tareas con el operador TaskGroup
En Airflow 2, puedes usar el
Operador TaskGroup
para agrupar tareas en tu DAG.
Las tareas definidas dentro de un bloque TaskGroup
siguen siendo parte del DAG principal.
Considera el siguiente ejemplo:
Las tareas op-1
y op-2
se agrupan en un bloque con el ID taskgroup_1
.
Una implementación de este flujo de trabajo es similar al siguiente código: