Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Cette page explique comment regrouper des tâches dans vos pipelines Airflow à l'aide des modèles de conception suivants :
- Regrouper des tâches dans le graphique DAG.
- Déclencher des DAG enfants à partir d'un DAG parent.
- Regrouper des tâches avec l'opérateur
TaskGroup
(uniquement avec Airflow 2).
Regrouper des tâches dans le graphique DAG
Pour regrouper des tâches dans certaines phases de votre pipeline, vous pouvez utiliser des relations entre les tâches de votre fichier de DAG.
Prenons l'exemple suivant :
Dans ce workflow, les tâches op-1
et op-2
s'exécutent ensemble après la tâche initiale start
.
Pour ce faire, regroupez des tâches avec l'instruction start >> [task_1, task_2]
.
Le code suivant fournit une mise en œuvre complète du DAG ci-dessus :
Airflow 2
Airflow 1
Déclencher des DAG enfants à partir d'un DAG parent
Vous pouvez déclencher un DAG à partir d'un autre DAG à l'aide de la commande TriggerDagRunOperator
suivante :
Selon votre version d'Airflow, vous trouverez l'opérateur TriggerDagRunOperator
dans un autre module :
- Pour Airflow 1.10.*, utilisez la
airflow.operators.dagrun_operator
de ce module. - Pour Airflow 2, utilisez le module
airflow.operators.trigger_dagrun
.
Prenons l'exemple suivant :
Dans ce workflow, les blocs dag_1
et dag_2
représentent une série de tâches regroupées dans un DAG distinct dans l'environnement Cloud Composer.
La mise en œuvre de ce workflow nécessite deux fichiers DAG distincts. Le fichier de DAG de contrôle ressemble à ceci :
Airflow 2
Airflow 1
La mise en œuvre du DAG enfant, déclenché par le DAG contrôlant, se présente comme suit :
Airflow 2
Airflow 1
Vous devez importer les deux fichiers DAG dans votre environnement Cloud Composer pour que le workflow fonctionne.
Regrouper des tâches avec l'opérateur TaskGroup
Dans Airflow 2, vous pouvez utiliser l'opérateur TaskGroup
pour regrouper des tâches dans votre DAG.
Les tâches définies dans un bloc TaskGroup
font toujours partie du DAG principal.
Prenons l'exemple suivant :
Les tâches op-1
et op-2
sont regroupées dans un bloc doté de l'ID taskgroup_1
.
Une mise en œuvre de ce workflow ressemble au code suivant :